I’m building a pipeline for running portfolio optimization studies at scale. The goal is a scalable and reliable system to backtest hundreds of trading strategies, in many timeframes, on hundreds of markets. I’m generating quantitative calculations on top of these backtest results and trades, and then using the results to build portfolios. The ultimate goal is to be able to easily implement the latest portfolio optimization research, specifically research focused on using Reinforcement Learning to manage a live portfolio of bots trading assets. The strategies I’m using are all open source, and there is rarely any incentive for anyone to open source a strategy that has a real edge over the market, which becomes apparent as you look at the stats of many individual strategies in different timeframes. Therefore I’m also building a scalable way of building models that ingest market conditions in real time and supervise each of these strategies as it trades, to help avoid losses and improve returns. Finally, we can construct different portfolios with our desired return/volatility ratios.
My first attempts were many notebooks and a lot of pickle files, hacking things together locally. Then I decided that scale is key to making things work, so I started designing the pipeline you see here.
The entire backend is written in Python. I use a Postgres DB for storing backtest results, quant calculations, and model predictions. All price data and market features are on GCS. There are also many ETL functions moving the data around, for instance moving the tens of millions of rows in the trades table on Postgres to a single JSON file on GCS.
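Moving tens of millions of rows without holding them all in RAM usually means streaming them through a cursor in fixed-size chunks and writing them out as you go. A minimal sketch, assuming any DB-API cursor on which a query has already been executed (for psycopg2 this would ideally be a server-side cursor); the function name `export_cursor_to_jsonl` and the chunk size are illustrative, not the pipeline’s actual code:

```python
import json
from typing import IO, Any, List, Optional


def export_cursor_to_jsonl(cursor: Any, out: IO[str], chunk_size: int = 50_000) -> int:
    """Stream rows from an executed DB-API cursor into newline-delimited JSON.

    Only `chunk_size` rows are held in memory at a time. Returns rows written.
    """
    columns: Optional[List[str]] = None
    written = 0
    while True:
        rows = cursor.fetchmany(chunk_size)
        if not rows:
            break
        if columns is None:
            # Read column names after the first fetch: some server-side cursors
            # only populate `description` once rows have been fetched.
            columns = [col[0] for col in cursor.description]
        for row in rows:
            # default=str covers timestamps/decimals that json cannot encode natively
            out.write(json.dumps(dict(zip(columns, row)), default=str))
            out.write("\n")
        written += len(rows)
    return written
```

With psycopg2, a server-side cursor is created with `conn.cursor(name="trades_export")`, and the resulting local file could then be uploaded with the `google-cloud-storage` client (`bucket.blob(...).upload_from_filename(...)`). Note that this writes newline-delimited JSON (one object per line) rather than a single JSON array, which keeps both the writer and later readers streaming-friendly.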
Backtests are executed by Celery workers. I monitor these workers using Celery Flower. Recently I started running the pipeline in CeleryExecutor on the backend. I’ve dockerized the pipeline into different microservices in a single docker-compose.yml file. This has helped a lot with easily spinning up VM instances and deploying any of the services on demand.
For model training I’m using MLflow with a local SQLite DB. These models write their batch market predictions to Postgres, and their 1m real-time predictions to
Backtesting and multiprocessing
A backtest applies a “strategy-timeframe” to the pricing data of many markets (~100 pairs right now, expanding to ~200). The output is a set of trades, plus some statistics calculated from those trades. This can be very computationally expensive. Here’s a little back-of-the-envelope calculation:
- 60 strategies
- 6 timeframes
- 200 pairs
That gives you roughly 72,000 strategy-timeframe-pair combinations to backtest, each on about 2-5 years of pricing data on average.
“1m” pricing data for 200 pairs is about 5GB of JSON files, which takes about 20GB of RAM once loaded into pandas dataframes with the other indicators added. On a single core of a C2 CPU on GCP, it can take about 10 minutes to complete, and that’s just one backtest. Imagine you have 6 timeframes * 60 strategies = 360 of those. So to minimize compute cost, you’d like to run them in parallel on many cores, which in turn requires a lot of RAM.
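The arithmetic above, plus the way RAM caps parallelism, fits in a few lines. This sketch assumes every backtest takes the same ~10 minutes and needs the same ~20GB as the 1m case (which overstates the coarser timeframes), and models the run as equal-length waves; the function name is hypothetical:

```python
import math

STRATEGIES, TIMEFRAMES, PAIRS = 60, 6, 200
COMBINATIONS = STRATEGIES * TIMEFRAMES * PAIRS  # 72,000 strategy-timeframe-pair runs
BACKTESTS = STRATEGIES * TIMEFRAMES             # 360 backtests, each covering all pairs
SERIAL_CORE_HOURS = BACKTESTS * 10 / 60         # ~10 min each on one core -> 60 core-hours


def estimate_wall_clock_minutes(
    n_backtests: int,
    minutes_per_backtest: float,
    cores: int,
    ram_gb: float,
    ram_per_backtest_gb: float,
) -> float:
    """Rough wall-clock time when identical backtests run in parallel waves.

    Parallelism is capped by whichever runs out first: CPU cores or RAM.
    """
    by_ram = int(ram_gb // ram_per_backtest_gb)
    if by_ram == 0:
        raise ValueError("a single backtest does not fit in RAM")
    parallel = min(cores, by_ram)
    waves = math.ceil(n_backtests / parallel)
    return waves * minutes_per_backtest
```

On a hypothetical 60-vCPU VM with 240GB of RAM, `estimate_wall_clock_minutes(BACKTESTS, 10, cores=60, ram_gb=240, ram_per_backtest_gb=20)` gives 300 minutes: RAM, not cores, caps the run at 12 concurrent backtests.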
I wrote my own multiprocessing, with a pool of agents each grabbing a job, putting a lock on it, executing it, and writing the results back to the database. I spent some time reading about CPU scheduling algorithms (for instance “shortest job first”) to build an orchestrator that would continuously exhaust all CPU and RAM. But then I ended up giving Celery a try. It’s a great solution for autoscaling and exhausting all system resources, and it also makes things easy to monitor. So I migrated everything there and got rid of a lot of unnecessary complex logic.
Now I have a worker for each of the backtesting timeframes, subscribed to my Redis broker and running with --autoscale enabled, so that together they exhaust all VM resources continuously. I’m using Celery Flower to monitor and control the Celery processes from a GUI.
celery --app=backtest_agent worker --loglevel=INFO --autoscale=4,1 -Q 1m_tasks -f 1m_worker.log --detach
celery --app=backtest_agent worker --loglevel=INFO --autoscale=10,1 -Q 5m_tasks -f 5m_worker.log --detach
celery --app=backtest_agent worker --loglevel=INFO --autoscale=20,1 -Q 1h_tasks -f 1h_worker.log --detach
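Celery accepts router functions in `task_routes`, which is one way to send each backtest to the queue that its timeframe’s worker consumes (`1m_tasks`, `5m_tasks`, `1h_tasks`, …). This sketch assumes the backtest task is called with a `timeframe` keyword argument; `route_backtest` and that argument are hypothetical, not taken from the pipeline:

```python
from typing import Any, Optional


def route_backtest(name: str, args: tuple, kwargs: Optional[dict], options: dict,
                   task: Any = None, **kw: Any) -> Optional[dict]:
    """Celery router: send a backtest to the queue named after its timeframe."""
    timeframe = (kwargs or {}).get("timeframe")
    if timeframe is None:
        return None  # defer to the next router / Celery's default queue
    return {"queue": f"{timeframe}_tasks"}
```

With `app.conf.task_routes = (route_backtest,)`, a call such as `run_backtest.apply_async(kwargs={"timeframe": "5m", ...})` (task name hypothetical) would land in `5m_tasks`, where the `-Q 5m_tasks` worker picks it up. Keeping the mapping in one router means callers never hard-code queue names.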
Training and Serving Models
I’m calling these supervising models “trading watchdogs”. Let me explain what these watchdogs do. Here’s an example for a strategy:
stoploss = -0.3
timeframe = '5m'

dataframe.loc[
    (
        (dataframe['macd'] > dataframe['macdsignal']) &
        (dataframe['cci'] <= -50.0)
    ),
    'buy'] = 1
We have n (~60) of these strategies in 6 different timeframes. Each of them takes trades; some will be profitable and some will incur a loss. We have ~10M of these trades for training our models.
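Turning each trade into a training example for a binary classifier can be as simple as attaching the market features known at entry and labeling the trade 1 if it closed with a profit, 0 otherwise. A sketch with pandas, assuming the trades have `pair`, `open_date` and `profit_ratio` columns and the features have `pair` and `date` (these column names are assumptions):

```python
import pandas as pd


def label_trades(trades: pd.DataFrame, features: pd.DataFrame) -> pd.DataFrame:
    """Attach the latest market features at entry time and a binary profit label."""
    trades = trades.sort_values("open_date")
    features = features.sort_values("date")
    labeled = pd.merge_asof(
        trades,
        features,
        left_on="open_date",
        right_on="date",
        by="pair",             # only match features of the same market
        direction="backward",  # last feature row at or before entry: no look-ahead
    )
    labeled["label"] = (labeled["profit_ratio"] > 0).astype(int)
    return labeled
```

Whether the feature row stamped exactly at entry was fully known at that moment depends on how candle timestamps are defined (open vs. close time), so that convention is worth checking before training.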
The idea is that these boolean conditions can be supercharged by the supervision of a binary classification model, which takes market features as inputs and predicts the _proba (probability) of the bot making a profit or a loss. Then we use the _proba predictions of these models to decide how to move money between our bots under different market conditions, based on their probability of making a profit.
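In its simplest form, the supervision can act as a gate on the strategy’s own entry signal: keep a `buy` only when the watchdog’s predicted probability of profit clears a threshold. A sketch assuming a `proba` series aligned with the strategy’s dataframe index; `gate_entries` is hypothetical and the 0.6 threshold is a placeholder to be tuned, not a recommendation:

```python
import pandas as pd


def gate_entries(dataframe: pd.DataFrame, proba: pd.Series, threshold: float = 0.6) -> pd.DataFrame:
    """Keep the strategy's buy signals only where the watchdog is confident enough."""
    out = dataframe.copy()
    # Rows without a prediction are treated as "not confident" (no trade).
    confident = proba.reindex(out.index).fillna(0.0) >= threshold
    out["buy"] = ((out["buy"] == 1) & confident).astype(int)
    return out
```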
Here are some examples of the market features (t lags are added to these):
[ 'open', 'high', 'low', 'close', 'volume', 'log_return', 'rsi_5', 'bollinger_20_2_bb_lowerband', 'bollinger_20_2_bb_upperband', 'bollinger_20_2_bb_middleband', 'bollinger_20_2_close_bb_lowerband_diff', 'rsi_14', 'macd', 'macdsignal', 'macdhist', 'cci', 'emarsi_5', 'ema100', 'adx', 'sma_200', 'sma_50', 'volume_adi', 'volume_obv', 'volume_cmf', 'volume_fi', 'volume_em', 'volume_sma_em', 'volume_vpt', 'volume_vwap', 'volume_mfi', 'volume_nvi', 'volatility_bbm', 'volatility_bbh', 'volatility_bbl', 'volatility_bbw', 'volatility_bbp', 'volatility_bbhi', 'volatility_bbli', 'volatility_kcc', 'volatility_kch', 'volatility_kcl', 'volatility_kcw', 'volatility_kcp', 'volatility_kchi', 'volatility_kcli', 'volatility_dcl', 'volatility_dch', 'volatility_dcm', 'volatility_dcw', 'volatility_dcp', 'volatility_atr', 'volatility_ui', 'trend_macd', 'trend_macd_signal', 'trend_macd_diff', 'trend_sma_fast', 'trend_sma_slow', 'trend_ema_fast', 'trend_ema_slow', 'trend_vortex_ind_pos', 'trend_vortex_ind_neg', 'trend_vortex_ind_diff', 'trend_trix', 'trend_mass_index', 'trend_dpo', 'trend_kst', 'trend_kst_sig', 'trend_kst_diff', 'trend_ichimoku_conv', 'trend_ichimoku_base', 'trend_ichimoku_a', 'trend_ichimoku_b', 'trend_stc', 'trend_adx', 'trend_adx_pos', 'trend_adx_neg', 'trend_cci', 'trend_visual_ichimoku_a', 'trend_visual_ichimoku_b', 'trend_aroon_up', 'trend_aroon_down', 'trend_aroon_ind', 'trend_psar_up', 'trend_psar_down', 'trend_psar_up_indicator', 'trend_psar_down_indicator', 'momentum_rsi', 'momentum_stoch_rsi', 'momentum_stoch_rsi_k', 'momentum_stoch_rsi_d', 'momentum_tsi', 'momentum_uo', 'momentum_stoch', 'momentum_stoch_signal', 'momentum_wr', 'momentum_ao', 'momentum_roc', 'momentum_ppo', 'momentum_ppo_signal', 'momentum_ppo_hist', 'momentum_kama', 'others_dr', 'others_dlr', 'others_cr']
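Adding t lags is a per-column shift. With ~100 base features and several lags, building the new columns in a single `pd.concat` avoids inserting hundreds of columns one at a time, which pandas flags as a fragmented-frame performance problem. A sketch; the `_lag_{n}` naming and the lag set are assumptions:

```python
from typing import List

import pandas as pd


def add_lags(df: pd.DataFrame, columns: List[str], lags: List[int]) -> pd.DataFrame:
    """Append lagged copies of `columns`, e.g. rsi_14 -> rsi_14_lag_1, rsi_14_lag_2."""
    lagged = {
        f"{col}_lag_{lag}": df[col].shift(lag)
        for col in columns
        for lag in lags
    }
    return pd.concat([df, pd.DataFrame(lagged, index=df.index)], axis=1)
```

If one dataframe holds several pairs, the shift should be applied per pair (for example on a `groupby("pair")`) so that lags never leak across markets.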
I have a pipeline for loading trading data, adding market features, automatic feature selection, preprocessing, and finally training models for each strategy-timeframe, all logged in MLflow. The registered models with the best val_f1_score are then deployed to production.
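Picking the best run per strategy-timeframe can be done on the DataFrame that `mlflow.search_runs` returns, where logged metrics appear as `metrics.<name>` columns. A sketch, assuming one MLflow experiment per strategy-timeframe and that each run logged its model under the artifact path `model`; `best_run_id` and `register_best` are hypothetical helpers:

```python
from typing import Optional

import pandas as pd


def best_run_id(runs: pd.DataFrame, metric: str = "val_f1_score") -> Optional[str]:
    """Return the run_id with the highest metric in an mlflow.search_runs() result."""
    col = f"metrics.{metric}"
    if runs.empty or col not in runs:
        return None
    scored = runs.dropna(subset=[col])
    if scored.empty:
        return None
    return str(scored.loc[scored[col].idxmax(), "run_id"])


def register_best(experiment_name: str, model_name: str) -> Optional[str]:
    """Register the best run's model under `model_name`; returns the new version."""
    import mlflow  # imported lazily so best_run_id works without MLflow installed

    runs = mlflow.search_runs(experiment_names=[experiment_name])
    run_id = best_run_id(runs)
    if run_id is None:
        return None
    version = mlflow.register_model(f"runs:/{run_id}/model", model_name)
    return version.version
```

Promoting the new version to production is then a separate registry step (a stage transition or, in newer MLflow versions, an alias), depending on the MLflow version in use.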
I need to serve both batch and online predictions using the same pipeline. I have a stream of pricing data from an exchange websocket, and I need to feed it to my feature engineering pipeline and store the output. I initially wrote some multiprocessing logic for loading price data, adding features, and storing them in JSON files. There are two big problems with this. Adding hundreds of feature columns to millions of rows of pricing data is very computationally expensive, and the dataframes need a lot of RAM, which limits the number of parallel feature engineering processes you can run on a GCP VM instance. I need serverless horizontal autoscaling, so I decided to keep all of it serverless on GCP. Right now I’m working on publishing the websocket pricing data to my GCP Pub/Sub topic and streaming it from there into GCP Dataflow, where I add the features in Beam and write them to CSV for batch predictions and to Redis for online predictions.
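On a stream, recomputing indicators over the full history for every new 1m candle is wasteful; many features can instead be updated incrementally from a small amount of per-pair state, which is roughly what a stateful per-key step in a Beam pipeline would hold. A plain-Python sketch of two features from the list, `log_return` and an exponential moving average; `StreamingFeatures` and `span` are illustrative, and this is not Beam API code:

```python
import math
from dataclasses import dataclass
from typing import Optional


@dataclass
class StreamingFeatures:
    """Per-pair state for incrementally updated features on 1m closes."""

    span: int = 100  # EMA span, e.g. for an ema100-style feature
    prev_close: Optional[float] = None
    ema: Optional[float] = None

    def update(self, close: float) -> dict:
        # Same recursion as pandas ewm(span=..., adjust=False): alpha = 2 / (span + 1),
        # seeded with the first observed close.
        alpha = 2.0 / (self.span + 1)
        log_return = None if self.prev_close is None else math.log(close / self.prev_close)
        self.ema = close if self.ema is None else alpha * close + (1 - alpha) * self.ema
        self.prev_close = close
        return {"close": close, "log_return": log_return, "ema": self.ema}
```

The catch for serving batch and online predictions from the same pipeline is that the streaming definition has to match the batch one exactly (TA-Lib’s EMA, for instance, is seeded differently), otherwise the models see different features online than they were trained on.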
Every minute, each of my watchdog models reads a stream of market features from Redis, and each model dumps its predictions for ~200 markets back in
The Portfolio Manager runner reads these predictions every minute and decides how to move money between ~10 live trading bots.
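As a baseline before any RL policy, one simple rule the runner could apply is to give each live bot a share of capital proportional to its predicted probability of profit, skipping bots below a minimum probability. A sketch; `allocate`, the bot names, the 0.55 cutoff and proportional weighting itself are assumptions for illustration:

```python
from typing import Dict


def allocate(probas: Dict[str, float], capital: float, min_proba: float = 0.55) -> Dict[str, float]:
    """Split capital across bots in proportion to their predicted probability of profit."""
    eligible = {bot: p for bot, p in probas.items() if p >= min_proba}
    total = sum(eligible.values())
    if total == 0:
        return {bot: 0.0 for bot in probas}  # no bot clears the cutoff: stay in cash
    return {bot: capital * eligible.get(bot, 0.0) / total for bot in probas}
```

For example, `allocate({"bot_a": 0.8, "bot_b": 0.6, "bot_c": 0.4}, 1000.0)` splits the capital between `bot_a` and `bot_b` in a 0.8 : 0.6 ratio and gives `bot_c` nothing.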
Dashboards and Quant Calculations
Streamlit, dataframes and plotly plots. I’ve been using Streamlit a lot at work for our research dashboards. To keep the calculation logic out of the Streamlit pages, I have a single notebook that loads many tables from the DB and some big JSON files from GCS, generates the OLAP dataframes and figures, and stores it all in a single pickled dictionary (~100MB). It’s fast because Streamlit caches the pickle after it first loads. I just run my notebook from Airflow and it keeps my dashboards updated.
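The notebook-to-dashboard handoff boils down to pickling one dictionary of precomputed objects and loading it once in the Streamlit app. A sketch of both halves; the file name and keys are hypothetical. On the Streamlit side the loader would typically be wrapped with `@st.cache_data`, or with `@st.cache_resource` to avoid handing every rerun its own copy of a ~100MB object:

```python
import pickle
from pathlib import Path
from typing import Any, Dict


def save_bundle(bundle: Dict[str, Any], path: Path) -> None:
    """Notebook side: write all precomputed tables and figures as one pickle."""
    tmp = path.with_suffix(path.suffix + ".tmp")
    with tmp.open("wb") as f:
        pickle.dump(bundle, f, protocol=pickle.HIGHEST_PROTOCOL)
    # Rename over the old file so a concurrent reader never sees a half-written
    # pickle (atomic on POSIX when both paths are on the same filesystem).
    tmp.replace(path)


def load_bundle(path: Path) -> Dict[str, Any]:
    """Dashboard side: load the bundle; only unpickle files you produced yourself."""
    with path.open("rb") as f:
        return pickle.load(f)
```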