Skip to content

Commit 5f7cda8

Browse files
Add support non-audited transactions.
This is usual for batch jobs that require 'transactional' access to VersionStore (i.e. read-modify-write without intervening update), but don't necessarily want to keep all version in perpetuity.
1 parent 23722a3 commit 5f7cda8

File tree

2 files changed

+30
-5
lines changed

2 files changed

+30
-5
lines changed

arctic/store/audit.py

+12-3
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ class ArcticTransaction(object):
3131
call the `write` method of the context manager to output changes. The changes will only be written when the block
3232
exits.
3333
34-
NB changes are audited.
34+
NB changes may be audited.
3535
3636
Example:
3737
-------
@@ -44,7 +44,8 @@ class ArcticTransaction(object):
4444
retry the whole block should that happens, as the assumption is that you need to base your changes on a different
4545
starting timeseries.
4646
'''
47-
def __init__(self, version_store, symbol, user, log, modify_timeseries=None, *args, **kwargs):
47+
def __init__(self, version_store, symbol, user, log, modify_timeseries=None, audit=True,
48+
*args, **kwargs):
4849
'''
4950
Parameters
5051
----------
@@ -67,13 +68,20 @@ def __init__(self, version_store, symbol, user, log, modify_timeseries=None, *ar
6768
interacting with code that read in the data already and for some reason you cannot refactor the read-write
6869
operation to be contained within this context manager
6970
71+
audit: `bool`
72+
should we 'audit' the transaction. An audited write transaction is equivalent to a snapshot
73+
before and after the data change - i.e. we won't prune versions of the data involved in an
74+
audited transaction. This can be used to ensure that the history of certain data changes is
75+
preserved indefinitely.
76+
7077
all other args:
7178
Will be passed into the initial read
7279
'''
7380
self._version_store = version_store
7481
self._symbol = symbol
7582
self._user = user
7683
self._log = log
84+
self._audit = audit
7785
logger.info("MT: {}@{}: [{}] {}: {}".format(_get_host(version_store).get('l'),
7886
_get_host(version_store).get('mhost'),
7987
user, log, symbol)
@@ -136,4 +144,5 @@ def __exit__(self, *args, **kwargs):
136144
self._symbol, self.base_ts.version, written_ver.version))
137145

138146
changed = ChangedItem(self._symbol, self.base_ts, written_ver, None)
139-
self._version_store._write_audit(self._user, self._log, changed)
147+
if self._audit:
148+
self._version_store._write_audit(self._user, self._log, changed)

tests/unit/store/test_version_store_audit.py

+18-2
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,24 @@ def test_ConcurrentWriteBlock_simple():
2020
cwb.write(sentinel.symbol, pd.DataFrame(index=[3, 4], data={'a': [1.0, 2.0]}), metadata=sentinel.meta)
2121

2222
assert not vs._delete_version.called
23-
vs.write.assert_called_once_with(sentinel.symbol, ANY, prune_previous_version=True, metadata=sentinel.meta)
24-
vs.list_versions.assert_called_once_with(sentinel.symbol)
23+
assert vs.write.call_args_list == [call(sentinel.symbol, ANY, prune_previous_version=True, metadata=sentinel.meta)]
24+
assert vs.list_versions.call_args_list == [call(sentinel.symbol)]
25+
assert vs._write_audit.call_args_list == [call(sentinel.user, sentinel.log, ANY)]
26+
27+
28+
def test_ArticTransaction_no_audit():
29+
vs = create_autospec(VersionStore, _collection=Mock())
30+
ts1 = pd.DataFrame(index=[1, 2], data={'a':[1.0, 2.0]})
31+
vs.read.return_value = VersionedItem(symbol=sentinel.symbol, library=sentinel.library, version=1, metadata=None, data=ts1)
32+
vs.write.return_value = VersionedItem(symbol=sentinel.symbol, library=sentinel.library, version=2,
33+
metadata=None, data=None)
34+
vs.list_versions.return_value = [{'version': 2}, {'version': 1}]
35+
36+
with ArcticTransaction(vs, sentinel.symbol, sentinel.user, sentinel.log, audit=False) as cwb:
37+
cwb.write(sentinel.symbol, pd.DataFrame(index=[3, 4], data={'a': [1.0, 2.0]}), metadata=sentinel.meta)
38+
39+
assert vs.write.call_count == 1
40+
assert vs._write_audit.call_count == 0
2541

2642

2743
def test_ConcurrentWriteBlock_writes_if_metadata_changed():

0 commit comments

Comments
 (0)