mirror of
https://github.com/robertmartin8/PyPortfolioOpt.git
synced 2022-11-27 18:02:41 +03:00
migrated min_vol to cvxpy (with l2 reg)
This commit is contained in:
@@ -0,0 +1,20 @@
|
||||
from .black_litterman import (
|
||||
market_implied_prior_returns,
|
||||
market_implied_risk_aversion,
|
||||
BlackLittermanModel,
|
||||
)
|
||||
from .cla import CLA
|
||||
from .discrete_allocation import get_latest_prices, DiscreteAllocation
|
||||
from .efficient_frontier import EfficientFrontier
|
||||
from .hierarchical_risk_parity import HRPOpt
|
||||
|
||||
__all__ = [
|
||||
"market_implied_prior_returns",
|
||||
"market_implied_risk_aversion",
|
||||
"BlackLittermanModel",
|
||||
"CLA",
|
||||
"get_latest_prices",
|
||||
"DiscreteAllocation",
|
||||
"EfficientFrontier",
|
||||
"HRPOpt",
|
||||
]
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"""
|
||||
The ``base_optimizer`` module houses the parent classes ``BaseOptimizer`` and
|
||||
``BaseScipyOptimizer``, from which all optimisers will inherit. The later is for
|
||||
``BaseConvexOptimizer``, from which all optimisers will inherit. The later is for
|
||||
optimisers that use the scipy solver.
|
||||
|
||||
Additionally, we define a general utility function ``portfolio_performance`` to
|
||||
@@ -10,6 +10,7 @@ evaluate return and risk for a given set of portfolio weights.
|
||||
import json
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
import cvxpy as cp
|
||||
from . import objective_functions
|
||||
|
||||
|
||||
@@ -96,7 +97,7 @@ class BaseOptimizer:
|
||||
f.write(str(clean_weights))
|
||||
|
||||
|
||||
class BaseScipyOptimizer(BaseOptimizer):
|
||||
class BaseConvexOptimizer(BaseOptimizer):
|
||||
|
||||
"""
|
||||
Instance variables:
|
||||
@@ -105,9 +106,13 @@ class BaseScipyOptimizer(BaseOptimizer):
|
||||
- ``tickers`` - str list
|
||||
- ``weights`` - np.ndarray
|
||||
- ``bounds`` - float tuple OR (float tuple) list
|
||||
- ``initial_guess`` - np.ndarray
|
||||
- ``constraints`` - dict list
|
||||
- ``opt_method`` - the optimisation algorithm to use. Defaults to SLSQP.
|
||||
|
||||
Public methods:
|
||||
|
||||
- ``set_weights()`` creates self.weights (np.ndarray) from a weights dict
|
||||
- ``clean_weights()`` rounds the weights and clips near-zeros.
|
||||
- ``save_weights_to_file()`` saves the weights to csv, json, or txt.
|
||||
"""
|
||||
|
||||
def __init__(self, n_assets, tickers=None, weight_bounds=(0, 1)):
|
||||
@@ -118,46 +123,58 @@ class BaseScipyOptimizer(BaseOptimizer):
|
||||
:type weight_bounds: tuple OR tuple list, optional
|
||||
"""
|
||||
super().__init__(n_assets, tickers)
|
||||
self.bounds = self._make_valid_bounds(weight_bounds)
|
||||
# Optimisation parameters
|
||||
self.initial_guess = np.array([1 / self.n_assets] * self.n_assets)
|
||||
self.constraints = [{"type": "eq", "fun": lambda x: np.sum(x) - 1}]
|
||||
self.opt_method = "SLSQP"
|
||||
|
||||
def _make_valid_bounds(self, test_bounds):
|
||||
# Optimisation variables
|
||||
self._w = cp.Variable(n_assets)
|
||||
self._objective = None
|
||||
self._additional_objectives = []
|
||||
self._constraints = []
|
||||
self._map_bounds_to_constraints(weight_bounds)
|
||||
|
||||
def _map_bounds_to_constraints(self, test_bounds):
|
||||
"""
|
||||
Private method: process input bounds into a form acceptable by scipy.optimize,
|
||||
and check the validity of said bounds.
|
||||
Process input bounds into a form acceptable by cvxpy and add to the constraints list.
|
||||
|
||||
:param test_bounds: minimum and maximum weight of each asset OR single min/max pair
|
||||
if all identical, defaults to (0, 1).
|
||||
:type test_bounds: tuple OR list/tuple of tuples.
|
||||
:raises ValueError: if ``test_bounds`` is not a tuple of length two OR a collection
|
||||
of pairs.
|
||||
:raises ValueError: if the lower bound is too high
|
||||
:return: a tuple of bounds, e.g ((0, 1), (0, 1), (0, 1) ...)
|
||||
:rtype: tuple of tuples
|
||||
if all identical OR pair of arrays corresponding to lower/upper bounds. defaults to (0, 1).
|
||||
:type test_bounds: tuple OR list/tuple of tuples OR pair of np arrays
|
||||
:raises TypeError: if ``test_bounds`` is not of the right type
|
||||
:return: bounds suitable for cvxpy
|
||||
:rtype: tuple pair of np.ndarray
|
||||
"""
|
||||
# If it is a collection with the right length, assume they are all bounds.
|
||||
if len(test_bounds) == self.n_assets and not isinstance(
|
||||
test_bounds[0], (float, int)
|
||||
):
|
||||
bounds = test_bounds
|
||||
bounds = np.array(test_bounds, dtype=np.float)
|
||||
lower = np.nan_to_num(bounds[:, 0], nan=-np.inf)
|
||||
upper = np.nan_to_num(bounds[:, 1], nan=np.inf)
|
||||
else:
|
||||
if len(test_bounds) != 2 or not isinstance(test_bounds, tuple):
|
||||
raise ValueError(
|
||||
"test_bounds must be a tuple of (lower bound, upper bound) "
|
||||
"OR collection of bounds for each asset"
|
||||
# Otherwise this must be a pair.
|
||||
if len(test_bounds) != 2 or not isinstance(test_bounds, (tuple, list)):
|
||||
raise TypeError(
|
||||
"test_bounds must be a pair (lower bound, upper bound) "
|
||||
"OR a collection of bounds for each asset"
|
||||
)
|
||||
bounds = (test_bounds,) * self.n_assets
|
||||
lower, upper = test_bounds
|
||||
|
||||
# Ensure lower bound is not too high
|
||||
if sum((0 if b[0] is None else b[0]) for b in bounds) > 1:
|
||||
raise ValueError(
|
||||
"Lower bound is too high. Impossible to construct valid portfolio"
|
||||
)
|
||||
# Replace None values with the appropriate infinity.
|
||||
if np.isscalar(lower) or lower is None:
|
||||
lower = -np.inf if lower is None else lower
|
||||
upper = np.inf if upper is None else upper
|
||||
else:
|
||||
lower = np.nan_to_num(lower, nan=-np.inf)
|
||||
upper = np.nan_to_num(upper, nan=np.inf)
|
||||
|
||||
return bounds
|
||||
self._constraints.append(self._w >= lower)
|
||||
self._constraints.append(self._w <= upper)
|
||||
|
||||
@staticmethod
|
||||
def _make_scipy_bounds():
|
||||
"""
|
||||
Convert the current cvxpy bounds to scipy bounds
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
def portfolio_performance(
|
||||
@@ -199,7 +216,8 @@ def portfolio_performance(
|
||||
new_weights = np.asarray(weights)
|
||||
else:
|
||||
raise ValueError("Weights is None")
|
||||
sigma = np.sqrt(objective_functions.volatility(new_weights, cov_matrix))
|
||||
|
||||
sigma = np.sqrt(objective_functions.portfolio_variance(new_weights, cov_matrix))
|
||||
mu = new_weights.dot(expected_returns)
|
||||
|
||||
sharpe = -objective_functions.negative_sharpe(
|
||||
|
||||
@@ -6,14 +6,16 @@ generates optimal portfolios for various possible objective functions and parame
|
||||
import warnings
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
import cvxpy as cp
|
||||
import scipy.optimize as sco
|
||||
from . import exceptions
|
||||
from . import objective_functions, base_optimizer
|
||||
|
||||
|
||||
class EfficientFrontier(base_optimizer.BaseScipyOptimizer):
|
||||
class EfficientFrontier(base_optimizer.BaseConvexOptimizer):
|
||||
|
||||
"""
|
||||
An EfficientFrontier object (inheriting from BaseScipyOptimizer) contains multiple
|
||||
An EfficientFrontier object (inheriting from BaseConvexOptimizer) contains multiple
|
||||
optimisation methods that can be called (corresponding to different objective
|
||||
functions) with various parameters.
|
||||
|
||||
@@ -24,8 +26,8 @@ class EfficientFrontier(base_optimizer.BaseScipyOptimizer):
|
||||
- ``n_assets`` - int
|
||||
- ``tickers`` - str list
|
||||
- ``bounds`` - float tuple OR (float tuple) list
|
||||
- ``cov_matrix`` - pd.DataFrame
|
||||
- ``expected_returns`` - pd.Series
|
||||
- ``cov_matrix`` - np.ndarray
|
||||
- ``expected_returns`` - np.ndarray
|
||||
|
||||
- Optimisation parameters:
|
||||
|
||||
@@ -67,33 +69,104 @@ class EfficientFrontier(base_optimizer.BaseScipyOptimizer):
|
||||
:raises TypeError: if ``cov_matrix`` is not a dataframe or array
|
||||
"""
|
||||
# Inputs
|
||||
self.cov_matrix = cov_matrix
|
||||
if expected_returns is not None:
|
||||
if not isinstance(expected_returns, (pd.Series, list, np.ndarray)):
|
||||
raise TypeError("expected_returns is not a series, list or array")
|
||||
if not isinstance(cov_matrix, (pd.DataFrame, np.ndarray)):
|
||||
raise TypeError("cov_matrix is not a dataframe or array")
|
||||
self.expected_returns = expected_returns
|
||||
self.cov_matrix = EfficientFrontier._validate_cov_matrix(cov_matrix)
|
||||
self.expected_returns = EfficientFrontier._validate_expected_returns(
|
||||
expected_returns
|
||||
)
|
||||
|
||||
# Labels
|
||||
if isinstance(expected_returns, pd.Series):
|
||||
tickers = list(expected_returns.index)
|
||||
elif isinstance(cov_matrix, pd.DataFrame):
|
||||
tickers = list(cov_matrix.columns)
|
||||
else:
|
||||
else: # use integer labels
|
||||
tickers = list(range(len(expected_returns)))
|
||||
|
||||
if cov_matrix.shape != (len(expected_returns), len(expected_returns)):
|
||||
raise ValueError("Covariance matrix does not match expected returns")
|
||||
|
||||
super().__init__(len(tickers), tickers, weight_bounds)
|
||||
|
||||
if not isinstance(gamma, (int, float)):
|
||||
raise ValueError("gamma should be numeric")
|
||||
if gamma < 0:
|
||||
warnings.warn("in most cases, gamma should be positive", UserWarning)
|
||||
self.gamma = gamma
|
||||
@staticmethod
|
||||
def _validate_expected_returns(expected_returns):
|
||||
if expected_returns is None:
|
||||
raise ValueError("expected_returns must be provided")
|
||||
elif isinstance(expected_returns, pd.Series):
|
||||
return expected_returns.values
|
||||
elif isinstance(expected_returns, list):
|
||||
return np.array(expected_returns)
|
||||
elif isinstance(expected_returns, np.ndarray):
|
||||
return expected_returns.ravel()
|
||||
else:
|
||||
raise TypeError("expected_returns is not a series, list or array")
|
||||
|
||||
@staticmethod
|
||||
def _validate_cov_matrix(cov_matrix):
|
||||
if cov_matrix is None:
|
||||
raise ValueError("cov_matrix must be provided")
|
||||
elif isinstance(cov_matrix, pd.DataFrame):
|
||||
return cov_matrix.values
|
||||
elif isinstance(cov_matrix, np.ndarray):
|
||||
return cov_matrix
|
||||
else:
|
||||
raise TypeError("cov_matrix is not a series, list or array")
|
||||
|
||||
def add_objective(self, new_objective, **kwargs):
|
||||
"""
|
||||
Add a new term into the objective function. This term must be convex,
|
||||
and built from cvxpy atomic functions.
|
||||
|
||||
Example:
|
||||
|
||||
def L1_norm(w, k=1):
|
||||
return k * cp.norm(w, 1)
|
||||
|
||||
ef.add_objective(L1_norm, k=2)
|
||||
|
||||
:param new_objective: the objective to be added
|
||||
:type new_objective: cp.Expression (i.e function of cp.Variable)
|
||||
"""
|
||||
self._additional_objectives.append(new_objective(self._w, **kwargs))
|
||||
|
||||
def add_constraint(self, new_constraint):
|
||||
"""
|
||||
Add a new constraint to the optimisation problem. This constraint must be linear and
|
||||
must be either an equality or simple inequality.
|
||||
|
||||
Examples:
|
||||
|
||||
ef.add_constraint(lambda x : x[0] == 0.02)
|
||||
ef.add_constraint(lambda x : x >= 0.01)
|
||||
ef.add_constraint(lambda x: x <= np.array([0.01, 0.08, ..., 0.5]))
|
||||
|
||||
:param new_constraint: the constraint to be added
|
||||
:type constraintfunc: lambda function
|
||||
"""
|
||||
if not callable(new_constraint):
|
||||
raise TypeError("New constraint must be provided as a lambda function")
|
||||
self._constraints.append(new_constraint(self._w))
|
||||
|
||||
def convex_optimize(custom_objective, constraints):
|
||||
pass
|
||||
|
||||
def nonconvex_optimize(custom_objective, constraints):
|
||||
# opt using scip
|
||||
# args = (self.cov_matrix, self.gamma)
|
||||
# result = sco.minimize(
|
||||
# objective_functions.volatility,
|
||||
# x0=self.initial_guess,
|
||||
# args=args,
|
||||
# method=self.opt_method,
|
||||
# bounds=self.bounds,
|
||||
# constraints=self.constraints,
|
||||
# )
|
||||
# self.weights = result["x"]
|
||||
pass
|
||||
|
||||
def max_sharpe(self, risk_free_rate=0.02):
|
||||
"""
|
||||
Maximise the Sharpe Ratio. The result is also referred to as the tangency portfolio,
|
||||
as it is the tangent to the efficient frontier curve that intercepts the risk-free
|
||||
rate.
|
||||
as it is the portfolio for which the capital market line is tangent to the efficient frontier.
|
||||
|
||||
:param risk_free_rate: risk-free rate of borrowing/lending, defaults to 0.02.
|
||||
The period of the risk-free rate should correspond to the
|
||||
@@ -125,16 +198,23 @@ class EfficientFrontier(base_optimizer.BaseScipyOptimizer):
|
||||
:return: asset weights for the volatility-minimising portfolio
|
||||
:rtype: dict
|
||||
"""
|
||||
args = (self.cov_matrix, self.gamma)
|
||||
result = sco.minimize(
|
||||
objective_functions.volatility,
|
||||
x0=self.initial_guess,
|
||||
args=args,
|
||||
method=self.opt_method,
|
||||
bounds=self.bounds,
|
||||
constraints=self.constraints,
|
||||
self._objective = objective_functions.portfolio_variance(
|
||||
self._w, self.cov_matrix
|
||||
)
|
||||
self.weights = result["x"]
|
||||
for obj in self._additional_objectives:
|
||||
self._objective += obj
|
||||
|
||||
self._constraints.append(cp.sum(self._w) == 1)
|
||||
|
||||
try:
|
||||
opt = cp.Problem(cp.Minimize(self._objective), self._constraints)
|
||||
except TypeError:
|
||||
raise exceptions.OptimizationError
|
||||
|
||||
opt.solve()
|
||||
if opt.status != "optimal":
|
||||
raise exceptions.OptimizationError
|
||||
self.weights = self._w.value.round(20)
|
||||
return dict(zip(self.tickers, self.weights))
|
||||
|
||||
def max_unconstrained_utility(self, risk_aversion=1):
|
||||
@@ -224,7 +304,7 @@ class EfficientFrontier(base_optimizer.BaseScipyOptimizer):
|
||||
"Market neutrality requires shorting - bounds have been amended",
|
||||
RuntimeWarning,
|
||||
)
|
||||
self.bounds = self._make_valid_bounds((-1, 1))
|
||||
self.bounds = self._map_bounds_to_constraints((-1, 1))
|
||||
constraints = [
|
||||
{"type": "eq", "fun": lambda x: np.sum(x)},
|
||||
target_constraint,
|
||||
@@ -283,7 +363,7 @@ class EfficientFrontier(base_optimizer.BaseScipyOptimizer):
|
||||
"Market neutrality requires shorting - bounds have been amended",
|
||||
RuntimeWarning,
|
||||
)
|
||||
self.bounds = self._make_valid_bounds((-1, 1))
|
||||
self.bounds = self._map_bounds_to_constraints((-1, 1))
|
||||
constraints = [
|
||||
{"type": "eq", "fun": lambda x: np.sum(x)},
|
||||
target_constraint,
|
||||
|
||||
@@ -12,7 +12,9 @@ class OptimizationError(Exception):
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
default_message = "Please check your constraints or use a different solver."
|
||||
default_message = (
|
||||
"Please check your objectives/constraints or use a different solver."
|
||||
)
|
||||
|
||||
if not (args or kwargs):
|
||||
args = (default_message,)
|
||||
|
||||
@@ -19,7 +19,45 @@ Currently implemented:
|
||||
"""
|
||||
|
||||
import numpy as np
|
||||
import scipy.stats
|
||||
import cvxpy as cp
|
||||
import pandas as pd
|
||||
|
||||
|
||||
def _objective_value(w, obj):
|
||||
"""
|
||||
Helper method to return either the value of the objective function
|
||||
or the objective function as a cvxpy object depending on whether
|
||||
w is a cvxpy variable or np array.
|
||||
|
||||
:param w: weights
|
||||
:type w: np.ndarray OR cp.Variable
|
||||
:param obj: objective function expression
|
||||
:type obj: cp.Expression
|
||||
:return: value of the objective function OR objective function expression
|
||||
:rtype: float OR cp.Expression
|
||||
"""
|
||||
if isinstance(w, np.ndarray):
|
||||
if np.isscalar(obj.value):
|
||||
return obj.value
|
||||
else:
|
||||
return obj.value.item()
|
||||
else:
|
||||
return obj
|
||||
|
||||
|
||||
def portfolio_variance(w, cov_matrix):
|
||||
if isinstance(w, pd.Series):
|
||||
w = w.values
|
||||
|
||||
variance = cp.quad_form(w, cov_matrix)
|
||||
return _objective_value(w, variance)
|
||||
|
||||
|
||||
def L2_reg(w, gamma=1):
|
||||
if isinstance(w, pd.Series):
|
||||
w = w.values
|
||||
L2_reg = gamma * cp.sum_squares(w)
|
||||
return _objective_value(w, L2_reg)
|
||||
|
||||
|
||||
def negative_mean_return(weights, expected_returns):
|
||||
@@ -107,32 +145,33 @@ def negative_quadratic_utility(
|
||||
return -(mu - 0.5 * risk_aversion * portfolio_volatility) + L2_reg
|
||||
|
||||
|
||||
def negative_cvar(weights, returns, s=10000, beta=0.95, random_state=None):
|
||||
"""
|
||||
Calculate the negative CVaR. Though we want the "min CVaR portfolio", we
|
||||
actually need to maximise the expected return of the worst q% cases, thus
|
||||
we need this value to be negative.
|
||||
# def negative_cvar(weights, returns, s=10000, beta=0.95, random_state=None):
|
||||
# """
|
||||
# Calculate the negative CVaR. Though we want the "min CVaR portfolio", we
|
||||
# actually need to maximise the expected return of the worst q% cases, thus
|
||||
# we need this value to be negative.
|
||||
|
||||
:param weights: asset weights of the portfolio
|
||||
:type weights: np.ndarray
|
||||
:param returns: asset returns
|
||||
:type returns: pd.DataFrame or np.ndarray
|
||||
:param s: number of bootstrap draws, defaults to 10000
|
||||
:type s: int, optional
|
||||
:param beta: "significance level" (i. 1 - q), defaults to 0.95
|
||||
:type beta: float, optional
|
||||
:param random_state: seed for random sampling, defaults to None
|
||||
:type random_state: int, optional
|
||||
:return: negative CVaR
|
||||
:rtype: float
|
||||
"""
|
||||
np.random.seed(seed=random_state)
|
||||
# Calcualte the returns given the weights
|
||||
portfolio_returns = (weights * returns).sum(axis=1)
|
||||
# Sample from the historical distribution
|
||||
dist = scipy.stats.gaussian_kde(portfolio_returns)
|
||||
sample = dist.resample(s)
|
||||
# Calculate the value at risk
|
||||
var = portfolio_returns.quantile(1 - beta)
|
||||
# Mean of all losses worse than the value at risk
|
||||
return -sample[sample < var].mean()
|
||||
# :param weights: asset weights of the portfolio
|
||||
# :type weights: np.ndarray
|
||||
# :param returns: asset returns
|
||||
# :type returns: pd.DataFrame or np.ndarray
|
||||
# :param s: number of bootstrap draws, defaults to 10000
|
||||
# :type s: int, optional
|
||||
# :param beta: "significance level" (i. 1 - q), defaults to 0.95
|
||||
# :type beta: float, optional
|
||||
# :param random_state: seed for random sampling, defaults to None
|
||||
# :type random_state: int, optional
|
||||
# :return: negative CVaR
|
||||
# :rtype: float
|
||||
# """
|
||||
# import scipy.stats
|
||||
# np.random.seed(seed=random_state)
|
||||
# # Calcualte the returns given the weights
|
||||
# portfolio_returns = (weights * returns).sum(axis=1)
|
||||
# # Sample from the historical distribution
|
||||
# dist = scipy.stats.gaussian_kde(portfolio_returns)
|
||||
# sample = dist.resample(s)
|
||||
# # Calculate the value at risk
|
||||
# var = portfolio_returns.quantile(1 - beta)
|
||||
# # Mean of all losses worse than the value at risk
|
||||
# return -sample[sample < var].mean()
|
||||
|
||||
@@ -2,7 +2,8 @@ import json
|
||||
import os
|
||||
import numpy as np
|
||||
import pytest
|
||||
from pypfopt.efficient_frontier import EfficientFrontier
|
||||
from pypfopt import EfficientFrontier
|
||||
from pypfopt import exceptions
|
||||
from tests.utilities_for_tests import get_data, setup_efficient_frontier
|
||||
|
||||
|
||||
@@ -10,7 +11,7 @@ def test_custom_upper_bound():
|
||||
ef = EfficientFrontier(
|
||||
*setup_efficient_frontier(data_only=True), weight_bounds=(0, 0.10)
|
||||
)
|
||||
ef.max_sharpe()
|
||||
ef.min_volatility()
|
||||
ef.portfolio_performance()
|
||||
assert ef.weights.max() <= 0.1
|
||||
np.testing.assert_almost_equal(ef.weights.sum(), 1)
|
||||
@@ -20,7 +21,7 @@ def test_custom_lower_bound():
|
||||
ef = EfficientFrontier(
|
||||
*setup_efficient_frontier(data_only=True), weight_bounds=(0.02, 1)
|
||||
)
|
||||
ef.max_sharpe()
|
||||
ef.min_volatility()
|
||||
assert ef.weights.min() >= 0.02
|
||||
np.testing.assert_almost_equal(ef.weights.sum(), 1)
|
||||
|
||||
@@ -29,18 +30,18 @@ def test_custom_bounds_same():
|
||||
ef = EfficientFrontier(
|
||||
*setup_efficient_frontier(data_only=True), weight_bounds=(0.03, 0.13)
|
||||
)
|
||||
ef.max_sharpe()
|
||||
ef.min_volatility()
|
||||
assert ef.weights.min() >= 0.03
|
||||
assert ef.weights.max() <= 0.13
|
||||
np.testing.assert_almost_equal(ef.weights.sum(), 1)
|
||||
|
||||
|
||||
def test_custom_bounds_different():
|
||||
def test_custom_bounds_different_values():
|
||||
bounds = [(0.01, 0.13), (0.02, 0.11)] * 10
|
||||
ef = EfficientFrontier(
|
||||
*setup_efficient_frontier(data_only=True), weight_bounds=bounds
|
||||
)
|
||||
ef.max_sharpe()
|
||||
ef.min_volatility()
|
||||
assert (0.01 <= ef.weights[::2]).all() and (ef.weights[::2] <= 0.13).all()
|
||||
assert (0.02 <= ef.weights[1::2]).all() and (ef.weights[1::2] <= 0.11).all()
|
||||
np.testing.assert_almost_equal(ef.weights.sum(), 1)
|
||||
@@ -51,22 +52,49 @@ def test_custom_bounds_different():
|
||||
)
|
||||
|
||||
|
||||
def test_bounds_errors():
|
||||
with pytest.raises(ValueError):
|
||||
EfficientFrontier(
|
||||
*setup_efficient_frontier(data_only=True), weight_bounds=(0.06, 1)
|
||||
def test_bound_input_types():
|
||||
bounds = [0.01, 0.13]
|
||||
ef = EfficientFrontier(
|
||||
*setup_efficient_frontier(data_only=True), weight_bounds=bounds
|
||||
)
|
||||
lb = np.array([0.01, 0.02] * 10)
|
||||
ub = np.array([0.07, 0.2] * 10)
|
||||
assert EfficientFrontier(
|
||||
*setup_efficient_frontier(data_only=True), weight_bounds=(lb, ub)
|
||||
)
|
||||
bounds = ((0.01, 0.13), (0.02, 0.11)) * 10
|
||||
assert EfficientFrontier(
|
||||
*setup_efficient_frontier(data_only=True), weight_bounds=bounds
|
||||
)
|
||||
|
||||
|
||||
def test_bound_failure():
|
||||
# Ensure optimisation fails when lower bound is too high or upper bound is too low
|
||||
ef = EfficientFrontier(
|
||||
*setup_efficient_frontier(data_only=True), weight_bounds=(0.06, 0.13)
|
||||
)
|
||||
with pytest.raises(exceptions.OptimizationError):
|
||||
ef.min_volatility()
|
||||
|
||||
ef = EfficientFrontier(
|
||||
*setup_efficient_frontier(data_only=True), weight_bounds=(0, 0.04)
|
||||
)
|
||||
with pytest.raises(exceptions.OptimizationError):
|
||||
ef.min_volatility()
|
||||
|
||||
|
||||
def test_bounds_errors():
|
||||
assert EfficientFrontier(
|
||||
*setup_efficient_frontier(data_only=True), weight_bounds=(0, 1)
|
||||
)
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
with pytest.raises(TypeError):
|
||||
EfficientFrontier(
|
||||
*setup_efficient_frontier(data_only=True), weight_bounds=(0.06, 1, 3)
|
||||
)
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
with pytest.raises(TypeError):
|
||||
# Not enough bounds
|
||||
bounds = [(0.01, 0.13), (0.02, 0.11)] * 5
|
||||
EfficientFrontier(
|
||||
*setup_efficient_frontier(data_only=True), weight_bounds=bounds
|
||||
@@ -75,7 +103,7 @@ def test_bounds_errors():
|
||||
|
||||
def test_clean_weights():
|
||||
ef = setup_efficient_frontier()
|
||||
ef.max_sharpe()
|
||||
ef.min_volatility()
|
||||
number_tiny_weights = sum(ef.weights < 1e-4)
|
||||
cleaned = ef.clean_weights(cutoff=1e-4, rounding=5)
|
||||
cleaned_weights = cleaned.values()
|
||||
@@ -91,7 +119,7 @@ def test_clean_weights_short():
|
||||
ef = EfficientFrontier(
|
||||
*setup_efficient_frontier(data_only=True), weight_bounds=(-1, 1)
|
||||
)
|
||||
ef.max_sharpe()
|
||||
ef.min_volatility()
|
||||
# In practice we would never use such a high cutoff
|
||||
number_tiny_weights = sum(np.abs(ef.weights) < 0.05)
|
||||
cleaned = ef.clean_weights(cutoff=0.05)
|
||||
@@ -104,7 +132,7 @@ def test_clean_weights_error():
|
||||
ef = setup_efficient_frontier()
|
||||
with pytest.raises(AttributeError):
|
||||
ef.clean_weights()
|
||||
ef.max_sharpe()
|
||||
ef.min_volatility()
|
||||
with pytest.raises(ValueError):
|
||||
ef.clean_weights(rounding=1.3)
|
||||
with pytest.raises(ValueError):
|
||||
@@ -114,7 +142,7 @@ def test_clean_weights_error():
|
||||
|
||||
def test_clean_weights_no_rounding():
|
||||
ef = setup_efficient_frontier()
|
||||
ef.max_sharpe()
|
||||
ef.min_volatility()
|
||||
# ensure the call does not fail
|
||||
# in previous commits, this call would raise a ValueError
|
||||
cleaned = ef.clean_weights(rounding=None, cutoff=0)
|
||||
@@ -136,7 +164,7 @@ def test_efficient_frontier_init_errors():
|
||||
|
||||
def test_set_weights():
|
||||
ef = setup_efficient_frontier()
|
||||
w1 = ef.max_sharpe()
|
||||
w1 = ef.min_volatility()
|
||||
test_weights = ef.weights
|
||||
ef.min_volatility()
|
||||
ef.set_weights(w1)
|
||||
@@ -145,7 +173,7 @@ def test_set_weights():
|
||||
|
||||
def test_save_weights_to_file():
|
||||
ef = setup_efficient_frontier()
|
||||
ef.max_sharpe()
|
||||
ef.min_volatility()
|
||||
ef.save_weights_to_file("tests/test.txt")
|
||||
with open("tests/test.txt", "r") as f:
|
||||
file = f.read()
|
||||
|
||||
951
tests/test_efficient_frontier.py
Executable file → Normal file
951
tests/test_efficient_frontier.py
Executable file → Normal file
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user