Skip to content

Commit 99b956e

Browse files
authored
Merge pull request pandas-dev#715 from shashank88/remove_changes_coll
Fixes pandas-dev#714 Do not write to changes collection
2 parents 9f32653 + b1862e4 commit 99b956e

File tree

4 files changed: +7 -149 lines changed

4 files changed

+7
-149
lines changed

arctic/fixtures/arctic.py

-76
Original file line number | Diff line number | Diff line change
@@ -77,47 +77,6 @@ def multicolumn_store_with_uncompressed_write(mongo_server):
7777
}
7878
])
7979
db.TEST.ARCTIC.update_one({"_id": "ARCTIC_META"}, {"$set": {"_id": "ARCTIC_META", "TYPE": "VersionStore", "QUOTA": 10737418240}})
80-
db.TEST.changes.insert_many([
81-
{
82-
'append_count': 0,
83-
'dtype_metadata': {
84-
'index': ['index'],
85-
'columns': ["('a', 'a')", "('w', 'v')"]
86-
},
87-
'segment_count': 1,
88-
'dtype': '[(\'index\', \'S1\'), ("(\'a\', \'a\')", \'<i8\'), ("(\'w\', \'v\')", \'<i8\')]',
89-
'symbol': 'pandas',
90-
'up_to': 1,
91-
'metadata': None,
92-
'sha': bson.Binary(b'\xf2\x15h\x9d\x925\x95\xa5\x0e\x95J\xc4x\xfc\xfc\xd5\x80\xe0\x1d\xef', 0),
93-
'shape': [-1],
94-
'version': 1,
95-
'base_sha': bson.Binary(b'\xf2\x15h\x9d\x925\x95\xa5\x0e\x95J\xc4x\xfc\xfc\xd5\x80\xe0\x1d\xef', 0),
96-
'_id': bson.ObjectId('5ad0dc065c911d1188b512d8'),
97-
'type': 'pandasdf',
98-
'append_size': 0
99-
},
100-
{
101-
'append_count': 1,
102-
'dtype_metadata': {
103-
'index': ['index'],
104-
'columns': ["('a', 'a')", "('w', 'v')"]
105-
},
106-
'segment_count': 2,
107-
'sha': bson.Binary(b'1\x83[ZO\xec\x080D\x80f\xe4@\xe4\xd3\x94yG\xe2\x08', 0),
108-
'dtype': '[(\'index\', \'S1\'), ("(\'a\', \'a\')", \'<i8\'), ("(\'w\', \'v\')", \'<i8\')]',
109-
'symbol': 'pandas',
110-
'up_to': 2,
111-
'metadata': None,
112-
'base_version_id': bson.ObjectId('5ad0dc065c911d1188b512d8'),
113-
'shape': [-1],
114-
'version': 2,
115-
'base_sha': bson.Binary(b'\xf2\x15h\x9d\x925\x95\xa5\x0e\x95J\xc4x\xfc\xfc\xd5\x80\xe0\x1d\xef', 0),
116-
'_id': bson.ObjectId('5ad0dc075c911d1188b512d9'),
117-
'type': 'pandasdf',
118-
'append_size': 17
119-
}
120-
])
12180
db.TEST.version_nums.insert_one({'symbol': 'pandas', '_id': bson.ObjectId('5ad0dc067934ecad404070bd'), 'version': 2})
12281
db.TEST.versions.insert_many([
12382
{
@@ -206,41 +165,6 @@ def ndarray_store_with_uncompressed_write(mongo_server):
206165
}
207166
])
208167
db.TEST.ARCTIC.update_one({"_id": "ARCTIC_META"}, {"$set": {"_id": "ARCTIC_META", "TYPE": "VersionStore", "QUOTA": 10737418240}})
209-
db.TEST.changes.insert_many([
210-
{
211-
"_id": bson.ObjectId("5ad0742c5c911d4d80ee2ea3"),
212-
"append_count": 0,
213-
"dtype_metadata": {},
214-
"segment_count": 1,
215-
"dtype": "[('abc', '<i8')]",
216-
"symbol": "MYARR",
217-
"up_to": 1,
218-
"append_size": 0,
219-
"sha": bson.Binary(base64.b64decode("Bf5AV1MWbxJVWefJrFWGVPEHx+k="), 0),
220-
"shape": [-1],
221-
"version": 1,
222-
"base_sha": bson.Binary(base64.b64decode("Bf5AV1MWbxJVWefJrFWGVPEHx+k="), 0),
223-
"type": "ndarray",
224-
"metadata": None
225-
},
226-
{
227-
"_id": bson.ObjectId("5ad0742c5c911d4d80ee2ea4"),
228-
"append_count": 1,
229-
"dtype_metadata": {},
230-
"segment_count": 2,
231-
"base_version_id": bson.ObjectId("5ad0742c5c911d4d80ee2ea3"),
232-
"dtype": "[('abc', '<i8')]",
233-
"symbol": "MYARR",
234-
"up_to": 2,
235-
"append_size": 8,
236-
"sha": bson.Binary(base64.b64decode("Ax7oBxVFw1/9wKog2gfOLjbOVD8="), 0),
237-
"shape": [-1],
238-
"version": 2,
239-
"base_sha": bson.Binary(base64.b64decode("Bf5AV1MWbxJVWefJrFWGVPEHx+k="), 0),
240-
"type": "ndarray",
241-
"metadata": None
242-
}
243-
])
244168
db.TEST.versions_nums.insert_one({"_id": bson.ObjectId("5ad0742ca0949de6727cf993"), "symbol": "MYARR", "version": 2})
245169
db.TEST.versions.insert_many([
246170
{

arctic/store/version_store.py

-17
Original file line number | Diff line number | Diff line change
@@ -50,10 +50,6 @@ class VersionStore(object):
5050
def initialize_library(cls, arctic_lib, hashed=True, **kwargs):
5151
c = arctic_lib.get_top_level_collection()
5252

53-
if '%s.changes' % c.name not in mongo_retry(c.database.list_collection_names)():
54-
# 32MB buffer for change notifications
55-
mongo_retry(c.database.create_collection)('%s.changes' % c.name, capped=True, size=32 * 1024 * 1024)
56-
5753
if 'strict_write_handler' in kwargs:
5854
arctic_lib.set_library_metadata('STRICT_WRITE_HANDLER_MATCH',
5955
bool(kwargs.pop('strict_write_handler')))
@@ -109,9 +105,6 @@ def _reset(self):
109105
self._snapshots = self._collection.snapshots
110106
self._versions = self._collection.versions
111107
self._version_nums = self._collection.version_nums
112-
self._publish_changes = '%s.changes' % self._collection.name in self._collection.database.list_collection_names()
113-
if self._publish_changes:
114-
self._changes = self._collection.changes
115108

116109
def __getstate__(self):
117110
return {'arctic_lib': self._arctic_lib}
@@ -611,8 +604,6 @@ def append(self, symbol, data, metadata=None, prune_previous_version=True, upser
611604
else:
612605
raise Exception("Append not implemented for handler %s" % handler)
613606

614-
self._publish_change(symbol, version)
615-
616607
if prune_previous_version and previous_version:
617608
# Does not allow prune to remove the base of the new version
618609
self._prune_previous_versions(symbol, keep_version=version.get('base_version_id'),
@@ -626,10 +617,6 @@ def append(self, symbol, data, metadata=None, prune_previous_version=True, upser
626617
metadata=version.pop('metadata', None), data=None,
627618
host=self._arctic_lib.arctic.mongo_host)
628619

629-
def _publish_change(self, symbol, version):
630-
if self._publish_changes:
631-
mongo_retry(self._changes.insert_one)(version)
632-
633620
@mongo_retry
634621
def write(self, symbol, data, metadata=None, prune_previous_version=True, **kwargs):
635622
"""
@@ -673,8 +660,6 @@ def write(self, symbol, data, metadata=None, prune_previous_version=True, **kwar
673660
if prune_previous_version and previous_version:
674661
self._prune_previous_versions(symbol, new_version_shas=version.get(FW_POINTERS_REFS_KEY))
675662

676-
self._publish_change(symbol, version)
677-
678663
# Insert the new version into the version DB
679664
self._insert_version(version)
680665

@@ -723,8 +708,6 @@ def _add_new_version_using_reference(self, symbol, new_version, reference_versio
723708

724709
logger.debug('Finished updating versions with new metadata for %s', symbol)
725710

726-
self._publish_change(symbol, new_version)
727-
728711
return VersionedItem(symbol=symbol, library=self._arctic_lib.get_name(), version=new_version['version'],
729712
metadata=new_version.get('metadata'), data=None,
730713
host=self._arctic_lib.arctic.mongo_host)

tests/integration/store/test_version_store.py

-31
Original file line number | Diff line number | Diff line change
@@ -1564,24 +1564,6 @@ def test_append_does_not_duplicate_data_when_prune_fails(library, fw_pointers_cf
15641564
assert len(set(data.index)) == len(data.index)
15651565

15661566

1567-
@pytest.mark.parametrize('fw_pointers_cfg', [FwPointersCfg.DISABLED, FwPointersCfg.HYBRID, FwPointersCfg.ENABLED])
1568-
def test_append_does_not_duplicate_data_when_publish_fails(library, fw_pointers_cfg):
1569-
with FwPointersCtx(fw_pointers_cfg):
1570-
side_effect = [OperationFailure(0), arctic.store.version_store.VersionStore._publish_change]
1571-
new_data = read_str_as_pandas("""times | near
1572-
2013-01-01 17:06:11.040 | 7.0
1573-
2013-01-02 17:06:11.040 | 8.2
1574-
2013-01-03 17:06:11.040 | 3.5
1575-
2013-01-04 17:06:11.040 | 0.7""")
1576-
library.write(symbol, ts1)
1577-
1578-
with patch.object(arctic.store.version_store.VersionStore, "_publish_change", autospec=True, side_effect=side_effect):
1579-
library.append(symbol, new_data)
1580-
1581-
data = library.read(symbol).data
1582-
assert len(set(data.index)) == len(data.index)
1583-
1584-
15851567
@pytest.mark.parametrize('fw_pointers_cfg', [FwPointersCfg.DISABLED, FwPointersCfg.HYBRID, FwPointersCfg.ENABLED])
15861568
def test_write_does_not_succeed_with_a_prune_error(library, fw_pointers_cfg):
15871569
with FwPointersCtx(fw_pointers_cfg):
@@ -1596,19 +1578,6 @@ def test_write_does_not_succeed_with_a_prune_error(library, fw_pointers_cfg):
15961578
assert len(library.list_versions(symbol)) == 1
15971579

15981580

1599-
@pytest.mark.parametrize('fw_pointers_cfg', [FwPointersCfg.DISABLED, FwPointersCfg.HYBRID, FwPointersCfg.ENABLED])
1600-
def test_write_does_not_succeed_with_a_publish_error(library, fw_pointers_cfg):
1601-
with FwPointersCtx(fw_pointers_cfg):
1602-
# More than max retries OperationFailure would be more realistic, but ValueError is used for simplicity
1603-
side_effect = [ValueError, arctic.store.version_store.VersionStore._publish_change]
1604-
1605-
with patch.object(arctic.store.version_store.VersionStore, "_publish_change", autospec=True, side_effect=side_effect):
1606-
with pytest.raises(ValueError):
1607-
library.append(symbol, ts1)
1608-
1609-
assert not library.list_versions(symbol)
1610-
1611-
16121581
@pytest.mark.parametrize('fw_pointers_cfg', [FwPointersCfg.DISABLED, FwPointersCfg.HYBRID, FwPointersCfg.ENABLED])
16131582
def test_prune_keeps_version(library, fw_pointers_cfg):
16141583
with FwPointersCtx(fw_pointers_cfg):

tests/unit/store/test_version_store.py

+7-25
Original file line number | Diff line number | Diff line change
@@ -159,8 +159,7 @@ def test_write_check_quota():
159159
_version_nums=Mock(find_one_and_update=Mock(return_value={'version': 1})),
160160
_versions=Mock(insert_one=lambda x: None),
161161
_arctic_lib=create_autospec(ArcticLibraryBinding,
162-
arctic=create_autospec(Arctic, mongo_host='some_host')),
163-
_publish_changes=False)
162+
arctic=create_autospec(Arctic, mongo_host='some_host')))
164163
vs._collection.database.connection.nodes = []
165164
vs._write_handler.return_value = write_handler
166165
VersionStore.write(vs, 'sym', sentinel.data, prune_previous_version=False)
@@ -367,7 +366,6 @@ def test_write_metadata_with_previous_data():
367366
assert expected_ret_val == VersionStore.write_metadata(vs, symbol=TEST_SYMBOL, metadata=META_TO_WRITE)
368367
assert vs._versions.insert_one.call_args_list == [call(expected_new_version)]
369368
assert vs._versions.delete_one.called is False
370-
assert vs._publish_change.call_args_list == [call(TEST_SYMBOL, expected_new_version)]
371369
assert vs.write.called is False
372370

373371

@@ -393,7 +391,6 @@ def test_write_empty_metadata():
393391
assert expected_ret_val == VersionStore.write_metadata(vs, symbol=TEST_SYMBOL, metadata=None)
394392
assert vs._versions.insert_one.call_args_list == [call(expected_new_version)]
395393
assert vs._versions.delete_one.called is False
396-
assert vs._publish_change.call_args_list == [call(TEST_SYMBOL, expected_new_version)]
397394
assert vs.write.called is False
398395

399396

@@ -404,7 +401,6 @@ def test_write_metadata_insert_version_dupkeyerror():
404401
VersionStore.write_metadata(vs, symbol=TEST_SYMBOL, metadata=META_TO_WRITE)
405402
assert vs._version_nums.find_one_and_update.call_count == 2
406403
assert vs._versions.insert_one.call_count == 2
407-
assert vs._publish_change.call_count == 1
408404

409405

410406
def test_write_metadata_insert_version_opfailure():
@@ -414,7 +410,6 @@ def test_write_metadata_insert_version_opfailure():
414410
VersionStore.write_metadata(vs, symbol=TEST_SYMBOL, metadata=META_TO_WRITE)
415411
assert vs._version_nums.find_one_and_update.call_count == 1
416412
assert vs._versions.insert_one.call_count == 2
417-
assert vs._publish_change.call_count == 1
418413

419414

420415
def test_restore_version():
@@ -455,7 +450,6 @@ def test_restore_version_data_missing_symbol():
455450
as_of=TPL_VERSION['version'], prune_previous_version=True)
456451
assert vs._read_metadata.call_args_list == [call(TEST_SYMBOL, as_of=TPL_VERSION['version'])]
457452
assert vs._versions.insert_one.called is False
458-
assert vs._publish_change.called is False
459453

460454

461455
def test_restore_last_version():
@@ -488,8 +482,7 @@ def test_write_error_clean_retry():
488482
_version_nums=Mock(find_one_and_update=Mock(return_value={'version': 1})),
489483
_versions=Mock(insert_one=Mock(__name__="insert_one"), find_one=Mock(__name__="find_one")),
490484
_arctic_lib=create_autospec(ArcticLibraryBinding,
491-
arctic=create_autospec(Arctic, mongo_host='some_host')),
492-
_publish_changes=False)
485+
arctic=create_autospec(Arctic, mongo_host='some_host')))
493486
vs._insert_version = lambda version: VersionStore._insert_version(vs, version)
494487
vs._collection.database.connection.nodes = []
495488
vs._write_handler.return_value = write_handler
@@ -498,7 +491,6 @@ def test_write_error_clean_retry():
498491
assert vs._versions.find_one.call_count == 2
499492
assert write_handler.write.call_count == 2
500493
assert vs._versions.insert_one.call_count == 1
501-
assert vs._publish_change.call_count == 1
502494

503495

504496
def test_write_insert_version_duplicatekey():
@@ -508,8 +500,7 @@ def test_write_insert_version_duplicatekey():
508500
_version_nums=Mock(find_one_and_update=Mock(return_value={'version': 1})),
509501
_versions=Mock(insert_one=Mock(__name__="insert_one"), find_one=Mock(__name__="find_one")),
510502
_arctic_lib=create_autospec(ArcticLibraryBinding,
511-
arctic=create_autospec(Arctic, mongo_host='some_host')),
512-
_publish_changes=False)
503+
arctic=create_autospec(Arctic, mongo_host='some_host')))
513504
vs._insert_version = lambda version: VersionStore._insert_version(vs, version)
514505
vs._versions.insert_one.side_effect = [DuplicateKeyError("dup key error"), None]
515506
vs._collection.database.connection.nodes = []
@@ -518,7 +509,6 @@ def test_write_insert_version_duplicatekey():
518509
assert vs._version_nums.find_one_and_update.call_count == 2
519510
assert vs._versions.find_one.call_count == 2
520511
assert write_handler.write.call_count == 2
521-
assert vs._publish_change.call_count == 2
522512
assert vs._versions.insert_one.call_count == 2
523513

524514

@@ -529,8 +519,7 @@ def test_write_insert_version_operror():
529519
_version_nums=Mock(find_one_and_update=Mock(return_value={'version': 1})),
530520
_versions=Mock(insert_one=Mock(__name__="insert_one"), find_one=Mock(__name__="find_one")),
531521
_arctic_lib=create_autospec(ArcticLibraryBinding,
532-
arctic=create_autospec(Arctic, mongo_host='some_host')),
533-
_publish_changes=False)
522+
arctic=create_autospec(Arctic, mongo_host='some_host')))
534523
vs._insert_version = lambda version: VersionStore._insert_version(vs, version)
535524
vs._versions.insert_one.side_effect = [OperationFailure("mongo op error"), None]
536525
vs._collection.database.connection.nodes = []
@@ -540,7 +529,6 @@ def test_write_insert_version_operror():
540529
assert vs._versions.find_one.call_count == 1
541530
assert write_handler.write.call_count == 1
542531
assert vs._versions.insert_one.call_count == 2
543-
assert vs._publish_change.call_count == 1
544532

545533

546534
def test_append_error_clean_retry():
@@ -553,8 +541,7 @@ def test_append_error_clean_retry():
553541
_version_nums=Mock(find_one_and_update=Mock(return_value={'version': previous_version['version']+1})),
554542
_versions=Mock(insert_one=Mock(__name__="insert_one"), find_one=Mock(__name__="find_one", return_value=previous_version)),
555543
_arctic_lib=create_autospec(ArcticLibraryBinding,
556-
arctic=create_autospec(Arctic, mongo_host='some_host')),
557-
_publish_changes=False)
544+
arctic=create_autospec(Arctic, mongo_host='some_host')))
558545
vs._insert_version = lambda version: VersionStore._insert_version(vs, version)
559546
vs._collection.database.connection.nodes = []
560547
vs._read_handler.return_value = read_handler
@@ -563,7 +550,6 @@ def test_append_error_clean_retry():
563550
assert vs._versions.find_one.call_count == 2
564551
assert read_handler.append.call_count == 2
565552
assert vs._versions.insert_one.call_count == 1
566-
assert vs._publish_change.call_count == 1
567553

568554

569555
def test_append_insert_version_duplicatekey():
@@ -575,8 +561,7 @@ def test_append_insert_version_duplicatekey():
575561
_version_nums=Mock(find_one_and_update=Mock(return_value={'version': previous_version['version']+1})),
576562
_versions=Mock(insert_one=Mock(__name__="insert_one"), find_one=Mock(__name__="find_one", return_value=previous_version)),
577563
_arctic_lib=create_autospec(ArcticLibraryBinding,
578-
arctic=create_autospec(Arctic, mongo_host='some_host')),
579-
_publish_changes=False)
564+
arctic=create_autospec(Arctic, mongo_host='some_host')))
580565
vs._insert_version = lambda version: VersionStore._insert_version(vs, version)
581566
vs._versions.insert_one.side_effect = [DuplicateKeyError("dup key error"), None]
582567
vs._collection.database.connection.nodes = []
@@ -586,7 +571,6 @@ def test_append_insert_version_duplicatekey():
586571
assert vs._versions.find_one.call_count == 2
587572
assert read_handler.append.call_count == 2
588573
assert vs._versions.insert_one.call_count == 2
589-
assert vs._publish_change.call_count == 2
590574

591575
def test_append_insert_version_operror():
592576
read_handler = Mock(append=Mock(__name__=""))
@@ -597,8 +581,7 @@ def test_append_insert_version_operror():
597581
_version_nums=Mock(find_one_and_update=Mock(return_value={'version': previous_version['version']+1})),
598582
_versions=Mock(insert_one=Mock(__name__="insert_one"), find_one=Mock(__name__="find_one", return_value=previous_version)),
599583
_arctic_lib=create_autospec(ArcticLibraryBinding,
600-
arctic=create_autospec(Arctic, mongo_host='some_host')),
601-
_publish_changes=False)
584+
arctic=create_autospec(Arctic, mongo_host='some_host')))
602585
vs._insert_version = lambda version: VersionStore._insert_version(vs, version)
603586
vs._versions.insert_one.side_effect = [OperationFailure("mongo op error"), None]
604587
vs._collection.database.connection.nodes = []
@@ -608,4 +591,3 @@ def test_append_insert_version_operror():
608591
assert vs._versions.find_one.call_count == 1
609592
assert read_handler.append.call_count == 1
610593
assert vs._versions.insert_one.call_count == 2
611-
assert vs._publish_change.call_count == 1

0 commit comments

Comments
 (0)