Add parametrization of quadratic utility

This commit is contained in:
phschiele
2021-05-18 20:22:03 +02:00
parent 65c790057b
commit 538087eb61
4 changed files with 53 additions and 30 deletions

View File

@@ -7,13 +7,16 @@ Additionally, we define a general utility function ``portfolio_performance`` to
evaluate return and risk for a given set of portfolio weights.
"""
import collections
import copy
import json
import warnings
from collections.abc import Iterable
from typing import List
import numpy as np
import pandas as pd
import cvxpy as cp
import scipy.optimize as sco
from . import objective_functions
from . import exceptions
@@ -223,23 +226,25 @@ class BaseConvexOptimizer(BaseOptimizer):
def is_parameter_defined(self, parameter_name: str) -> bool:
is_defined = False
for const in self._constraints:
for arg in const.args:
if isinstance(arg, cp.Parameter):
if arg.name() == parameter_name and not is_defined:
objective_and_constraints = self._constraints + [self._objective] if self._objective is not None else self._constraints
for expr in objective_and_constraints:
params = [arg for arg in get_all_args(expr) if isinstance(arg, cp.Parameter)]
for param in params:
if param.name() == parameter_name and not is_defined:
is_defined = True
elif arg.name() == parameter_name and is_defined:
elif param.name() == parameter_name and is_defined:
raise Exception('Parameter name defined multiple times')
return is_defined
def update_parameter_value(self, parameter_name: str, new_value: float) -> None:
assert self.is_parameter_defined(parameter_name)
was_updated = False
for const in self._constraints:
for arg in const.args:
if isinstance(arg, cp.Parameter):
if arg.name() == parameter_name:
arg.value = new_value
objective_and_constraints = self._constraints + [self._objective] if self._objective is not None else self._constraints
for expr in objective_and_constraints:
params = [arg for arg in get_all_args(expr) if isinstance(arg, cp.Parameter)]
for param in params:
if param.name() == parameter_name:
param.value = new_value
was_updated = True
assert was_updated
@@ -526,3 +531,18 @@ def portfolio_performance(
if verbose:
print("Annual volatility: {:.1f}%".format(100 * sigma))
return None, sigma, None
def get_all_args(expression: cp.Expression) -> List[cp.Expression]:
if expression.args == []:
return [expression]
else:
return list(flatten([get_all_args(arg) for arg in expression.args]))
def flatten(l: Iterable) -> Iterable:
for el in l:
if isinstance(el, Iterable) and not isinstance(el, (str, bytes)):
yield from flatten(el)
else:
yield el

View File

@@ -299,6 +299,10 @@ class EfficientFrontier(base_optimizer.BaseConvexOptimizer):
if risk_aversion <= 0:
raise ValueError("risk aversion coefficient must be greater than zero")
update_existing_parameter = self.is_parameter_defined('risk_aversion')
if update_existing_parameter:
self.update_parameter_value('risk_aversion', risk_aversion)
else:
self._objective = objective_functions.quadratic_utility(
self._w, self.expected_returns, self.cov_matrix, risk_aversion=risk_aversion
)
@@ -376,9 +380,9 @@ class EfficientFrontier(base_optimizer.BaseConvexOptimizer):
"target_return must be lower than the maximum possible return"
)
update_existing_parameter = self.is_parameter_defined('target_return_par')
update_existing_parameter = self.is_parameter_defined('target_return')
if update_existing_parameter:
self.update_parameter_value('target_return_par', target_return)
self.update_parameter_value('target_return', target_return)
else:
self._objective = objective_functions.portfolio_variance(
self._w, self.cov_matrix
@@ -390,7 +394,7 @@ class EfficientFrontier(base_optimizer.BaseConvexOptimizer):
for obj in self._additional_objectives:
self._objective += obj
target_return_par = cp.Parameter(name='target_return_par', value=target_return)
target_return_par = cp.Parameter(name='target_return', value=target_return)
self._constraints.append(ret >= target_return_par)
self._make_weight_sum_constraint(market_neutral)
return self._solve_cvxpy_opt_problem()

View File

@@ -158,7 +158,8 @@ def quadratic_utility(w, expected_returns, cov_matrix, risk_aversion, negative=T
mu = w @ expected_returns
variance = cp.quad_form(w, cov_matrix)
utility = mu - 0.5 * risk_aversion * variance
risk_aversion_par = cp.Parameter(value=risk_aversion, name='risk_aversion', nonneg=True)
utility = mu - 0.5 * risk_aversion_par * variance
return _objective_value(w, sign * utility)

View File

@@ -168,15 +168,13 @@ def _plot_ef(ef, ef_param, ef_param_range, ax, show_assets):
# Create a portfolio for each value of ef_param_range
for param_value in ef_param_range:
ef_i = copy.deepcopy(ef)
try:
if ef_param == "utility":
ef_i.max_quadratic_utility(param_value)
ef.max_quadratic_utility(param_value)
elif ef_param == "risk":
ef_i.efficient_risk(param_value)
ef.efficient_risk(param_value)
elif ef_param == "return":
ef_i.efficient_return(param_value)
ef.efficient_return(param_value)
else:
raise NotImplementedError(
"ef_param should be one of {'utility', 'risk', 'return'}"
@@ -184,7 +182,7 @@ def _plot_ef(ef, ef_param, ef_param_range, ax, show_assets):
except exceptions.OptimizationError:
continue
ret, sigma, _ = ef_i.portfolio_performance()
ret, sigma, _ = ef.portfolio_performance()
mus.append(ret)
sigmas.append(sigma)