sector constraints #10

This commit is contained in:
robertmartin8
2020-04-28 16:02:11 +08:00
parent fa0cadfb68
commit 2bda7e5dd9
5 changed files with 389 additions and 27 deletions

View File

@@ -97,11 +97,26 @@ Basic Usage
risk-aversion parameter, which gives a useful estimate in the absence of other risk-aversion parameter, which gives a useful estimate in the absence of other
information! information!
.. caution:: .. automethod:: efficient_risk
If you pass an unreasonable target into :py:meth:`efficient_risk` or .. caution::
:py:meth:`efficient_return`, the optimiser will fail silently and return
weird weights. *Caveat emptor* applies! If you pass an unreasonable target into :py:meth:`efficient_risk` or
:py:meth:`efficient_return`, the optimiser will fail silently and return
weird weights. *Caveat emptor* applies!
.. automethod:: portfolio_performance
.. tip::
If you would like to use the ``portfolio_performance`` function independently of any
optimiser (e.g for debugging purposes), you can use::
from pypfopt import base_optimizer
base_optimizer.portfolio_performance(
weights, expected_returns, cov_matrix, verbose=True, risk_free_rate=0.02
)
Adding objectives and constraints Adding objectives and constraints
================================= =================================
@@ -114,6 +129,8 @@ add constraints and objectives are documented below:
.. automethod:: add_constraint .. automethod:: add_constraint
.. automethod:: add_sector_constraints
.. automethod:: add_objective .. automethod:: add_objective
@@ -164,6 +181,7 @@ used to make them larger).
universes, or if you want more non-negligible weights in the final portfolio, universes, or if you want more non-negligible weights in the final portfolio,
increase ``gamma``. increase ``gamma``.
.. _custom-optimisation: .. _custom-optimisation:
Custom optimisation problems Custom optimisation problems

View File

@@ -8,6 +8,7 @@ evaluate return and risk for a given set of portfolio weights.
""" """
import json import json
import warnings
import numpy as np import numpy as np
import pandas as pd import pandas as pd
import cvxpy as cp import cvxpy as cp
@@ -138,7 +139,6 @@ class BaseConvexOptimizer(BaseOptimizer):
self._w = cp.Variable(n_assets) self._w = cp.Variable(n_assets)
self._objective = None self._objective = None
self._additional_objectives = [] self._additional_objectives = []
self._additional_constraints_raw = []
self._constraints = [] self._constraints = []
self._lower_bounds = None self._lower_bounds = None
self._upper_bounds = None self._upper_bounds = None
@@ -233,12 +233,45 @@ class BaseConvexOptimizer(BaseOptimizer):
""" """
if not callable(new_constraint): if not callable(new_constraint):
raise TypeError("New constraint must be provided as a lambda function") raise TypeError("New constraint must be provided as a lambda function")
# Save raw constraint (needed for e.g max_sharpe)
self._additional_constraints_raw.append(new_constraint)
# Add constraint
self._constraints.append(new_constraint(self._w)) self._constraints.append(new_constraint(self._w))
def add_sector_constraints(self, sector_mapper, sector_lower, sector_upper):
"""
Add sector constraints, e.g portfolio's exposure to tech must be less than x%::
sector_mapper = {
"GOOG": "tech",
"FB": "tech",,
"XOM": "Oil/Gas",
"RRC": "Oil/Gas",
"MA": "Financials",
"JPM": "Financials",
}
sector_lower = {"tech": 0.1} # at least 10% to tech
sector_upper = {
"tech": 0.4, # less than 40% tech
"Oil/Gas: 0.1 # less than 10% oil and gas
}
:param sector_mapper: dict that maps tickers to sectors
:type sector_mapper: {str: str} dict
:param sector_lower: lower bounds for each sector
:type sector_lower: {str: float} dict
:param sector_upper: upper bounds for each sector
:type sector_upper: {str:float} dict
"""
if np.any(self._lower_bounds < 0):
warnings.warn(
"Sector constraints may not produce reasonable results if shorts are allowed."
)
for sector in sector_upper:
is_sector = [v == sector for k, v in sector_mapper.items()]
self._constraints.append(cp.sum(self._w[is_sector]) <= sector_upper[sector])
for sector in sector_lower:
is_sector = [v == sector for k, v in sector_mapper.items()]
self._constraints.append(cp.sum(self._w[is_sector]) >= sector_lower[sector])
def convex_objective(self, custom_objective, weights_sum_to_one=True, **kwargs): def convex_objective(self, custom_objective, weights_sum_to_one=True, **kwargs):
""" """
Optimise a custom convex objective function. Constraints should be added with Optimise a custom convex objective function. Constraints should be added with

View File

@@ -185,22 +185,30 @@ class EfficientFrontier(base_optimizer.BaseConvexOptimizer):
for obj in self._additional_objectives: for obj in self._additional_objectives:
self._objective += obj self._objective += obj
# Overwrite original constraints with suitable constraints new_constraints = []
# for the transformed max_sharpe problem # Must rebuild the constraints
for constr in self._constraints:
if isinstance(constr, cp.constraints.nonpos.Inequality):
# Either the first or second item is the expression
if isinstance(
constr.args[0], cp.expressions.constants.constant.Constant
):
new_constraints.append(constr.args[1] >= constr.args[0] * k)
else:
new_constraints.append(constr.args[0] <= constr.args[1] * k)
elif isinstance(constr, cp.constraints.zero.Equality):
new_constraints.append(constr.args[0] == constr.args[1] * k)
else:
raise TypeError(
"Please check that your constraints are in a suitable format"
)
# Transformed max_sharpe convex problem:
self._constraints = [ self._constraints = [
(self.expected_returns - risk_free_rate).T * self._w == 1, (self.expected_returns - risk_free_rate).T * self._w == 1,
cp.sum(self._w) == k, cp.sum(self._w) == k,
k >= 0, k >= 0,
] ] + new_constraints
#  Rebuild original constraints with scaling factor
for raw_constr in self._additional_constraints_raw:
self._constraints.append(raw_constr(self._w / k))
# Sharpe ratio is invariant w.r.t scaled weights, so we must
# replace infinities and negative infinities
# new_lower_bound = np.nan_to_num(self._lower_bounds, neginf=-1)
# new_upper_bound = np.nan_to_num(self._upper_bounds, posinf=1)
self._constraints.append(self._w >= k * self._lower_bounds)
self._constraints.append(self._w <= k * self._upper_bounds)
self._solve_cvxpy_opt_problem() self._solve_cvxpy_opt_problem()
# Inverse-transform # Inverse-transform

View File

@@ -17,6 +17,20 @@ def test_custom_convex_equal_weights():
np.testing.assert_allclose(ef.weights, np.array([1 / 20] * 20)) np.testing.assert_allclose(ef.weights, np.array([1 / 20] * 20))
def test_custom_convex_abs_exposure():
ef = EfficientFrontier(
*setup_efficient_frontier(data_only=True), weight_bounds=(None, None)
)
ef.add_constraint(lambda x: cp.norm(x, 1) <= 2)
ef.min_volatility()
ef.convex_objective(
objective_functions.portfolio_variance,
cov_matrix=ef.cov_matrix,
weights_sum_to_one=False,
)
def test_custom_convex_min_var(): def test_custom_convex_min_var():
ef = setup_efficient_frontier() ef = setup_efficient_frontier()
ef.min_volatility() ef.min_volatility()

View File

@@ -118,7 +118,7 @@ def test_min_volatility_short():
def test_min_volatility_L2_reg(): def test_min_volatility_L2_reg():
ef = setup_efficient_frontier() ef = setup_efficient_frontier()
ef.add_objective(objective_functions.L2_reg, gamma=5) ef.add_objective(objective_functions.L2_reg, gamma=1)
weights = ef.min_volatility() weights = ef.min_volatility()
assert isinstance(weights, dict) assert isinstance(weights, dict)
assert set(weights.keys()) == set(ef.tickers) assert set(weights.keys()) == set(ef.tickers)
@@ -137,7 +137,7 @@ def test_min_volatility_L2_reg():
np.testing.assert_allclose( np.testing.assert_allclose(
ef.portfolio_performance(), ef.portfolio_performance(),
(0.2382083649754719, 0.20795460936504614, 1.049307662098637), (0.23129890623344232, 0.1955254118258614, 1.080672349748733),
) )
@@ -214,6 +214,57 @@ def test_min_volatility_cvxpy_vs_scipy():
assert cvxpy_var <= scipy_var assert cvxpy_var <= scipy_var
def test_min_volatility_sector_constraints():
sector_mapper = {
"GOOG": "tech",
"AAPL": "tech",
"FB": "tech",
"AMZN": "tech",
"BABA": "tech",
"GE": "utility",
"AMD": "tech",
"WMT": "retail",
"BAC": "fig",
"GM": "auto",
"T": "auto",
"UAA": "airline",
"SHLD": "retail",
"XOM": "energy",
"RRC": "energy",
"BBY": "retail",
"MA": "fig",
"PFE": "pharma",
"JPM": "fig",
"SBUX": "retail",
}
sector_upper = {
"tech": 0.2,
"utility": 0.1,
"retail": 0.2,
"fig": 0.4,
"airline": 0.05,
"energy": 0.2,
}
sector_lower = {"utility": 0.01, "fig": 0.02, "airline": 0.01}
# ef = setup_efficient_frontier()
ef = EfficientFrontier(
*setup_efficient_frontier(data_only=True), weight_bounds=(None, None)
)
ef.add_sector_constraints(sector_mapper, sector_lower, sector_upper)
weights = ef.min_volatility()
for sector in list(set().union(sector_upper, sector_lower)):
sector_sum = 0
for t, v in weights.items():
if sector_mapper[t] == sector:
sector_sum += v
assert sector_sum <= sector_upper.get(sector, 1) + 1e-5
assert sector_sum >= sector_lower.get(sector, 0) - 1e-5
def test_min_volatility_vs_max_sharpe(): def test_min_volatility_vs_max_sharpe():
# Test based on issue #75 # Test based on issue #75
expected_returns_daily = pd.Series( expected_returns_daily = pd.Series(
@@ -287,6 +338,19 @@ def test_max_sharpe_long_weight_bounds():
assert (0.02 <= ef.weights[1::2]).all() and (ef.weights[1::2] <= 0.11).all() assert (0.02 <= ef.weights[1::2]).all() and (ef.weights[1::2] <= 0.11).all()
def test_max_sharpe_explicit_bound():
ef = setup_efficient_frontier()
ef.add_constraint(lambda w: w[0] >= 0.2)
ef.add_constraint(lambda w: w[2] == 0.15)
ef.add_constraint(lambda w: w[3] + w[4] <= 0.10)
ef.max_sharpe()
np.testing.assert_almost_equal(ef.weights.sum(), 1)
assert ef.weights[0] >= 0.2 - 1e-5
np.testing.assert_almost_equal(ef.weights[2], 0.15)
assert ef.weights[3] + ef.weights[4] <= 0.10 + 1e-5
def test_max_sharpe_short(): def test_max_sharpe_short():
ef = EfficientFrontier( ef = EfficientFrontier(
*setup_efficient_frontier(data_only=True), weight_bounds=(None, None) *setup_efficient_frontier(data_only=True), weight_bounds=(None, None)
@@ -337,13 +401,15 @@ def test_max_sharpe_L2_reg():
def test_max_sharpe_L2_reg_many_values(): def test_max_sharpe_L2_reg_many_values():
warnings.filterwarnings("ignore")
ef = setup_efficient_frontier() ef = setup_efficient_frontier()
ef.max_sharpe() ef.max_sharpe()
# Count the number of weights more 1% # Count the number of weights more 1%
initial_number = sum(ef.weights > 0.01) initial_number = sum(ef.weights > 0.01)
for _ in range(10): for i in range(1, 20, 2):
print(initial_number) ef = setup_efficient_frontier()
ef.add_objective(objective_functions.L2_reg, gamma=0.05) ef.add_objective(objective_functions.L2_reg, gamma=0.05 * i)
ef.max_sharpe() ef.max_sharpe()
np.testing.assert_almost_equal(ef.weights.sum(), 1) np.testing.assert_almost_equal(ef.weights.sum(), 1)
new_number = sum(ef.weights > 0.01) new_number = sum(ef.weights > 0.01)
@@ -406,15 +472,239 @@ def test_max_sharpe_risk_free_rate():
ef = setup_efficient_frontier() ef = setup_efficient_frontier()
ef.max_sharpe() ef.max_sharpe()
_, _, initial_sharpe = ef.portfolio_performance() _, _, initial_sharpe = ef.portfolio_performance()
ef = setup_efficient_frontier()
ef.max_sharpe(risk_free_rate=0.10) ef.max_sharpe(risk_free_rate=0.10)
_, _, new_sharpe = ef.portfolio_performance(risk_free_rate=0.10) _, _, new_sharpe = ef.portfolio_performance(risk_free_rate=0.10)
assert new_sharpe <= initial_sharpe assert new_sharpe <= initial_sharpe
ef = setup_efficient_frontier()
ef.max_sharpe(risk_free_rate=0) ef.max_sharpe(risk_free_rate=0)
_, _, new_sharpe = ef.portfolio_performance(risk_free_rate=0) _, _, new_sharpe = ef.portfolio_performance(risk_free_rate=0)
assert new_sharpe >= initial_sharpe assert new_sharpe >= initial_sharpe
def test_min_vol_pair_constraint():
ef = setup_efficient_frontier()
ef.min_volatility()
old_sum = ef.weights[:2].sum()
ef = setup_efficient_frontier()
ef.add_constraint(lambda w: (w[1] + w[0] <= old_sum / 2))
ef.min_volatility()
new_sum = ef.weights[:2].sum()
assert new_sum <= old_sum / 2 + 1e-4
def test_max_sharpe_pair_constraint():
ef = setup_efficient_frontier()
ef.max_sharpe()
old_sum = ef.weights[:2].sum()
ef = setup_efficient_frontier()
ef.add_constraint(lambda w: (w[1] + w[0] <= old_sum / 2))
ef.max_sharpe()
new_sum = ef.weights[:2].sum()
assert new_sum <= old_sum / 2 + 1e-4
def test_max_sharpe_sector_constraints_manual():
sector_mapper = {
"GOOG": "tech",
"AAPL": "tech",
"FB": "tech",
"AMZN": "tech",
"BABA": "tech",
"GE": "utility",
"AMD": "tech",
"WMT": "retail",
"BAC": "fig",
"GM": "auto",
"T": "auto",
"UAA": "airline",
"SHLD": "retail",
"XOM": "energy",
"RRC": "energy",
"BBY": "retail",
"MA": "fig",
"PFE": "pharma",
"JPM": "fig",
"SBUX": "retail",
}
sector_upper = {
"tech": 0.2,
"utility": 0.1,
"retail": 0.2,
"fig": 0.4,
"airline": 0.05,
"energy": 0.2,
}
sector_lower = {"utility": 0.01, "fig": 0.02, "airline": 0.01}
ef = setup_efficient_frontier()
for sector in sector_upper:
is_sector = [v == sector for k, v in sector_mapper.items()]
ef.add_constraint(lambda w: cp.sum(w[is_sector]) <= sector_upper[sector])
for sector in sector_lower:
is_sector = [v == sector for k, v in sector_mapper.items()]
ef.add_constraint(lambda w: cp.sum(w[is_sector]) >= sector_lower[sector])
weights = ef.max_sharpe()
for sector in list(set().union(sector_upper, sector_lower)):
sector_sum = 0
for t, v in weights.items():
if sector_mapper[t] == sector:
sector_sum += v
assert sector_sum <= sector_upper.get(sector, 1) + 1e-5
assert sector_sum >= sector_lower.get(sector, 0) - 1e-5
def test_max_sharpe_sector_constraints_auto():
sector_mapper = {
"GOOG": "tech",
"AAPL": "tech",
"FB": "tech",
"AMZN": "tech",
"BABA": "tech",
"GE": "utility",
"AMD": "tech",
"WMT": "retail",
"BAC": "fig",
"GM": "auto",
"T": "auto",
"UAA": "airline",
"SHLD": "retail",
"XOM": "energy",
"RRC": "energy",
"BBY": "retail",
"MA": "fig",
"PFE": "pharma",
"JPM": "fig",
"SBUX": "retail",
}
sector_upper = {
"tech": 0.2,
"utility": 0.1,
"retail": 0.2,
"fig": 0.4,
"airline": 0.05,
"energy": 0.2,
}
sector_lower = {"utility": 0.01, "fig": 0.02, "airline": 0.01}
ef = setup_efficient_frontier()
ef.add_sector_constraints(sector_mapper, sector_lower, sector_upper)
weights = ef.max_sharpe()
for sector in list(set().union(sector_upper, sector_lower)):
sector_sum = 0
for t, v in weights.items():
if sector_mapper[t] == sector:
sector_sum += v
assert sector_sum <= sector_upper.get(sector, 1) + 1e-5
assert sector_sum >= sector_lower.get(sector, 0) - 1e-5
def test_efficient_risk_sector_constraints_manual():
sector_mapper = {
"GOOG": "tech",
"AAPL": "tech",
"FB": "tech",
"AMZN": "tech",
"BABA": "tech",
"GE": "utility",
"AMD": "tech",
"WMT": "retail",
"BAC": "fig",
"GM": "auto",
"T": "auto",
"UAA": "airline",
"SHLD": "retail",
"XOM": "energy",
"RRC": "energy",
"BBY": "retail",
"MA": "fig",
"PFE": "pharma",
"JPM": "fig",
"SBUX": "retail",
}
sector_upper = {
"tech": 0.2,
"utility": 0.1,
"retail": 0.2,
"fig": 0.4,
"airline": 0.05,
"energy": 0.2,
}
sector_lower = {"utility": 0.01, "fig": 0.02, "airline": 0.01}
ef = setup_efficient_frontier()
for sector in sector_upper:
is_sector = [v == sector for k, v in sector_mapper.items()]
ef.add_constraint(lambda w: cp.sum(w[is_sector]) <= sector_upper[sector])
for sector in sector_lower:
is_sector = [v == sector for k, v in sector_mapper.items()]
ef.add_constraint(lambda w: cp.sum(w[is_sector]) >= sector_lower[sector])
weights = ef.efficient_risk(0.19)
for sector in list(set().union(sector_upper, sector_lower)):
sector_sum = 0
for t, v in weights.items():
if sector_mapper[t] == sector:
sector_sum += v
assert sector_sum <= sector_upper.get(sector, 1) + 1e-5
assert sector_sum >= sector_lower.get(sector, 0) - 1e-5
def test_efficient_risk_sector_constraints_auto():
sector_mapper = {
"GOOG": "tech",
"AAPL": "tech",
"FB": "tech",
"AMZN": "tech",
"BABA": "tech",
"GE": "utility",
"AMD": "tech",
"WMT": "retail",
"BAC": "fig",
"GM": "auto",
"T": "auto",
"UAA": "airline",
"SHLD": "retail",
"XOM": "energy",
"RRC": "energy",
"BBY": "retail",
"MA": "fig",
"PFE": "pharma",
"JPM": "fig",
"SBUX": "retail",
}
sector_upper = {
"tech": 0.2,
"utility": 0.1,
"retail": 0.2,
"fig": 0.4,
"airline": 0.05,
"energy": 0.2,
}
sector_lower = {"utility": 0.01, "fig": 0.02, "airline": 0.01}
ef = setup_efficient_frontier()
ef.add_sector_constraints(sector_mapper, sector_lower, sector_upper)
weights = ef.efficient_risk(0.19)
for sector in list(set().union(sector_upper, sector_lower)):
sector_sum = 0
for t, v in weights.items():
if sector_mapper[t] == sector:
sector_sum += v
assert sector_sum <= sector_upper.get(sector, 1) + 1e-5
assert sector_sum >= sector_lower.get(sector, 0) - 1e-5
def test_max_quadratic_utility(): def test_max_quadratic_utility():
ef = setup_efficient_frontier() ef = setup_efficient_frontier()
w = ef.max_quadratic_utility(risk_aversion=2) w = ef.max_quadratic_utility(risk_aversion=2)
@@ -536,7 +826,6 @@ def test_efficient_risk_many_values():
ef.efficient_risk(target_risk) ef.efficient_risk(target_risk)
np.testing.assert_almost_equal(ef.weights.sum(), 1) np.testing.assert_almost_equal(ef.weights.sum(), 1)
volatility = ef.portfolio_performance()[1] volatility = ef.portfolio_performance()[1]
print(volatility)
assert abs(target_risk - volatility) < 1e-5 assert abs(target_risk - volatility) < 1e-5