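"""Benchmark Arctic forward-pointer configurations.

For each FwPointersCfg setting (ENABLED, DISABLED, HYBRID) this script writes,
appends to, and reads back randomly generated one-minute bar data, then reports
the mean read time per data size. Example invocation (script name is a
placeholder; flags are defined in parse_args below):

    python <this_script>.py --rounds 2 --symbols 5 --mongodb 127.0.0.1
"""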
from __future__ import print_function

import argparse
import random
from collections import defaultdict
from datetime import datetime as dt
from datetime import timedelta as td

import numpy as np
import pandas as pd
from dateutil.rrule import rrule, MINUTELY

import arctic
from arctic import Arctic
from arctic._config import FwPointersCfg
# import matplotlib.pyplot as plt

price_template = (800.0, 1200.0)

ONE_MIN_ATTRIBUTES = {
    'BID': price_template,
    'BID_TWAP': price_template,
    'ASK': price_template,
    'ASK_TWAP': price_template,
    'HIGH': price_template,
    'LOW': price_template,
    'CLOSE': price_template,
    'TWAP': price_template,
    'ASKSIZE': (0.0, 400.0),
    'BIDSIZE': (0.0, 400.0),
    'TICK_COUNT': (1.0, 50.0),
    'VOLUME': (0.0, 1000.0),
}

APPEND_NROWS = 10


class FwPointersCtx:
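    """Context manager that temporarily overrides Arctic's forward-pointer
    configuration (and reconciliation flag) on the ndarray store, restoring
    the original settings on exit."""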
    def __init__(self, value_to_test, do_reconcile=False):
        self.value_to_test = value_to_test
        self.do_reconcile = do_reconcile

    def __enter__(self):
        self.orig_value = arctic.store._ndarray_store.ARCTIC_FORWARD_POINTERS_CFG
        arctic.store._ndarray_store.ARCTIC_FORWARD_POINTERS_CFG = self.value_to_test

        self.reconcile_orig_value = arctic.store._ndarray_store.ARCTIC_FORWARD_POINTERS_RECONCILE
        arctic.store._ndarray_store.ARCTIC_FORWARD_POINTERS_RECONCILE = self.do_reconcile

    def __exit__(self, *args):
        arctic.store._ndarray_store.ARCTIC_FORWARD_POINTERS_CFG = self.orig_value
        arctic.store._ndarray_store.ARCTIC_FORWARD_POINTERS_RECONCILE = self.reconcile_orig_value


def gen_sparse_rows_for_range(n_rows, low, high, dense):
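    """Generate n_rows values in the [low, high] range.

    In dense mode every row is an independent uniform draw; otherwise each
    value is drawn once and repeated for a short run (up to 20 rows), so the
    column is made of runs of repeated values rather than fully random data.
    """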
    if dense:
        return [random.uniform(low, high) for _ in range(n_rows)]
    current = 0
    rows = []
    while current < n_rows:
        # Cast to int: newer Python 3 releases reject non-integer randrange arguments.
        value = float(random.randrange(int(low), int(high)))
        repetitions = min(random.randint(0, 20), n_rows - current)
        rows.extend([value] * repetitions)
        current += repetitions

    return rows


def gen_one_minute_rows(n_rows, dense):
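    """Build a column -> values mapping for every attribute in ONE_MIN_ATTRIBUTES."""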
    data = {}
    for header, header_range in ONE_MIN_ATTRIBUTES.items():
        data[header] = gen_sparse_rows_for_range(n_rows, header_range[0], header_range[1], dense)

    return data


def gen_oneminute_dataset(n_row, n_col, dense):
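    """Build a DataFrame of n_row one-minute bars.

    Timestamps cover 120 "active" minutes per day starting 2005-01-01; any
    remainder rows are stamped from the start of a randomly chosen later year.
    n_col is accepted by callers for symmetry but is not used here.
    """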
    timestamps = []
    active_minutes_daily = 120
    for day in range(0, n_row // active_minutes_daily):
        timestamps.extend(list(rrule(MINUTELY, count=active_minutes_daily, dtstart=dt(2005, 1, 1) + td(days=day))))

    timestamps.extend(list(rrule(
        MINUTELY,
        count=n_row % active_minutes_daily,
        dtstart=dt(random.randrange(2006, 2016), 1, 1),
    )))

    return pd.DataFrame(
        index=timestamps,
        data=gen_one_minute_rows(n_row, dense)
    )


def lib_name_from_args(config):
    return 'bench2_{cfg}'.format(
        cfg=config.name,
    )


def insert_random_data(config, args, n_rows):
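    """Drop and recreate the benchmark library, then write one freshly
    generated DataFrame per symbol."""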
    store = Arctic(args.mongodb, app_name="benchmark")
    lib_name = lib_name_from_args(config)
    store.delete_library(lib_name)
    store.initialize_library(lib_name, segment='month')
    lib = store[lib_name]

    for sym in range(args.symbols):
        df = gen_oneminute_dataset(n_row=n_rows, n_col=n_rows, dense=args.dense)
        lib.write('sym' + str(sym), df)


def append_random_rows(config, args, n_rows):
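    """Append args.appends batches of APPEND_NROWS sparse rows to every symbol."""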
    store = Arctic(args.mongodb, app_name="benchmark")
    lib_name = lib_name_from_args(config)

    lib = store[lib_name]

    for _ in range(args.appends):
        for sym in range(args.symbols):
            df = gen_oneminute_dataset(n_row=APPEND_NROWS, n_col=n_rows, dense=False)
            lib.append('sym' + str(sym), df)


def read_all_symbols(config, args):
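    """Read every symbol back and run basic sanity checks on dtypes and the price range."""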
    store = Arctic(args.mongodb, app_name="benchmark")
    lib_name = lib_name_from_args(config)

    lib = store[lib_name]

    symbol_df = []
    for sym in range(args.symbols):
        symbol_df.append(lib.read('sym' + str(sym)))

    # Basic sanity checks while reading back
    sample_df = symbol_df[0].data
    assert sorted(sample_df.dtypes) == ['float64'] * len(ONE_MIN_ATTRIBUTES)
    assert 800.0 <= sample_df['BID'].iloc[0] <= 1200.0


def mean_timedelta(timedelta_list):
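    """Return the average of a list of timedeltas."""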
    return np.sum(timedelta_list) / len(timedelta_list)


def parse_args():
    parser = argparse.ArgumentParser()

    parser.add_argument('-r', '--rounds', type=int, help="number of rounds to run benchmarks", default=2)
    parser.add_argument('-a', '--appends', type=int, help="number of appends for each symbol", default=75)
    parser.add_argument('-n', '--ndim', type=int, help="dimension of dataframe = size * size", default=500)
    parser.add_argument('-e', '--dense', help="generate dense data; by default sparse data (runs of repeated values) is generated", action="store_true")
    parser.add_argument('-d', '--mongodb', help="MongoDB endpoint.", default="127.0.0.1")
    parser.add_argument('-y', '--symbols', type=int, help="Total number of symbols to use", default=5)

    return parser.parse_args()


def main(args):
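    """Run the write/append/read benchmark for every forward-pointer
    configuration and data size, then report mean read times for the
    ENABLED and DISABLED configurations."""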
    measure = defaultdict(list)
    data_size = [
        100,
        500,
        1000,
        # 5000,
        # 10000,
        # 200000
    ]
    print('Arguments=', args)

    for fwd_ptr in [FwPointersCfg.ENABLED, FwPointersCfg.DISABLED, FwPointersCfg.HYBRID]:
        for n_rows in data_size:
            for _ in range(args.rounds):
                with FwPointersCtx(fwd_ptr):
                    w_start = dt.now()
                    # Writes data to lib with above config.
                    insert_random_data(fwd_ptr, args, n_rows)
                    w_end = dt.now()
                    # Appends multiple rows to each symbol
                    append_random_rows(fwd_ptr, args, n_rows)
                    a_end = dt.now()
                    # Read everything.
                    read_all_symbols(fwd_ptr, args)
                    r_end = dt.now()
                    print('read time=', r_end - a_end)
                    measure[n_rows].append(
                        {
                            'dfsize': (n_rows, len(ONE_MIN_ATTRIBUTES)),
                            'wtime': w_end - w_start,
                            'atime': a_end - w_end,
                            'rtime': r_end - a_end,
                            'fwd': fwd_ptr
                        }
                    )

    enabled_reads = {}
    disabled_reads = {}

    for dsize in data_size:
        enabled_reads[dsize] = mean_timedelta(
            [data['rtime'] for data in measure[dsize] if data['fwd'] == FwPointersCfg.ENABLED])
        disabled_reads[dsize] = mean_timedelta(
            [data['rtime'] for data in measure[dsize] if data['fwd'] == FwPointersCfg.DISABLED])

    print('enabled read times=', enabled_reads)
    print('disabled read times=', disabled_reads)


if __name__ == '__main__':
    main(parse_args())