Skip to content

Commit c5951f0

Browse files
authored
Support for audit logs in ChunkStore (pandas-dev#319)
1 parent 43840f7 commit c5951f0

File tree

3 files changed

+139
-17
lines changed

3 files changed

+139
-17
lines changed

CHANGES.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@
44
* Bugfix: #300 to_datetime deprecated in pandas, use to_pydatetime instead
55
* Bugfix: #309 formatting change for DateRange ```__str__```
66
* Feature: #313 set and read user specified metadata in chunkstore
7+
* Feature: #319 Audit log support in ChunkStore
78
* Bugfix: #216 Tickstore write fails with named index column
89

10+
911
### 1.36 (2016-12-13)
1012

1113
* Feature: Default to hashed based sharding

arctic/chunkstore/chunkstore.py

Lines changed: 80 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ def __init__(self, arctic_lib):
8383
self._collection = arctic_lib.get_top_level_collection()
8484
self._symbols = self._collection.symbols
8585
self._mdata = self._collection.metadata
86+
self._audit = self._collection.audit
8687

8788
def __getstate__(self):
8889
return {'arctic_lib': self._arctic_lib}
@@ -107,7 +108,7 @@ def _checksum(self, fields, data):
107108
sha.update(data)
108109
return Binary(sha.digest())
109110

110-
def delete(self, symbol, chunk_range=None):
111+
def delete(self, symbol, chunk_range=None, audit=None):
111112
"""
112113
Delete all chunks for a symbol, or optionally, chunks within a range
113114
@@ -117,6 +118,8 @@ def delete(self, symbol, chunk_range=None):
117118
symbol name for the item
118119
chunk_range: range object
119120
a date range to delete
121+
audit: dict
122+
dict to store in the audit log
120123
"""
121124
if chunk_range is not None:
122125
sym = self._get_symbol_info(symbol)
@@ -144,6 +147,16 @@ def delete(self, symbol, chunk_range=None):
144147
self._collection.delete_many(query)
145148
self._symbols.delete_many(query)
146149
self._mdata.delete_many(query)
150+
151+
if audit is not None:
152+
audit['symbol'] = symbol
153+
if chunk_range is not None:
154+
audit['rows_deleted'] = row_adjust
155+
audit['action'] = 'range delete'
156+
else:
157+
audit['action'] = 'symbol delete'
158+
159+
self._audit.insert_one(audit)
147160

148161
def list_symbols(self, partial_match=None):
149162
"""
@@ -166,7 +179,7 @@ def list_symbols(self, partial_match=None):
166179
def _get_symbol_info(self, symbol):
167180
return self._symbols.find_one({SYMBOL: symbol})
168181

169-
def rename(self, from_symbol, to_symbol):
182+
def rename(self, from_symbol, to_symbol, audit=None):
170183
"""
171184
Rename a symbol
172185
@@ -176,6 +189,8 @@ def rename(self, from_symbol, to_symbol):
176189
the existing symbol that will be renamed
177190
to_symbol: str
178191
the new symbol name
192+
audit: dict
193+
audit information
179194
"""
180195

181196
sym = self._get_symbol_info(from_symbol)
@@ -191,6 +206,14 @@ def rename(self, from_symbol, to_symbol):
191206
{'$set': {SYMBOL: to_symbol}})
192207
mongo_retry(self._mdata.update_many)({SYMBOL: from_symbol},
193208
{'$set': {SYMBOL: to_symbol}})
209+
mongo_retry(self._audit.update_many)({'symbol': from_symbol},
210+
{'$set': {'symbol': to_symbol}})
211+
if audit is not None:
212+
audit['symbol'] = to_symbol
213+
audit['action'] = 'symbol rename'
214+
audit['old_symbol'] = from_symbol
215+
self._audit.insert_one(audit)
216+
194217

195218
def read(self, symbol, chunk_range=None, filter_data=True, **kwargs):
196219
"""
@@ -245,8 +268,25 @@ def read(self, symbol, chunk_range=None, filter_data=True, **kwargs):
245268
if not filter_data or chunk_range is None:
246269
return data
247270
return CHUNKER_MAP[sym[CHUNKER]].filter(data, chunk_range)
271+
272+
def read_audit_log(self, symbol=None):
273+
"""
274+
Reads the audit log
275+
276+
Parameters
277+
----------
278+
symbol: str
279+
optionally only retrieve specific symbol's audit information
280+
281+
Returns
282+
-------
283+
list of dicts
284+
"""
285+
if symbol:
286+
return [x for x in self._audit.find({'symbol': symbol}, {'_id': False})]
287+
return [x for x in self._audit.find({}, {'_id': False})]
248288

249-
def write(self, symbol, item, metadata=None, chunker=DateChunker(), **kwargs):
289+
def write(self, symbol, item, metadata=None, chunker=DateChunker(), audit=None, **kwargs):
250290
"""
251291
Writes data from item to symbol in the database
252292
@@ -260,6 +300,8 @@ def write(self, symbol, item, metadata=None, chunker=DateChunker(), **kwargs):
260300
optional per symbol metadata
261301
chunker: Object of type Chunker
262302
A chunker that chunks the data in item
303+
audit: dict
304+
audit information
263305
kwargs:
264306
optional keyword args that are passed to the chunker. Includes:
265307
chunk_size:
@@ -336,8 +378,13 @@ def write(self, symbol, item, metadata=None, chunker=DateChunker(), **kwargs):
336378
mongo_retry(self._symbols.update_one)({SYMBOL: symbol},
337379
{'$set': doc},
338380
upsert=True)
381+
if audit is not None:
382+
audit['symbol'] = symbol
383+
audit['action'] = 'write'
384+
audit['chunks'] = chunk_count
385+
self._audit.insert_one(audit)
339386

340-
def __update(self, sym, item, metadata=None, combine_method=None, chunk_range=None):
387+
def __update(self, sym, item, metadata=None, combine_method=None, chunk_range=None, audit=None):
341388
'''
342389
helper method used by update and append since they very closely
343390
resemble each other. They really differ only by the combine method.
@@ -361,6 +408,8 @@ def __update(self, sym, item, metadata=None, combine_method=None, chunk_range=No
361408
op = False
362409
chunker = CHUNKER_MAP[sym[CHUNKER]]
363410

411+
appended = 0
412+
new_chunks = 0
364413
for start, end, _, record in chunker.to_chunks(item, chunk_size=sym[CHUNK_SIZE]):
365414
# read out matching chunks
366415
df = self.read(symbol, chunk_range=chunker.to_range(start, end), filter_data=False)
@@ -371,10 +420,12 @@ def __update(self, sym, item, metadata=None, combine_method=None, chunk_range=No
371420
if record is None or record.equals(df):
372421
continue
373422

374-
sym[APPEND_COUNT] += len(record)
423+
sym[APPEND_COUNT] += len(record) - len(df)
424+
appended += len(record) - len(df)
375425
sym[LEN] += len(record) - len(df)
376426
else:
377427
sym[CHUNK_COUNT] += 1
428+
new_chunks += 1
378429
sym[LEN] += len(record)
379430

380431
data = SER_MAP[sym[SERIALIZER]].serialize(record)
@@ -420,8 +471,14 @@ def __update(self, sym, item, metadata=None, combine_method=None, chunk_range=No
420471

421472
sym[USERMETA] = metadata
422473
self._symbols.replace_one({SYMBOL: symbol}, sym)
423-
424-
def append(self, symbol, item, metadata=None):
474+
if audit is not None:
475+
if new_chunks > 0:
476+
audit['new_chunks'] = new_chunks
477+
if appended > 0:
478+
audit['appended_rows'] = appended
479+
self._audit.insert_one(audit)
480+
481+
def append(self, symbol, item, metadata=None, audit=None):
425482
"""
426483
Appends data from item to symbol's data in the database.
427484
@@ -435,13 +492,18 @@ def append(self, symbol, item, metadata=None):
435492
the data to append
436493
metadata: ?
437494
optional per symbol metadata
495+
audit: dict
496+
optional audit information
438497
"""
439498
sym = self._get_symbol_info(symbol)
440499
if not sym:
441500
raise NoDataFoundException("Symbol does not exist.")
442-
self.__update(sym, item, metadata=metadata, combine_method=SER_MAP[sym[SERIALIZER]].combine)
501+
if audit is not None:
502+
audit['symbol'] = symbol
503+
audit['action'] = 'append'
504+
self.__update(sym, item, metadata=metadata, combine_method=SER_MAP[sym[SERIALIZER]].combine, audit=audit)
443505

444-
def update(self, symbol, item, metadata=None, chunk_range=None, upsert=False, **kwargs):
506+
def update(self, symbol, item, metadata=None, chunk_range=None, upsert=False, audit=None, **kwargs):
445507
"""
446508
Overwrites data in DB with data in item for the given symbol.
447509
@@ -462,6 +524,8 @@ def update(self, symbol, item, metadata=None, chunk_range=None, upsert=False, **
462524
original data.
463525
upsert: bool
464526
if True, will write the data even if the symbol does not exist.
527+
audit: dict
528+
optional audit information
465529
kwargs:
466530
optional keyword args passed to write during an upsert. Includes:
467531
chunk_size
@@ -470,15 +534,18 @@ def update(self, symbol, item, metadata=None, chunk_range=None, upsert=False, **
470534
sym = self._get_symbol_info(symbol)
471535
if not sym:
472536
if upsert:
473-
return self.write(symbol, item, metadata=metadata, **kwargs)
537+
return self.write(symbol, item, metadata=metadata, audit=audit, **kwargs)
474538
else:
475539
raise NoDataFoundException("Symbol does not exist.")
540+
if audit is not None:
541+
audit['symbol'] = symbol
542+
audit['action'] = 'update'
476543
if chunk_range is not None:
477544
if len(CHUNKER_MAP[sym[CHUNKER]].filter(item, chunk_range)) == 0:
478545
raise Exception('Range must be inclusive of data')
479-
self.__update(sym, item, metadata=metadata, combine_method=self.serializer.combine, chunk_range=chunk_range)
546+
self.__update(sym, item, metadata=metadata, combine_method=self.serializer.combine, chunk_range=chunk_range, audit=audit)
480547
else:
481-
self.__update(sym, item, metadata=metadata, combine_method=lambda old, new: new, chunk_range=chunk_range)
548+
self.__update(sym, item, metadata=metadata, combine_method=lambda old, new: new, chunk_range=chunk_range, audit=audit)
482549

483550
def get_info(self, symbol):
484551
"""
@@ -499,6 +566,7 @@ def get_info(self, symbol):
499566
ret = {}
500567
ret['chunk_count'] = sym[CHUNK_COUNT]
501568
ret['len'] = sym[LEN]
569+
ret['appended_rows'] = sym[APPEND_COUNT]
502570
ret['metadata'] = sym[METADATA]
503571
ret['chunker'] = sym[CHUNKER]
504572
ret['chunk_size'] = sym[CHUNK_SIZE]

tests/integration/chunkstore/test_chunkstore.py

Lines changed: 57 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import random
99
import pytest
1010
import pymongo
11+
import pickle
1112

1213
from arctic.chunkstore.chunkstore import START, SYMBOL
1314
from arctic.chunkstore.passthrough_chunker import PassthroughChunker
@@ -661,6 +662,7 @@ def test_get_info(chunkstore_lib):
661662
)
662663
chunkstore_lib.write('test_df', df)
663664
info = {'len': 3,
665+
'appended_rows': 0,
664666
'chunk_count': 3,
665667
'metadata': {'columns': [u'date', u'id', u'data']},
666668
'chunker': u'date',
@@ -688,6 +690,7 @@ def test_get_info_after_append(chunkstore_lib):
688690
assert_frame_equal(chunkstore_lib.read('test_df'), pd.concat([df, df2]).sort_index())
689691

690692
info = {'len': 6,
693+
'appended_rows': 2,
691694
'chunk_count': 4,
692695
'metadata': {'columns': [u'date', u'id', u'data']},
693696
'chunker': u'date',
@@ -715,6 +718,7 @@ def test_get_info_after_update(chunkstore_lib):
715718
chunkstore_lib.update('test_df', df2)
716719

717720
info = {'len': 4,
721+
'appended_rows': 0,
718722
'chunk_count': 4,
719723
'metadata': {'columns': [u'date', u'id', u'data']},
720724
'chunker': u'date',
@@ -991,6 +995,10 @@ def test_rename(chunkstore_lib):
991995
with pytest.raises(Exception) as e:
992996
chunkstore_lib.rename('new_name', 'new_name')
993997
assert('already exists' in str(e))
998+
999+
with pytest.raises(NoDataFoundException) as e:
1000+
chunkstore_lib.rename('doesnt_exist', 'temp')
1001+
assert('No data found for doesnt_exist' in str(e))
9941002

9951003
assert('test' not in chunkstore_lib.list_symbols())
9961004

@@ -1225,12 +1233,12 @@ def test_stats(chunkstore_lib):
12251233

12261234

12271235
def test_metadata(chunkstore_lib):
1228-
df = DataFrame(data={'data': np.random.randint(0, 100, size=2)},
1236+
df = DataFrame(data={'data': np.random.randint(0, 100, size=2)},
12291237
index=pd.date_range('2016-01-01', '2016-01-02'))
1230-
df.index.name = 'date'
1231-
chunkstore_lib.write('data', df, metadata = 'some metadata')
1232-
m = chunkstore_lib.read_metadata('data')
1233-
assert(m == u'some metadata')
1238+
df.index.name = 'date'
1239+
chunkstore_lib.write('data', df, metadata = 'some metadata')
1240+
m = chunkstore_lib.read_metadata('data')
1241+
assert(m == u'some metadata')
12341242

12351243

12361244
def test_metadata_update(chunkstore_lib):
@@ -1281,3 +1289,47 @@ def test_write_metadata(chunkstore_lib):
12811289
def test_write_metadata_nosymbol(chunkstore_lib):
12821290
with pytest.raises(NoDataFoundException):
12831291
chunkstore_lib.write_metadata('doesnt_exist', 'meta')
1292+
1293+
1294+
def test_audit(chunkstore_lib):
1295+
df = DataFrame(data={'data': np.random.randint(0, 100, size=2)},
1296+
index=pd.date_range('2016-01-01', '2016-01-02'))
1297+
df.index.name = 'date'
1298+
chunkstore_lib.write('data', df, audit={'user': 'test_user'})
1299+
df = DataFrame(data={'data': np.random.randint(0, 100, size=10)},
1300+
index=pd.date_range('2016-01-01', '2016-01-10'))
1301+
df.index.name = 'date'
1302+
chunkstore_lib.write('data', df, audit={'user': 'other_user'})
1303+
1304+
assert(len(chunkstore_lib.read_audit_log()) == 2)
1305+
assert(len(chunkstore_lib.read_audit_log(symbol='data')) == 2)
1306+
assert(len(chunkstore_lib.read_audit_log(symbol='none')) == 0)
1307+
1308+
chunkstore_lib.append('data', df, audit={'user': 'test_user'})
1309+
assert(chunkstore_lib.read_audit_log()[-1]['appended_rows'] == 10)
1310+
1311+
df = DataFrame(data={'data': np.random.randint(0, 100, size=5)},
1312+
index=pd.date_range('2017-01-01', '2017-01-5'))
1313+
df.index.name = 'date'
1314+
chunkstore_lib.update('data', df, audit={'user': 'other_user'})
1315+
assert(chunkstore_lib.read_audit_log()[-1]['new_chunks'] == 5)
1316+
1317+
chunkstore_lib.rename('data', 'data_new', audit={'user': 'temp_user'})
1318+
assert(chunkstore_lib.read_audit_log()[-1]['action'] == 'symbol rename')
1319+
1320+
chunkstore_lib.delete('data_new', chunk_range=DateRange('2016-01-01', '2016-01-02'), audit={'user': 'test_user'})
1321+
chunkstore_lib.delete('data_new', audit={'user': 'test_user'})
1322+
assert(chunkstore_lib.read_audit_log()[-1]['action'] == 'symbol delete')
1323+
assert(chunkstore_lib.read_audit_log()[-2]['action'] == 'range delete')
1324+
1325+
1326+
def test_chunkstore_misc(chunkstore_lib):
1327+
1328+
p = pickle.dumps(chunkstore_lib)
1329+
c = pickle.loads(p)
1330+
assert(chunkstore_lib._arctic_lib.get_name() == c._arctic_lib.get_name())
1331+
1332+
assert("arctic_test.TEST" in str(chunkstore_lib))
1333+
assert(str(chunkstore_lib) == repr(chunkstore_lib))
1334+
1335+

0 commit comments

Comments
 (0)