import pandas as pd
import numpy as np
from plotnine import *
from mizani.formatters import percent_format, date_format
from itertools import product
from sklearn.model_selection import (
train_test_split, GridSearchCV, TimeSeriesSplit, cross_val_score
)
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.linear_model import ElasticNet, Lasso, Ridge
from sklearn.exceptions import ConvergenceWarning

Storing and sharing ML models in Python
ML Example
The following example is taken from the “Pre-process data”, “Build a model”, and “Fit a model” sections of the chapter Factor Selection via Machine Learning.
import pandas as pd
import numpy as np
from plotnine import *
from mizani.formatters import percent_format, date_format
from itertools import product
from sklearn.model_selection import (
train_test_split, GridSearchCV, TimeSeriesSplit, cross_val_score
)
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.linear_model import ElasticNet, Lasso, Ridge
from sklearn.exceptions import ConvergenceWarning

# Load the monthly factor and macro data sets. Each frame gets a source prefix
# so its columns can be selected by name later; the prefix is applied to every
# column, including "date" (hence the prefixed join keys below).
factors_ff3_monthly = pd.read_parquet(
    "data/factors_ff3_monthly.parquet"
).add_prefix("factor_ff_")
factors_q_monthly = pd.read_parquet(
    "data/factors_q_monthly.parquet"
).add_prefix("factor_q_")
macro_predictors = pd.read_parquet(
    "data/macro_predictors.parquet"
).add_prefix("macro_")

# Reshape industry returns from one column per industry to long format.
industries_ff_monthly = pd.read_parquet(
    "data/industries_ff_monthly.parquet"
).melt(id_vars="date", var_name="industry", value_name="ret")

# Attach the predictors to every industry-month, compute excess returns, then
# drop the raw return, the redundant prefixed date keys, and incomplete rows.
data = (
    industries_ff_monthly
    .merge(
        factors_ff3_monthly,
        how="left", left_on="date", right_on="factor_ff_date",
    )
    .merge(
        factors_q_monthly,
        how="left", left_on="date", right_on="factor_q_date",
    )
    .merge(
        macro_predictors,
        how="left", left_on="date", right_on="macro_date",
    )
    .assign(ret_excess=lambda x: x["ret"] - x["factor_ff_risk_free"])
    .drop(columns=["ret", "factor_ff_date", "factor_q_date", "macro_date"])
    .dropna()
)

# Columns whose name contains "macro" are the macro predictors.
macro_variables = data.filter(like="macro").columns
# Columns whose name contains "factor" are the factor returns.
factor_variables = data.filter(like="factor").columns

# Build one interaction term for every (macro predictor, factor) pair.
column_combinations = list(product(macro_variables, factor_variables))
new_column_values = [
    data[macro_column] * data[factor_column]
    for macro_column, factor_column in column_combinations
]
column_names = [" x ".join(t) for t in column_combinations]

# Assemble all interaction columns into one frame and concatenate once,
# rather than inserting them into `data` one at a time.
new_columns = pd.DataFrame(dict(zip(column_names, new_column_values)))
data = pd.concat([data, new_columns], axis=1)
# Standardize every predictor column. "ret_excess", "date" and "industry" are
# left out of the list and therefore removed by remainder="drop".
preprocessor = ColumnTransformer(
    transformers=[
        (
            "scale",
            StandardScaler(),
            [
                col for col in data.columns
                if col not in ["ret_excess", "date", "industry"]
            ],
        )
    ],
    remainder="drop",
    verbose_feature_names_out=False,
)

# With l1_ratio=1 the elastic net penalty is a pure L1 (lasso) penalty.
lm_model = ElasticNet(
    alpha=0.007,
    l1_ratio=1,
    max_iter=5000,
    fit_intercept=False,
)

# Chain preprocessing and regression so both are fitted and stored together.
lm_pipeline = Pipeline([
    ("preprocessor", preprocessor),
    ("regressor", lm_model),
])

# Restrict the sample to the industry labelled 'manuf'.
data_manufacturing = data.query("industry == 'manuf'")
# Observations dated before this cut-off form the training sample.
training_date = "2011-12-01"
data_manufacturing_training = data_manufacturing.query(
    f"date<'{training_date}'"
)

# The full frame is passed as X: the pipeline's preprocessor picks the
# predictor columns itself and drops everything else. Bracket indexing is used
# for the target so a missing column raises instead of silently passing None.
lm_fit = lm_pipeline.fit(
    data_manufacturing_training,
    data_manufacturing_training["ret_excess"],
)

# Fitted values next to realized excess returns for the whole manufacturing
# sample, reshaped to long format (one row per date and variable).
predicted_values = (
    pd.DataFrame(
        {
            "Fitted value": lm_fit.predict(data_manufacturing),
            "Realization": data_manufacturing["ret_excess"],
        }
    )
    .assign(date=data_manufacturing["date"])
    .melt(id_vars="date", var_name="Variable", value_name="return")
)

Storing and sharing
To store the workflow, I use the joblib package, which works similarly to butcher in R. First, I create a folder for the stored workflows. Next, I create a model package, which includes the workflow and relevant metadata such as the training date and the industry.
import joblib
import os
os.makedirs("stored_workflows", exist_ok=True)
model_package = {
"workflow": lm_fit,
"training_date": "2011-12-01",
"industry": "manufacturing",
"features": list(data.columns)
}
joblib.dump(model_package, "manufacturing_model.joblib")I load the stored package using joblib.load() and access the workflow.
package = joblib.load("manufacturing_model.joblib")
workflow = package["workflow"]

The workflow object contains all the information, making replication easier. We can use it to access the trained coefficients via workflow.named_steps["regressor"].coef_ or to make predictions with workflow.predict().
# Pull the fitted regressor step out of the restored pipeline and read its
# estimated coefficients.
fitted_regressor = workflow.named_steps["regressor"]
coefficients = fitted_regressor.coef_

# Run the restored pipeline on the first rows of the manufacturing sample.
first_rows = data_manufacturing.head()
predictions = workflow.predict(first_rows)