refactor base_optimizer structure

This commit is contained in:
Dingyuan Wang
2019-01-26 00:19:44 +08:00
parent d33666e37f
commit 3c4dad8517
9 changed files with 201 additions and 83 deletions

View File

@@ -1,22 +1,67 @@
# TODO module docstring
import numpy as np
import pandas as pd
from . import objective_functions
class BaseOptimizer:
    """
    Base class that stores the asset labels and the resulting weights.

    Instance variables:

    - ``n_assets`` - number of assets
    - ``tickers`` - asset labels, in the same order as the weights
    - ``weights`` - ``None`` until weights have been set

    Public methods:

    - ``set_weights()``
    - ``clean_weights()``
    """

    def __init__(self, n_assets, tickers=None):
        """
        :param n_assets: number of assets
        :type n_assets: int
        :param tickers: name of assets, defaults to ``0, 1, ..., n_assets - 1``
        :type tickers: list, optional
        """
        self.n_assets = n_assets
        if tickers is None:
            self.tickers = list(range(n_assets))
        else:
            self.tickers = tickers
        # Outputs
        self.weights = None

    def set_weights(self, weights):
        """
        Store weights supplied as a mapping keyed by ticker.

        Tickers that are absent from ``weights`` keep their current value
        (zero on the first call); keys that are not in ``self.tickers`` are
        ignored.

        :param weights: mapping of ticker to weight
        :type weights: dict
        """
        if self.weights is None:
            self.weights = [0] * self.n_assets
        for i, ticker in enumerate(self.tickers):
            if ticker in weights:
                self.weights[i] = weights[ticker]

    def clean_weights(self, cutoff=1e-4, rounding=5):
        """
        Helper method to clean the raw weights, setting any weights whose absolute
        values are below the cutoff to zero, and rounding the rest.

        :param cutoff: the lower bound, defaults to 1e-4
        :type cutoff: float, optional
        :param rounding: number of decimal places to round the weights, defaults to 5.
                         Set to None if rounding is not desired.
        :type rounding: int, optional
        :raises AttributeError: if the weights have not been set yet
        :raises ValueError: if ``rounding`` is neither None nor a positive integer
        :return: asset weights
        :rtype: dict
        """
        if self.weights is None:
            raise AttributeError("Weights not yet computed")
        # None is the documented way to skip rounding, so it must not be
        # rejected by the integer check.
        if rounding is not None:
            if not isinstance(rounding, int) or rounding < 1:
                raise ValueError("rounding must be a positive integer")
        # np.array makes a copy (self.weights is left untouched) and also
        # accepts the plain list that set_weights builds, which does not
        # support boolean-mask assignment.
        clean_weights = np.array(self.weights, dtype=float)
        clean_weights[np.abs(clean_weights) < cutoff] = 0
        if rounding is not None:
            clean_weights = np.round(clean_weights, rounding)
        return dict(zip(self.tickers, clean_weights))
class BaseScipyOptimizer(BaseOptimizer):
def __init__(self, n_assets, tickers=None, weight_bounds=(0, 1)):
    """
    :param n_assets: number of assets
    :type n_assets: int
    :param tickers: name of assets
    :type tickers: list, optional
    :param weight_bounds: minimum and maximum weight of an asset, defaults to (0, 1).
                          Must be changed to (-1, 1) for portfolios with shorting.
    :type weight_bounds: tuple, optional
    """
    # The parent constructor already stores n_assets and tickers and resets
    # weights to None, so none of that is repeated here.
    super().__init__(n_assets, tickers)
    self.bounds = self._make_valid_bounds(weight_bounds)
    # Optimisation parameters
    # Start from the equal-weight portfolio.
    self.initial_guess = np.array([1 / self.n_assets] * self.n_assets)
    # Equality constraint: the weights must sum to one.
    self.constraints = [{"type": "eq", "fun": lambda x: np.sum(x) - 1}]
def _make_valid_bounds(self, test_bounds):
"""
@@ -39,23 +84,54 @@ class BaseOptimizer:
raise ValueError("Lower bound is too high")
return (test_bounds,) * self.n_assets
def clean_weights(self, cutoff=1e-4, rounding=5):
    """
    Helper method to clean the raw weights, setting any weights whose absolute
    values are below the cutoff to zero, and rounding the rest.

    :param cutoff: the lower bound, defaults to 1e-4
    :type cutoff: float, optional
    :param rounding: number of decimal places to round the weights, defaults to 5.
                     Set to None if rounding is not desired.
    :type rounding: int, optional
    :raises AttributeError: if the weights have not been set yet
    :raises ValueError: if ``rounding`` is neither None nor a positive integer
    :return: asset weights
    :rtype: dict
    """
    if self.weights is None:
        raise AttributeError("Weights not yet computed")
    # None is the documented way to skip rounding, so it must not be
    # rejected by the integer check.
    if rounding is not None:
        if not isinstance(rounding, int) or rounding < 1:
            raise ValueError("rounding must be a positive integer")
    # np.array makes a copy (self.weights is left untouched) and also accepts
    # a plain list, which does not support boolean-mask assignment.
    clean_weights = np.array(self.weights, dtype=float)
    clean_weights[np.abs(clean_weights) < cutoff] = 0
    if rounding is not None:
        clean_weights = np.round(clean_weights, rounding)
    return dict(zip(self.tickers, clean_weights))
def portfolio_performance(
    expected_returns, cov_matrix, weights, verbose=False, risk_free_rate=0.02
):
    """
    After optimising, calculate (and optionally print) the performance of the optimal
    portfolio. Currently calculates expected return, volatility, and the Sharpe ratio.

    :param expected_returns: expected returns for each asset. Set to None if
                             optimising for volatility only.
    :type expected_returns: pd.Series, list, np.ndarray
    :param cov_matrix: covariance of returns for each asset
    :type cov_matrix: pd.DataFrame or np.array
    :param weights: weights of assets, either positional or keyed by ticker
    :type weights: list, np.array or dict
    :param verbose: whether performance should be printed, defaults to False
    :type verbose: bool, optional
    :param risk_free_rate: risk-free rate of borrowing/lending, defaults to 0.02
    :type risk_free_rate: float, optional
    :raises ValueError: if ``weights`` is None
    :raises ValueError: if ``weights`` is a dict whose matched entries add to zero
                        (e.g. because no ticker name matches)
    :return: expected return, volatility, Sharpe ratio. When ``expected_returns``
             is None, the expected return and Sharpe ratio are returned as None.
    :rtype: (float, float, float)
    """
    if isinstance(weights, dict):
        # Work out the asset order so the dict can be turned into a vector.
        if isinstance(expected_returns, pd.Series):
            tickers = list(expected_returns.index)
        elif isinstance(cov_matrix, pd.DataFrame):
            tickers = list(cov_matrix.columns)
        else:
            # The covariance matrix has one row per asset and, unlike
            # expected_returns, is never None.
            tickers = list(range(len(cov_matrix)))
        newweights = np.zeros(len(tickers))
        for i, k in enumerate(tickers):
            if k in weights:
                newweights[i] = weights[k]
        if newweights.sum() == 0:
            raise ValueError("Weights add to zero, or ticker names don't match")
    elif weights is not None:
        newweights = np.asarray(weights)
    else:
        raise ValueError("Weights is None")

    sigma = np.sqrt(objective_functions.volatility(newweights, cov_matrix))

    if expected_returns is None:
        # Volatility-only use: return and Sharpe ratio are undefined.
        if verbose:
            print("Annual volatility: {:.1f}%".format(100 * sigma))
        return None, sigma, None

    mu = newweights.dot(expected_returns)
    # negative_sharpe is called elsewhere with the positional order
    # (expected_returns, cov_matrix, gamma, risk_free_rate), so gamma has to be
    # filled explicitly; otherwise risk_free_rate would land in the gamma slot.
    sharpe = -objective_functions.negative_sharpe(
        newweights, expected_returns, cov_matrix, 0, risk_free_rate
    )
    if verbose:
        print("Expected annual return: {:.1f}%".format(100 * mu))
        print("Annual volatility: {:.1f}%".format(100 * sigma))
        print("Sharpe Ratio: {:.2f}".format(sharpe))
    return mu, sigma, sharpe

View File

@@ -40,7 +40,7 @@ def portfolio(weights, latest_prices, min_allocation=0.01, total_portfolio_value
:type total_portfolio_value: int/float, optional
:raises TypeError: if ``weights`` is not a dict
:raises TypeError: if ``latest_prices`` isn't a series
:raises ValueError: if ``0 < min_allocation < 0.3``
:raises ValueError: if not ``0 < min_allocation < 0.3``
:return: the number of shares of each ticker that should be purchased, along with the amount
of funds leftover.
:rtype: (dict, float)

View File

@@ -7,14 +7,13 @@ import warnings
import numpy as np
import pandas as pd
import scipy.optimize as sco
from . import objective_functions
from .base_optimizer import BaseOptimizer
from . import objective_functions, base_optimizer
class EfficientFrontier(BaseOptimizer):
class EfficientFrontier(base_optimizer.BaseScipyOptimizer):
"""
An EfficientFrontier object (inheriting from BaseOptimizer) contains multiple
An EfficientFrontier object (inheriting from BaseScipyOptimizer) contains multiple
optimisation methods that can be called (corresponding to different objective
functions) with various parameters.
@@ -65,28 +64,25 @@ class EfficientFrontier(BaseOptimizer):
self.cov_matrix = cov_matrix
if expected_returns is not None:
if not isinstance(expected_returns, (pd.Series, list, np.ndarray)):
raise TypeError(
"expected_returns is not a series, list or array")
raise TypeError("expected_returns is not a series, list or array")
if not isinstance(cov_matrix, (pd.DataFrame, np.ndarray)):
raise TypeError("cov_matrix is not a dataframe or array")
self.expected_returns = expected_returns
self.tickers = list(expected_returns.index)
if isinstance(expected_returns, pd.Series):
tickers = list(expected_returns.index)
elif isinstance(cov_matrix, pd.DataFrame):
tickers = list(cov_matrix.columns)
else:
self.tickers = list(cov_matrix.columns)
self.n_assets = len(self.tickers)
tickers = list(range(len(expected_returns)))
super().__init__(self.n_assets, weight_bounds)
super().__init__(len(tickers), tickers, weight_bounds)
if not isinstance(gamma, (int, float)):
raise ValueError("gamma should be numeric")
if gamma < 0:
warnings.warn(
"in most cases, gamma should be positive", UserWarning)
warnings.warn("in most cases, gamma should be positive", UserWarning)
self.gamma = gamma
# Outputs
self.weights = None
def max_sharpe(self, risk_free_rate=0.02):
"""
Maximise the Sharpe Ratio. The result is also referred to as the tangency portfolio,
@@ -102,8 +98,7 @@ class EfficientFrontier(BaseOptimizer):
if not isinstance(risk_free_rate, (int, float)):
raise ValueError("risk_free_rate should be numeric")
args = (self.expected_returns, self.cov_matrix,
self.gamma, risk_free_rate)
args = (self.expected_returns, self.cov_matrix, self.gamma, risk_free_rate)
result = sco.minimize(
objective_functions.negative_sharpe,
x0=self.initial_guess,
@@ -178,8 +173,7 @@ class EfficientFrontier(BaseOptimizer):
if not isinstance(risk_free_rate, (int, float)):
raise ValueError("risk_free_rate should be numeric")
args = (self.expected_returns, self.cov_matrix,
self.gamma, risk_free_rate)
args = (self.expected_returns, self.cov_matrix, self.gamma, risk_free_rate)
target_constraint = {
"type": "ineq",
"fun": lambda w: target_risk
@@ -273,17 +267,10 @@ class EfficientFrontier(BaseOptimizer):
:return: expected return, volatility, Sharpe ratio.
:rtype: (float, float, float)
"""
if self.weights is None:
raise ValueError("Weights not calculated yet")
sigma = np.sqrt(objective_functions.volatility(
self.weights, self.cov_matrix))
mu = self.weights.dot(self.expected_returns)
sharpe = -objective_functions.negative_sharpe(
self.weights, self.expected_returns, self.cov_matrix, risk_free_rate
return base_optimizer.portfolio_performance(
self.expected_returns,
self.cov_matrix,
self.weights,
verbose,
risk_free_rate,
)
if verbose:
print("Expected annual return: {:.1f}%".format(100 * mu))
print("Annual volatility: {:.1f}%".format(100 * sigma))
print("Sharpe Ratio: {:.2f}".format(sharpe))
return mu, sigma, sharpe

View File

@@ -18,6 +18,17 @@ import warnings
import pandas as pd
def daily_price_returns(prices):
    """
    Turn a table of asset prices into row-over-row percentage returns.

    :param prices: adjusted closing prices of the asset, each row is a date
                   and each column is a ticker/id.
    :type prices: pd.DataFrame
    :return: percentage change between consecutive rows, without the rows in
             which every column is NaN
    :rtype: pd.DataFrame
    """
    pct_changes = prices.pct_change()
    # how="all" only removes rows that are NaN in every column (such as the
    # first row, which has no predecessor).
    return pct_changes.dropna(how="all")
def mean_historical_return(prices, frequency=252):
"""
Calculate annualised mean (daily) historical return from input (daily) asset prices.
@@ -34,7 +45,7 @@ def mean_historical_return(prices, frequency=252):
if not isinstance(prices, pd.DataFrame):
warnings.warn("prices are not in a dataframe", RuntimeWarning)
prices = pd.DataFrame(prices)
daily_returns = prices.pct_change().dropna(how="all")
daily_returns = daily_price_returns(prices)
return daily_returns.mean() * frequency
@@ -57,5 +68,5 @@ def ema_historical_return(prices, frequency=252, span=500):
if not isinstance(prices, pd.DataFrame):
warnings.warn("prices are not in a dataframe", RuntimeWarning)
prices = pd.DataFrame(prices)
daily_returns = prices.pct_change().dropna(how="all")
daily_returns = daily_price_returns(prices)
return daily_returns.ewm(span=span).mean().iloc[-1] * frequency

View File

@@ -6,6 +6,7 @@ import numpy as np
import pandas as pd
import scipy.cluster.hierarchy as sch
import scipy.spatial.distance as ssd
from .base_optimizer import BaseOptimizer
# This code has been reproduced(with modification) from the paper:
# López de Prado, M. (2016). Building Diversified Portfolios that Outperform Out of Sample.
@@ -66,6 +67,52 @@ def _raw_hrp_allocation(cov, ordered_tickers):
return w
class HRPOpt(BaseOptimizer):
    """
    Optimizer (a BaseOptimizer subclass) that builds a hierarchical risk
    parity portfolio from historical asset returns.

    Instance variables:

    - Inputs

        - ``returns``

    - Output: ``weights``

    Public methods:

    - ``hrp_portfolio()``
    """

    def __init__(self, returns):
        """
        :param returns: asset historical returns
        :type returns: pd.DataFrame
        :raises TypeError: if ``returns`` is not a dataframe
        """
        if not isinstance(returns, pd.DataFrame):
            raise TypeError("returns are not a dataframe")

        self.returns = returns
        # One asset per column; the column labels double as the tickers.
        asset_names = list(returns.columns)
        super().__init__(len(asset_names), asset_names)

    def hrp_portfolio(self):
        """
        Construct a hierarchical risk parity portfolio and store it on the
        instance through ``set_weights``.

        :return: weights for the HRP portfolio, keyed by ticker
        :rtype: dict
        """
        corr = self.returns.corr()
        cov = self.returns.cov()

        # Compute distance matrix, with ClusterWarning fix as
        # per https://stackoverflow.com/questions/18952587/
        distances = ssd.squareform(((1 - corr) / 2) ** 0.5)

        # Single-linkage clustering on the condensed distances.
        linkage_matrix = sch.linkage(distances, "single")
        leaf_order = _get_quasi_diag(linkage_matrix)
        ordered_tickers = corr.index[leaf_order].tolist()

        allocation = _raw_hrp_allocation(cov, ordered_tickers)
        weights = dict(allocation.sort_index())
        self.set_weights(weights)
        return weights
def hrp_portfolio(returns):
"""
Construct a hierarchical risk parity portfolio
@@ -76,16 +123,4 @@ def hrp_portfolio(returns):
:rtype: dict
:raises TypeError: if ``returns`` is not a dataframe
"""
if not isinstance(returns, pd.DataFrame):
raise TypeError("returns are not a dataframe")
corr, cov = returns.corr(), returns.cov()
# Compute distance matrix, with ClusterWarning fix as
# per https://stackoverflow.com/questions/18952587/
dist = ssd.squareform(((1 - corr) / 2) ** 0.5)
link = sch.linkage(dist, "single")
sort_ix = _get_quasi_diag(link)
ordered_tickers = corr.index[sort_ix].tolist()
hrp = _raw_hrp_allocation(cov, ordered_tickers)
return dict(hrp.sort_index())
return HRPOpt(returns).hrp_portfolio()

View File

@@ -22,6 +22,7 @@ import warnings
import numpy as np
import pandas as pd
from sklearn import covariance
from .expected_returns import daily_price_returns
def sample_cov(prices, frequency=252):
@@ -40,7 +41,7 @@ def sample_cov(prices, frequency=252):
if not isinstance(prices, pd.DataFrame):
warnings.warn("prices are not in a dataframe", RuntimeWarning)
prices = pd.DataFrame(prices)
daily_returns = prices.pct_change().dropna(how="all")
daily_returns = daily_price_returns(prices)
return daily_returns.cov() * frequency
@@ -65,7 +66,7 @@ def semicovariance(prices, benchmark=0, frequency=252):
if not isinstance(prices, pd.DataFrame):
warnings.warn("prices are not in a dataframe", RuntimeWarning)
prices = pd.DataFrame(prices)
daily_returns = prices.pct_change().dropna(how="all")
daily_returns = daily_price_returns(prices)
drops = np.fmin(daily_returns - benchmark, 0)
return drops.cov() * frequency
@@ -110,7 +111,7 @@ def exp_cov(prices, span=180, frequency=252):
warnings.warn("prices are not in a dataframe", RuntimeWarning)
prices = pd.DataFrame(prices)
assets = prices.columns
daily_returns = prices.pct_change().dropna(how="all")
daily_returns = daily_price_returns(prices)
N = len(assets)
# Loop over matrix, filling entries with the pairwise exp cov
@@ -192,8 +193,7 @@ class CovarianceShrinkage:
"""
assets = self.X.columns
return (
pd.DataFrame(raw_cov_array, index=assets,
columns=assets) * self.frequency
pd.DataFrame(raw_cov_array, index=assets, columns=assets) * self.frequency
)
def shrunk_covariance(self, delta=0.2):

View File

@@ -4,15 +4,15 @@ value-at-risk (CVaR) objective, which requires Monte Carlo simulation.
"""
import pandas as pd
from .base_optimizer import BaseOptimizer
from .base_optimizer import BaseScipyOptimizer
from . import objective_functions
import noisyopt
class CVAROpt(BaseOptimizer):
class CVAROpt(BaseScipyOptimizer):
"""
A CVAROpt object (inheriting from BaseOptimizer) provides a method for
A CVAROpt object (inheriting from BaseScipyOptimizer) provides a method for
optimising the CVaR (a.k.a expected shortfall) of a portfolio.
Instance variables:
@@ -48,8 +48,8 @@ class CVAROpt(BaseOptimizer):
if not isinstance(returns, pd.DataFrame):
raise TypeError("returns are not a dataframe")
self.returns = returns
self.tickers = returns.columns
super().__init__(returns.shape[1], weight_bounds) # bounds
tickers = returns.columns
super().__init__(len(tickers), tickers, weight_bounds)
def min_cvar(self, s=10000, beta=0.95, random_state=None):
"""

View File

@@ -58,8 +58,7 @@ def test_clean_weights():
assert clean_number_tiny_weights == number_tiny_weights
#  Check rounding
cleaned_weights_str_length = [len(str(i)) for i in cleaned_weights]
assert all([length == 7 or length ==
3 for length in cleaned_weights_str_length])
assert all([length == 7 or length == 3 for length in cleaned_weights_str_length])
def test_clean_weights_short():

View File

@@ -5,6 +5,16 @@ from pypfopt import expected_returns
from tests.utilities_for_tests import get_data
def test_returns_dataframe():
    """
    Check that daily_price_returns yields a date-indexed DataFrame of the
    expected shape for the data returned by get_data.
    """
    df = get_data()
    returns_df = expected_returns.daily_price_returns(df)
    assert isinstance(returns_df, pd.DataFrame)
    # Expected dimensions of the returns computed from the get_data fixture.
    assert returns_df.shape[1] == 20
    assert len(returns_df) == 7125
    # Index.is_all_dates has been deprecated and removed from recent pandas;
    # inferred_type gives the same "is this a date index" answer on old and
    # new versions alike.
    assert returns_df.index.inferred_type in ("datetime", "datetime64")
    # No single-row return may exceed +100% (NaNs are excluded explicitly).
    assert not ((returns_df > 1) & returns_df.notnull()).any().any()
def test_mean_historical_returns_dummy():
data = pd.DataFrame(
[