Skip to content

Commit 1879d9d

Browse files
committed
live trade with draft
1 parent 0ce24d8 commit 1879d9d

File tree

1 file changed

+260
-0
lines changed

1 file changed

+260
-0
lines changed

backtesting/live.py

+260
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,260 @@
1+
import pandas as pd
2+
import numpy as np
3+
import datetime
4+
from collections import UserDict
5+
from typing import Dict, Optional
6+
7+
from backtesting import Backtest
8+
from backtesting._stats import compute_stats
9+
from backtesting._util import _Indicator, try_, _Array
10+
from backtesting.backtesting import _Broker, Strategy, _OutOfMoneyError
11+
12+
13+
class LiveMarketOhlcv:
14+
symbol: str = ""
15+
open_price: float = 0.
16+
high_price: float = 0.
17+
low_price: float = 0.
18+
close_price: float = 0.
19+
total_qty: float = 0.
20+
21+
timestamp_second_start: int = 0
22+
duration_second: int = 60
23+
24+
def __init__(self, timestamp_second=0, open_price=0., high_price=0.,
25+
low_price=0., close_price=0., total_qty=0., symbol="") -> None:
26+
self.timestamp_second_start = timestamp_second
27+
self.symbol = symbol
28+
self.open_price = open_price
29+
self.high_price = high_price
30+
self.low_price = low_price
31+
self.close_price = close_price
32+
self.total_qty = total_qty
33+
34+
def __repr__(self) -> str:
35+
dt = datetime.datetime.fromtimestamp(self.timestamp_second_start)
36+
ps1 = f"o={self.open_price}, h={self.high_price}, "
37+
ps2 = f"l={self.low_price}, c={self.close_price}, v={self.total_qty}"
38+
symbol = f"[{self.symbol}]" \
39+
if type(self.symbol) is str and len(self.symbol) > 0 else ""
40+
return f"LiveMarketOhlcv {symbol}[{dt}]: {ps1}{ps2}"
41+
42+
43+
class _DataCachePatch:
44+
"""
45+
A data array accessor. Provides access to OHLCV "columns"
46+
as a standard `pd.DataFrame` would, except it's not a DataFrame
47+
and the returned "series" are _not_ `pd.Series` but `np.ndarray`
48+
for performance reasons.
49+
"""
50+
def __init__(self, df: pd.DataFrame):
51+
self.__df = df
52+
self.__i = len(df)
53+
self.__pip: Optional[float] = None
54+
self.__cache: Dict[str, np.ndarray] = {}
55+
self.__arrays: Dict[str, _Array] = {}
56+
self._update()
57+
58+
def __getitem__(self, item):
59+
return self.__get_array(item)
60+
61+
def __getattr__(self, item):
62+
try:
63+
return self.__get_array(item)
64+
except KeyError:
65+
raise AttributeError(f"Column '{item}' not in data") from None
66+
67+
def _set_length(self, i):
68+
self.__i = i
69+
self.__cache.clear()
70+
71+
def _update(self):
72+
index = self.__df.index.copy()
73+
self.__arrays = {col: _Array(arr, index=index)
74+
for col, arr in self.__df.items()}
75+
# Leave index as Series because pd.Timestamp nicer API to work with
76+
self.__arrays['__index'] = index
77+
78+
def __repr__(self):
79+
i = min(self.__i, len(self.__df) - 1)
80+
index = self.__df.index[i]
81+
items = ', '.join(f'{k}={v}' for k, v in self.__df.iloc[i].items())
82+
return f'<Data i={i} ({index}) {items}>'
83+
84+
def __len__(self):
85+
return self.__i
86+
87+
@property
88+
def df(self) -> pd.DataFrame:
89+
return (self.__df.iloc[:self.__i]
90+
if self.__i < len(self.__df)
91+
else self.__df)
92+
93+
def __get_array(self, key) -> np.ndarray:
94+
arr = self.__cache.get(key, None)
95+
if arr is None or len(arr) != len(self):
96+
farr: np.ndarray = self.__df[key].values[:self.__i]
97+
arr = self.__cache[key] = farr
98+
return arr
99+
100+
@property
101+
def Open(self) -> np.ndarray:
102+
return self.__get_array('Open')
103+
104+
@property
105+
def High(self) -> np.ndarray:
106+
return self.__get_array('High')
107+
108+
@property
109+
def Low(self) -> np.ndarray:
110+
return self.__get_array('Low')
111+
112+
@property
113+
def Close(self) -> np.ndarray:
114+
return self.__get_array('Close')
115+
116+
@property
117+
def Volume(self) -> np.ndarray:
118+
return self.__get_array('Volume')
119+
120+
@property
121+
def index(self) -> pd.DatetimeIndex:
122+
return self.__df.index[:self.__i]
123+
124+
# Make pickling in Backtest.optimize() work with our catch-all __getattr__
125+
def __getstate__(self):
126+
return self.__dict__
127+
128+
def __setstate__(self, state):
129+
self.__dict__ = state
130+
131+
132+
class _ObjectBindingDict(UserDict):
133+
def __init__(self, obj, is_key_func=lambda _: True):
134+
self.obj = obj
135+
self.is_key_func = is_key_func
136+
137+
def __iter__(self):
138+
return iter({
139+
k: v for k, v in self.obj.__dict__.items() if self.is_key_func(v)
140+
}.items())
141+
142+
def __getitem__(self, key):
143+
return getattr(self.obj, key)
144+
145+
def __setitem__(self, key, item) -> None:
146+
return setattr(self.obj, key, item)
147+
148+
149+
def set_indicator_indices(strategy, indicator_attrs, idx):
150+
for attr, indicator in indicator_attrs:
151+
# Slice indicator on the last dimension (case of 2d indicator)
152+
setattr(strategy, attr, indicator[..., :idx + 1])
153+
154+
155+
class LiveTrade:
156+
def __init__(self, backtest: Backtest) -> None:
157+
self.core = backtest
158+
159+
def init(self, **kwargs):
160+
self._df = self.core._data.copy(deep=False)
161+
self.data = _DataCachePatch(self._df)
162+
self.broker: _Broker = self.core._broker(data=self.data)
163+
self.strategy: Strategy = self.core._strategy(self.broker, self.data, kwargs)
164+
165+
self._mirror_data = _DataCachePatch(self._df)
166+
self._mirror_strategy = self.core._strategy(self.broker, self._mirror_data, kwargs)
167+
self._mirror_strategy.init()
168+
169+
self.data._update() # Strategy.init might have changed/added to data.df
170+
171+
self.indicator_attrs = _ObjectBindingDict(self._mirror_strategy,
172+
lambda x: isinstance(x, _Indicator))
173+
_ = {attr: indicator
174+
for attr, indicator in self.strategy.__dict__.items()
175+
if isinstance(indicator, _Indicator)}.items()
176+
177+
self.start_idx = 1 + max((np.isnan(indicator.astype(float)).argmin(axis=-1).max()
178+
for _, indicator in self.indicator_attrs), default=0)
179+
180+
self._current_idx = self.start_idx
181+
182+
def on_bar(self, bar: LiveMarketOhlcv):
183+
ts_idx = pd.Timestamp(bar.timestamp_second_start, unit='s')
184+
df = self._df
185+
last_ts = df.index[-1]
186+
if ts_idx <= last_ts:
187+
return
188+
data_to_set = {
189+
'Open': bar.open_price,
190+
'High': bar.high_price,
191+
'Low': bar.low_price,
192+
'Close': bar.close_price,
193+
'Volume': bar.total_qty
194+
}
195+
if type(df.iloc[0, 0]) is float or np.issubdtype(type(df.iloc[0, 0]), np.floating):
196+
df.loc[ts_idx] = {k: float(v) for k, v in data_to_set.items()}
197+
else:
198+
df.loc[ts_idx] = data_to_set
199+
self.broker._equity = np.append(self.broker._equity, [self.broker._equity[-1]])
200+
# for performance issue, this should be update partially
201+
# all indicators are re-calculate here
202+
self._mirror_data._set_length(len(df))
203+
self._mirror_strategy.init()
204+
205+
def run_next(self):
206+
err = self._run_with_ith_bar(self._current_idx)
207+
if err is None:
208+
self._current_idx += 1
209+
210+
def run(self, to_end=True, ntimes=0):
211+
if not to_end:
212+
ntimes = ntimes if ntimes > 0 else max(0, len(self._df) - self._current_idx)
213+
ntimes = min(ntimes, len(self._df))
214+
with np.errstate(invalid='ignore'):
215+
if to_end:
216+
iterator = range(self._current_idx, len(self._df))
217+
else:
218+
iterator = range(self._current_idx, self._current_idx+ntimes)
219+
for idx in iterator:
220+
err = self._w_run_with_ith_bar(idx)
221+
if err is not None: # outof-money
222+
break
223+
self._current_idx = len(self._df) # not sure in live mode, what to do if no money
224+
225+
def close_last_positions(self):
226+
for trade in self.broker.trades:
227+
trade.close()
228+
if self.start_idx < len(self._df):
229+
try_(self.broker.next, exception=_OutOfMoneyError)
230+
231+
def process_pending_orders(self):
232+
sz = self.broker.position.size
233+
self.broker._process_orders()
234+
sz_a = self.broker.position.size
235+
return sz_a - sz
236+
237+
def get_final_state(self):
238+
equity = pd.Series(self.broker._equity).bfill().fillna(self.broker._cash).values
239+
return compute_stats(
240+
trades=self.broker.closed_trades,
241+
equity=equity,
242+
ohlc_data=self._df,
243+
risk_free_rate=0.0,
244+
strategy_instance=self.strategy,
245+
)
246+
247+
def _run_with_ith_bar(self, idx):
248+
if idx >= len(self._df) or idx < self.start_idx:
249+
return False
250+
with np.errstate(invalid='ignore'):
251+
self._w_run_with_ith_bar(idx)
252+
253+
def _w_run_with_ith_bar(self, idx):
254+
self.data._set_length(idx + 1)
255+
set_indicator_indices(self.strategy, self.indicator_attrs, idx)
256+
try:
257+
self.broker.next()
258+
except _OutOfMoneyError:
259+
return False
260+
self.strategy.next()

0 commit comments

Comments
 (0)