Update data module

Thomas
2021-07-22 11:07:25 +02:00
parent d4b58bf158
commit 08337ec8fa
5 changed files with 248 additions and 46 deletions

View File

@@ -0,0 +1,125 @@
from typing import List
from typing import Optional

import numpy as np
import xarray as xr

import portfolio_management.paths as p
import portfolio_management.data.constants as c
from portfolio_management.io_utilities import pickle_dump
from portfolio_management.data.bases import Interval
from portfolio_management.data.retrieve import get_dataframe
from portfolio_management.data.retrieve import get_symbol_list
from portfolio_management.data.utilities import session_scope
from portfolio_management.data.utilities import get_sessionmaker
from portfolio_management.data.preprocessing import Preprocessing


# todo: edit the data structure as well as the folder structure, they are not coherent yet
class DatasetManager:

    def __init__(
            self,
            train_database_name: str,
            test_database_name: Optional[str] = None,
            dataset_folder_path: Optional[str] = None,
            database_folder_path: Optional[str] = None,
            interval: Optional[str] = None,
            symbol_list: Optional[List[str]] = None,
            echo: bool = False,
            float_32: bool = True,
    ):
        self.databases_folder_path = p.get_databases_folder_path(database_folder_path)
        self.datasets_folder_path = p.get_datasets_folder_path(dataset_folder_path)
        self.train_database_name = train_database_name
        self.test_database_name = test_database_name
        self.echo = echo
        self.float_32 = float_32

        with session_scope(
                get_sessionmaker(str(self.databases_folder_path), self.train_database_name, echo),
                expire_on_commit=False,
        ) as session:
            self.symbol_list = symbol_list or get_symbol_list(session=session)  # noqa
            self.interval = interval or session.query(Interval.value).first()[0]

    def run(self, preprocessing: Preprocessing, pickle: bool = True, float_32: bool = True):
        train_dataset = self.get_dataset(
            self.train_database_name, preprocessing, pickle, is_test=False, float_32=float_32,
        )
        test_dataset = self.get_dataset(
            self.test_database_name, preprocessing, pickle, is_test=True, float_32=float_32,
        )
        return train_dataset, test_dataset

    def get_dataset(
            self,
            database_name: str,
            preprocessing: Preprocessing,
            pickle: bool,
            is_test: bool = False,
            float_32: bool = True,
    ):
        dataframe_dict = {}
        open_time_array_dict = {}
        close_time_array_dict = {}
        properties_array_dict = {}
        for symbol in self.symbol_list:
            dataframe = get_dataframe(
                folder_path=str(self.databases_folder_path),
                database_name=database_name,
                symbol=symbol,
                interval=self.interval,
            )
            dataframe_dict[symbol] = dataframe
            open_time_array_dict[symbol] = dataframe[c.OPEN_TIME]
            close_time_array_dict[symbol] = dataframe[c.CLOSE_TIME]
            properties_array_dict[symbol] = dataframe[c.PROPERTY_LIST]

        preprocessed_array_dict = {}  # todo: it is not really an array
        if preprocessing is not None:
            preprocessed_array_dict = preprocessing(dataframe_dict, is_test=is_test)

        dtype = 'float32' if float_32 else 'float64'
        properties_array = np.stack([properties_array_dict[symbol] for symbol in self.symbol_list]).astype(dtype)
        open_time_array = np.stack([open_time_array_dict[symbol] for symbol in self.symbol_list])
        close_time_array = np.stack([close_time_array_dict[symbol] for symbol in self.symbol_list])
        indexes = np.arange(open_time_array.shape[1])

        data_preprocessing = {}
        coords_preprocessing = {}
        if preprocessing is not None:
            # expose the preprocessing feature names as a coordinate
            coords = list(list(preprocessed_array_dict.values())[0].columns)
            coords_preprocessing = {c.PREPROCESSING_PROPERTY: coords}
            preprocessed_data = np.stack([preprocessed_array_dict[symbol] for symbol in self.symbol_list])
            data_preprocessing = {
                c.DATA_PREPROCESSED: ([c.SYMBOL, c.INDEX, c.PREPROCESSING_PROPERTY], preprocessed_data)
            }

        dataset = xr.Dataset(
            {
                c.DATA: ([c.SYMBOL, c.INDEX, c.PROPERTY], properties_array),
                c.CLOSE_TIME: ([c.SYMBOL, c.INDEX], close_time_array),  # todo: put only one time in the dataset
                c.OPEN_TIME: ([c.SYMBOL, c.INDEX], open_time_array),
                **data_preprocessing,
            },
            coords={
                c.SYMBOL: self.symbol_list,
                c.PROPERTY: c.PROPERTY_LIST,
                c.INDEX: indexes,
                **coords_preprocessing,
            },
            attrs={
                c.INTERVAL: self.interval,
            },
        )
        dataset = dataset.isel({c.INDEX: slice(0, -2)})  # drop the last two time steps

        if pickle:
            path_dataset = self.datasets_folder_path.joinpath(database_name).with_suffix('.pkl')
            pickle_dump(path_dataset, dataset)
        return dataset
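
For orientation, a minimal usage sketch of the new DatasetManager; the module path, database names and interval below are illustrative assumptions, not part of this commit:

# Hypothetical usage; module path and all concrete names are assumptions.
from portfolio_management.data.dataset_manager import DatasetManager  # assumed file location
from portfolio_management.data.preprocessing import PCAPreprocessing

manager = DatasetManager(
    train_database_name='train_db',  # assumed name
    test_database_name='test_db',    # assumed name
    interval='1h',                   # if None, read back from the train database
)
preprocessing = PCAPreprocessing(pca_num_components=5)

# Builds one xarray.Dataset per database, indexed by (symbol, index, property),
# pickles each into the datasets folder, and returns (train, test).
train_dataset, test_dataset = manager.run(preprocessing, pickle=True)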

View File

@@ -1,3 +1,4 @@
+# todo: this module should probably be renamed to database
 from typing import List
 from typing import Optional

View File

@@ -1,49 +1,121 @@
+from typing import Dict
+from typing import Optional
+from typing import Tuple
+from abc import ABC
+from abc import abstractmethod
 import pandas as pd
 import numpy as np
-from typing import Optional
+from sklearn.decomposition import PCA
+from sklearn.preprocessing import StandardScaler
-def get_preprocessing_function(
-        df: pd.DataFrame,
-        relative_change: bool = True,
-        relative_to: bool = True
-):
-    columns = []
-    if relative_change:
-        def get_relative_change(value):
-            return 100 * (value.shift(1) - 1) / value
-        df['r_open'] = get_relative_change(df['open'])
-        df['r_close'] = get_relative_change(df['close'])
-        df['r_high'] = get_relative_change(df['high'])
-        df['r_low'] = get_relative_change(df['low'])
-        r_columns = ['r_open', 'r_close', 'r_high', 'r_low']
-        columns += r_columns
-    if relative_to:
-        def get_relative_to(value, reference):
-            return 100 * (value - 1) / reference
-        df['rto_close'] = get_relative_to(df['close'], df['open'])
-        df['rto_low'] = get_relative_to(df['low'], df['open'])
-        df['rto_high'] = get_relative_to(df['high'], df['open'])
-        rto_columns = ['rto_close', 'rto_low', 'rto_high']
-        columns += rto_columns
-    return df[columns]
+class Preprocessing(ABC):
+    @abstractmethod
+    def __call__(
+            self,
+            dataframe_dict: Dict[str, pd.DataFrame],
+            is_test: bool
+    ) -> Dict[str, pd.DataFrame]:
+        pass
+
+
+class PCAPreprocessing(Preprocessing):
+    def __init__(
+            self,
+            epsilon: float = 10e-9,
+            pca_num_components: int = 5,
+    ):
+        super(PCAPreprocessing, self).__init__()
+        self.epsilon = epsilon
+        self.pca_num_components = pca_num_components
+        self.is_trained = False
+        self.pca = None
+        self.scaler_dict = {}
+
+    def __call__(
+            self,
+            dataframe_dict: Dict[str, pd.DataFrame],
+            is_test: bool
+    ) -> Dict[str, pd.DataFrame]:
+        if is_test and not self.is_trained:
+            raise ValueError
+        features_scaled_dict = {}
+        for symbol, dataframe in dataframe_dict.items():
+            features = self.get_features(dataframe)
+            features_scaled, scaler = self.scale(features, symbol)
+            features_scaled_dict[symbol] = features_scaled
+            if not is_test:
+                self.scaler_dict[symbol] = scaler
+        preprocessed_dataframe_dict = self.apply_pca(features_scaled_dict, is_test)
+        if not is_test:
+            self.is_trained = True
+        return preprocessed_dataframe_dict
+
+    def scale(self, features: np.array, symbol: str) -> Tuple[np.array, StandardScaler]:
+        if symbol not in self.scaler_dict.keys():
+            scaler = StandardScaler()
+            scaler.fit(features)
+        else:
+            scaler = self.scaler_dict[symbol]
+        scaled_features = scaler.transform(features)
+        return scaled_features, scaler
+
+    def apply_pca(self, features_dict: Dict[str, pd.DataFrame], is_test: bool) -> Dict[str, pd.DataFrame]:
+        if not is_test:
+            self.pca = PCA(n_components=self.pca_num_components)
+            features = np.concatenate(list(features_dict.values()))
+            self.pca.fit(features)
+        dataframe_dict = {}
+        columns = [f'pca_{i}' for i in range(self.pca_num_components)]
+        for symbol, features in features_dict.items():
+            data = self.pca.transform(features)
+            dataframe = pd.DataFrame(data, columns=columns)
+            dataframe_dict[symbol] = dataframe
+        return dataframe_dict
+
+    def get_features(self, dataframe):
+        dataframe[['open_s', 'low_s', 'close_s', 'high_s']] = dataframe[['open', 'low', 'close', 'high']].shift(-1)
+        nominator = np.array(dataframe[['open_s', 'low_s', 'close_s', 'high_s', 'open', 'low', 'close', 'high']])
+        denominator = np.array(dataframe[['open', 'low', 'close', 'high']])
+        nominator = np.nan_to_num(nominator, nan=0.0)
+        denominator = np.nan_to_num(denominator, nan=0.0)
+        nominator = nominator + self.epsilon
+        denominator = denominator + self.epsilon
+        nominator_reshaped = np.reshape(np.repeat(nominator, 4), (-1, 8, 4))
+        denominator_reshaped = np.tile(np.expand_dims(denominator, axis=1), (1, 8, 1))
+        raw_features = nominator_reshaped / denominator_reshaped
+        features = raw_features.reshape(raw_features.shape[0], -1)
+        return features
+
+
 def get_pca_preprocessing_function(
         epsilon: float = 10e-9,
-        pca_num_componants: int = 5,
-        means: Optional[list] = None,  # todo maybe use config file instead ?
-        stds: Optional[list] = None,
-        pca_components: Optional[list] = None
+        pca_num_components: int = 5,
+        pca: Optional[PCA] = None,
+        scaler: Optional[StandardScaler] = None,
 ):
-    from sklearn.decomposition import PCA
-    from sklearn.preprocessing import StandardScaler
     def preprocess(dataframe):
-        dataframe[['open', 'low', 'close', 'high']] = dataframe[['open', 'low', 'close', 'high']].shift(1)
+        dataframe[['open_s', 'low_s', 'close_s', 'high_s']] = dataframe[['open', 'low', 'close', 'high']].shift(1)
         nominator = np.array(dataframe[['open_s', 'low_s', 'close_s', 'high_s', 'open', 'low', 'close', 'high']])
         denominator = np.array(dataframe[['open', 'low', 'close', 'high']])
@@ -53,21 +125,25 @@ def get_pca_preprocessing_function(
         nominator = nominator + epsilon
         denominator = denominator + epsilon
-        nominator_ = np.reshape(np.repeat(nominator, 4), (-1, 8, 4))
-        denominator_ = np.tile(np.expand_dims(denominator, axis=1), (1, 8, 1))
-        new = nominator_/denominator_
-        features = new.reshape(new.shape[0], -1)
+        nominator_reshaped = np.reshape(np.repeat(nominator, 4), (-1, 8, 4))
+        denominator_reshaped = np.tile(np.expand_dims(denominator, axis=1), (1, 8, 1))
+        raw_features = nominator_reshaped / denominator_reshaped
+        features = raw_features.reshape(raw_features.shape[0], -1)
-        scaler = StandardScaler()  # todo find what to do with the scaler, mean std all data, put in config file
-        scaler.fit(features)
+        nonlocal scaler
+        if scaler is None:
+            scaler = StandardScaler()
+            scaler.fit(features)
         scaled_features = scaler.transform(features)
-        pca = PCA(n_components=pca_num_componants)  # todo idem for PCA
-        pca.fit(scaled_features)
+        nonlocal pca
+        if pca is None:
+            pca = PCA(n_components=pca_num_components)
+            pca.fit(scaled_features)
         data = pca.transform(scaled_features)
-        columns = [f'pca_{i}' for i in range(pca_num_componants)]
+        columns = [f'pca_{i}' for i in range(pca_num_components)]
         return pd.DataFrame(data, columns=columns)
     return preprocess
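
A short sketch of how the new PCAPreprocessing is meant to be driven: fit once on training frames with is_test=False, then reuse the per-symbol scalers and the shared PCA on test frames with is_test=True. The toy OHLC data below is made up for illustration:

import numpy as np
import pandas as pd
from portfolio_management.data.preprocessing import PCAPreprocessing

rng = np.random.default_rng(0)

def toy_ohlc(n: int = 50) -> pd.DataFrame:
    base = 100 + rng.normal(0, 1, n).cumsum()
    return pd.DataFrame({
        'open': base,
        'low': base - rng.uniform(0, 1, n),
        'close': base + rng.normal(0, 0.5, n),
        'high': base + rng.uniform(0, 1, n),
    })

preprocessing = PCAPreprocessing(pca_num_components=5)

# Training pass: fits one StandardScaler per symbol and a single PCA over all symbols.
train = preprocessing({'BTC': toy_ohlc(), 'ETH': toy_ohlc()}, is_test=False)

# Test pass: reuses the fitted scalers and PCA; calling it before training raises ValueError.
test = preprocessing({'BTC': toy_ohlc(), 'ETH': toy_ohlc()}, is_test=True)

# get_features builds the 8x4 = 32 ratios of current and next-step OHLC prices
# to the four current prices, which PCA then reduces to 5 components per row.
print(train['BTC'].shape)  # (50, 5)

Note that get_features writes the shifted *_s columns back into the frame it receives, so callers that need the original dataframe untouched should pass a copy.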

View File

@@ -83,7 +83,7 @@ def get_dataset(
         echo: bool = False,
         float_32: bool = True,
         preprocessing: Optional[Callable] = None,
-        target: Optional[Callable] = None,
+        preprocessing_kwargs: Optional[Callable] = None,
 ) -> xr.Dataset:
     databases_folder_path = p.get_databases_folder_path(folder_path)
@@ -113,10 +113,7 @@ def get_dataset(
         properties_array_list.append(df[c.PROPERTY_LIST])
         if preprocessing is not None:
-            preprocessing_array_list.append(preprocessing(df))
-        if target is not None:
-            preprocessing_array_list.append(target(df))
+            preprocessing_array_list.append(preprocessing(df))  # todo preprocessing common to all
     dtype = 'float32' if float_32 else 'float64'
     properties_array = np.stack(properties_array_list).astype(dtype)
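
The functional hook kept in get_dataset pairs naturally with get_pca_preprocessing_function from the previous file; a plausible call, where any argument name not visible in the hunk above is an assumption:

preprocess = get_pca_preprocessing_function(pca_num_components=5)
dataset = get_dataset(
    database_name='train_db',  # assumed parameter and value
    folder_path=None,          # resolved via p.get_databases_folder_path
    float_32=True,
    preprocessing=preprocess,
)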

View File

@@ -84,3 +84,6 @@ def inner_join(a, b) -> list:
 def remove_keys_from_dictionary(dictionary: dict, keys: list) -> dict:
     return {k: v for k, v in dictionary.items() if k not in keys}
+
+def prepare_dataset():
+    pass