
Commit bdb2868

Merge pull request pandas-dev#597 from dimosped/fix-pymongo-warning-collection-names

Fix failing integration tests on Travis due to the list_collection_names deprecation.

2 parents: 260c786 + 4ef21ca
23 files changed: +202, -139 lines

README.md (+2, -1)

@@ -111,7 +111,7 @@ Arctic storage implementations are **pluggable**. VersionStore is the default.
 Arctic currently works with:

 * Python 2.7, 3.4, 3.5, 3.6
-* pymongo >= 3.0
+* pymongo >= 3.6
 * Pandas
 * MongoDB >= 2.4.x

@@ -137,6 +137,7 @@ It wouldn't be possible without the work of the AHL Data Engineering Team includ
 * [Wilfred Hughes](https://github.com/wilfred)
 * [Edward Easton](https://github.com/eeaston)
 * [Bryant Moscon](https://github.com/bmoscon)
+* [Dimosthenis Pediaditakis](https://github.com/dimosped)
 * ... and many others ...

 Contributions welcome!

arctic/_util.py (+26, -2)

@@ -1,10 +1,22 @@
 from pandas import DataFrame
 from pandas.util.testing import assert_frame_equal
-from pymongo.errors import OperationFailure
 import logging
+import pymongo
+

 logger = logging.getLogger(__name__)

+# Avoid import-time extra logic
+_use_new_count_api = None
+
+
+def _detect_new_count_api():
+    try:
+        mongo_v = [int(v) for v in pymongo.version.split('.')]
+        return mongo_v[0] >= 3 and mongo_v[1] >= 7
+    except:
+        return False
+

 def indent(s, num_spaces):
     s = s.split('\n')

@@ -47,7 +59,7 @@ def enable_sharding(arctic, library_name, hashed=True, key='symbol'):
     library_name = lib.get_top_level_collection().name
     try:
         c.admin.command('enablesharding', dbname)
-    except OperationFailure as e:
+    except pymongo.errors.OperationFailure as e:
         if 'already enabled' not in str(e):
             raise
     if not hashed:

@@ -56,3 +68,15 @@ def enable_sharding(arctic, library_name, hashed=True, key='symbol'):
     else:
         logger.info("Hash sharding '" + key + "' on: " + dbname + '.' + library_name)
         c.admin.command('shardCollection', dbname + '.' + library_name, key={key: 'hashed'})
+
+
+def mongo_count(collection, filter=None, **kwargs):
+    filter = {} if filter is None else filter
+    global _use_new_count_api
+    _use_new_count_api = _detect_new_count_api() if _use_new_count_api is None else _use_new_count_api
+    # Temporary shim for compatibility with pymongo>=3.7; also avoids deprecation warnings
+    if _use_new_count_api:
+        # Projection is ignored for count_documents
+        return collection.count_documents(filter=filter, **kwargs)
+    else:
+        return collection.count(filter=filter, **kwargs)
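
For context, a minimal usage sketch of the new helper; the connection, database, and collection names are illustrative and assume a MongoDB instance on localhost:

    import pymongo

    from arctic._util import mongo_count

    client = pymongo.MongoClient('localhost')   # illustrative connection
    coll = client.test_db.test_coll             # illustrative namespace

    # Dispatches to count_documents() on pymongo >= 3.7 and falls back
    # to the deprecated count() on older versions.
    print(mongo_count(coll))                             # empty filter counts everything
    print(mongo_count(coll, filter={'symbol': 'TEST'}))  # count matching a query

One caveat: the probe returns True only when mongo_v[0] >= 3 and mongo_v[1] >= 7, so a hypothetical pymongo 4.1 would fail the minor-version check and fall back to the legacy count() path; a tuple comparison such as (mongo_v[0], mongo_v[1]) >= (3, 7) would be more future-proof.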

arctic/arctic.py (+7, -7)

@@ -181,11 +181,11 @@ def list_libraries(self):
         libs = []
         for db in self._conn.list_database_names():
             if db.startswith(self.DB_PREFIX + '_'):
-                for coll in self._conn[db].collection_names():
+                for coll in self._conn[db].list_collection_names():
                     if coll.endswith(self.METADATA_COLL):
                         libs.append(db[len(self.DB_PREFIX) + 1:] + "." + coll[:-1 * len(self.METADATA_COLL) - 1])
             elif db == self.DB_PREFIX:
-                for coll in self._conn[db].collection_names():
+                for coll in self._conn[db].list_collection_names():
                     if coll.endswith(self.METADATA_COLL):
                         libs.append(coll[:-1 * len(self.METADATA_COLL) - 1])
         return libs

@@ -212,9 +212,9 @@ def initialize_library(self, library, lib_type=VERSION_STORE, **kwargs):
         # check that we don't create too many namespaces
         # can be disabled check_library_count=False
         check_library_count = kwargs.pop('check_library_count', True)
-        if len(self._conn[l.database_name].collection_names()) > 5000 and check_library_count:
+        if len(self._conn[l.database_name].list_collection_names()) > 5000 and check_library_count:
             raise ArcticException("Too many namespaces %s, not creating: %s" %
-                                  (len(self._conn[l.database_name].collection_names()), library))
+                                  (len(self._conn[l.database_name].list_collection_names()), library))
         l.set_library_type(lib_type)
         LIBRARY_TYPES[lib_type].initialize_library(l, **kwargs)
         # Add a 10G quota just in case the user is calling this with API.

@@ -233,11 +233,11 @@ def delete_library(self, library):
         """
         l = ArcticLibraryBinding(self, library)
         colname = l.get_top_level_collection().name
-        if not [c for c in l._db.collection_names(False) if re.match(r"^{}([\.].*)?$".format(colname), c)]:
+        if not [c for c in l._db.list_collection_names(False) if re.match(r"^{}([\.].*)?$".format(colname), c)]:
             logger.info('Nothing to delete. Arctic library %s does not exist.' % colname)
         logger.info('Dropping collection: %s' % colname)
         l._db.drop_collection(colname)
-        for coll in l._db.collection_names():
+        for coll in l._db.list_collection_names():
             if coll.startswith(colname + '.'):
                 logger.info('Dropping collection: %s' % coll)
                 l._db.drop_collection(coll)

@@ -352,7 +352,7 @@ def rename_library(self, from_lib, to_lib):

         logger.info('Renaming collection: %s' % colname)
         l._db[colname].rename(to_colname)
-        for coll in l._db.collection_names():
+        for coll in l._db.list_collection_names():
             if coll.startswith(colname + '.'):
                 l._db[coll].rename(coll.replace(colname, to_colname))
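
pymongo 3.7 deprecated Database.collection_names() in favour of list_collection_names() (available since pymongo 3.6, matching the README bump above), and this file switches every call site. A minimal sketch of the rename, with an illustrative database name:

    import pymongo

    db = pymongo.MongoClient('localhost')['illustrative_db']

    # Deprecated since pymongo 3.7:
    #   names = db.collection_names()

    # Replacement:
    names = db.list_collection_names()
    print(names)

Note that list_collection_names has no include_system_collections flag; its first parameter is session, so the list_collection_names(False) call in delete_library above binds False where a session is expected and may deserve a second look.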

arctic/chunkstore/chunkstore.py (+4, -4)

@@ -11,7 +11,7 @@
 from pymongo.errors import OperationFailure

 from ..decorators import mongo_retry
-from .._util import indent
+from .._util import indent, mongo_count
 from ..serialization.numpy_arrays import FrametoArraySerializer, DATA, METADATA, COLUMNS
 from .date_chunker import DateChunker, START, END
 from .passthrough_chunker import PassthroughChunker

@@ -87,7 +87,7 @@ def _check_invalid_segment(self):
         # Issue 442
         # for legacy data that was incorrectly marked with segment start of -1
         for symbol in self.list_symbols():
-            if self._collection.find({SYMBOL: symbol, SEGMENT: -1}).count() > 1:
+            if mongo_count(self._collection, filter={SYMBOL: symbol, SEGMENT: -1}) > 1:
                 logger.warning("Symbol %s has malformed segments. Data must be rewritten or fixed with chunkstore segment_id_repair tool" % symbol)

     @mongo_retry
@@ -153,7 +153,7 @@ def delete(self, symbol, chunk_range=None, audit=None):
             # update symbol metadata (rows and chunk count)
             sym = self._get_symbol_info(symbol)
             sym[LEN] -= row_adjust
-            sym[CHUNK_COUNT] = self._collection.count({SYMBOL: symbol})
+            sym[CHUNK_COUNT] = mongo_count(self._collection, filter={SYMBOL: symbol})
             self._symbols.replace_one({SYMBOL: symbol}, sym)

         else:
@@ -458,7 +458,7 @@ def __update(self, sym, item, metadata=None, combine_method=None, chunk_range=No
         meta = data[METADATA]

         chunk_count = int(len(data[DATA]) / MAX_CHUNK_SIZE + 1)
-        seg_count = self._collection.count({SYMBOL: symbol, START: start, END: end})
+        seg_count = mongo_count(self._collection, filter={SYMBOL: symbol, START: start, END: end})
         # remove old segments for this chunk in case we now have less
         # segments than we did before
         if seg_count > chunk_count:
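
The same pymongo release deprecated Collection.count() and Cursor.count() in favour of Collection.count_documents(), which is what drives the mongo_count switch in the hunks above. The bare migration looks like this (namespace and query are illustrative):

    import pymongo

    coll = pymongo.MongoClient('localhost').illustrative_db.chunks

    spec = {'sy': 'test_df', 'sg': -1}

    # Deprecated since pymongo 3.7:
    #   n = coll.find(spec).count()

    # Replacement: the filter is a required argument and counting runs server-side
    n = coll.count_documents(spec)
    print(n)

Projections do not apply to count_documents (hence the comment in the shim), and the old zero-argument collection.count() becomes count_documents({}).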

arctic/fixtures/arctic.py (+12, -0)

@@ -324,6 +324,12 @@ def user_library(arctic, user_library_name):
 def overlay_library(arctic, overlay_library_name):
     """ Overlay library fixture, returns a pair of libs, read-write: ${name} and read-only: ${name}_RAW
     """
+    # Delegate to _overlay_library to avoid:
+    #   RemovedInPytest4Warning: Fixture overlay_library called directly. Fixtures are not meant to be called directly
+    return _overlay_library(arctic, overlay_library_name)
+
+
+def _overlay_library(arctic, overlay_library_name):
     rw_name = overlay_library_name
     ro_name = '{}_RAW'.format(overlay_library_name)
     arctic.initialize_library(rw_name, m.VERSION_STORE, segment='year')

@@ -333,6 +339,12 @@ def overlay_library(arctic, overlay_library_name):

 @pytest.fixture(scope="function")
 def tickstore_lib(arctic, library_name):
+    # Delegate to _tickstore_lib to avoid:
+    #   RemovedInPytest4Warning: Fixture tickstore_lib called directly. Fixtures are not meant to be called directly
+    return _tickstore_lib(arctic, library_name)
+
+
+def _tickstore_lib(arctic, library_name):
     arctic.initialize_library(library_name, TICK_STORE_TYPE)
     return arctic.get_library(library_name)

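
The reshuffle follows the usual recipe for pytest's RemovedInPytest4Warning: move the fixture body into a plain helper that anything may call directly, and keep the fixture itself as a thin wrapper. A generic, self-contained sketch of the pattern with illustrative names:

    import pytest

    def _make_value(tmp_path):
        # Plain function: safe to call from other fixtures, tests, or scripts
        return {'root': str(tmp_path)}

    @pytest.fixture
    def value(tmp_path):
        # Thin wrapper: pytest injects the arguments, the helper does the work
        return _make_value(tmp_path)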

arctic/store/_ndarray_store.py (+2, -2)

@@ -8,6 +8,7 @@
 import pymongo
 from pymongo.errors import OperationFailure, DuplicateKeyError

+from arctic._util import mongo_count
 from ..decorators import mongo_retry
 from ..exceptions import UnhandledDtypeException, DataIntegrityException
 from ._version_store_utils import checksum, version_base_or_id, _fast_check_corruption

@@ -455,8 +456,7 @@ def check_written(self, collection, symbol, version):
         parent_id = version_base_or_id(version)

         # Check all the chunks are in place
-        seen_chunks = collection.find({'symbol': symbol, 'parent': parent_id},
-                                      ).count()
+        seen_chunks = mongo_count(collection, filter={'symbol': symbol, 'parent': parent_id})

         if seen_chunks != version['segment_count']:
             segments = [x['segment'] for x in collection.find({'symbol': symbol, 'parent': parent_id},

arctic/store/_version_store_utils.py (+21, -5)

@@ -11,6 +11,8 @@
 from pandas.compat import pickle_compat
 from pymongo.errors import OperationFailure

+from arctic._util import mongo_count
+

 def _split_arrs(array_2d, slices):
     """

@@ -113,7 +115,7 @@ def analyze_symbol(l, sym, from_ver, to_ver, do_reads=False):
             matching = 0
         else:
             spec = {'symbol': sym, 'parent': v.get('base_version_id', v['_id']), 'segment': {'$lt': v.get('up_to', 0)}}
-            matching = l._collection.find(spec).count() if not is_deleted else 0
+            matching = mongo_count(l._collection, filter=spec) if not is_deleted else 0

         base_id = v.get('base_version_id')
         snaps = ['/'.join((str(x), str(x.generation_time))) for x in v.get('parent')] if v.get('parent') else None

@@ -129,7 +131,21 @@ def analyze_symbol(l, sym, from_ver, to_ver, do_reads=False):
         corrupted = not is_deleted and (is_corrupted(l, sym, v) if do_reads else fast_is_corrupted(l, sym, v))

         logging.info(
-            "v{: <6} {: <6} {: <5} ({: <20}): expected={: <6} found={: <6} last_row={: <10} new_rows={: <10} append count={: <10} append_size={: <10} type={: <14} {: <14} base={: <24}/{: <28} snap={: <30}[{:.1f} mins delayed] {: <20} {: <20}".format(
+            "v{: <6} "
+            "{: <6} "
+            "{: <5} "
+            "({: <20}): "
+            "expected={: <6} "
+            "found={: <6} "
+            "last_row={: <10} "
+            "new_rows={: <10} "
+            "append count={: <10} "
+            "append_size={: <10} "
+            "type={: <14} {: <14} "
+            "base={: <24}/{: <28} "
+            "snap={: <30}[{:.1f} mins delayed] "
+            "{: <20} "
+            "{: <20}".format(
                 n,
                 prev_v_diff,
                 'DEL' if is_deleted else 'ALIVE',

@@ -144,7 +160,7 @@ def analyze_symbol(l, sym, from_ver, to_ver, do_reads=False):
                 'meta-same' if meta_match_with_prev else 'meta-changed',
                 str(base_id),
                 str(base_id.generation_time) if base_id else '',
-                snaps,
+                str(snaps),
                 delta_snap_creation,
                 'PREV_MISSING' if prev_n < n - 1 else '',
                 'CORRUPTED VERSION' if corrupted else '')

@@ -159,7 +175,7 @@ def analyze_symbol(l, sym, from_ver, to_ver, do_reads=False):
                 hashlib.sha1(seg['sha']).hexdigest(),
                 seg.get('segment'),
                 'compressed' if seg.get('compressed', False) else 'raw',
-                [str(p) for p in seg.get('parent', [])]
+                str([str(p) for p in seg.get('parent', [])])
             ))

@@ -193,7 +209,7 @@ def _fast_check_corruption(collection, sym, v, check_count, check_last_segment,
         # collection.find_one(spec, {'segment': 1}, sort=[('segment', pymongo.DESCENDING)])

         if check_count:
-            total_segments = collection.find(spec, {'segment': 1}).count()
+            total_segments = mongo_count(collection, filter=spec)
             # Quick check: compare segment count
             if total_segments != v.get('segment_count', 0):
                 return True  # corrupted, don't proceed with fetching from mongo the first hit
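
The added str() wrappers are not cosmetic: on Python 3, objects such as lists only accept an empty format spec, so feeding snaps or a list comprehension straight into a padded placeholder raises. A quick illustration:

    snaps = ['snap1/2018-06-01', 'snap2/2018-06-02']

    # Raises on Python 3:
    #   TypeError: unsupported format string passed to list.__format__
    # print("{: <30}".format(snaps))

    # Converting first applies the padding to the string representation:
    print("{: <30}|".format(str(snaps)))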

arctic/store/bson_store.py (+2, -2)

@@ -1,7 +1,7 @@
 import logging
 from pymongo.errors import OperationFailure
 from ..decorators import mongo_retry
-from .._util import enable_sharding
+from .._util import enable_sharding, mongo_count

 logger = logging.getLogger(__name__)

@@ -161,7 +161,7 @@ def count(self, filter, **kwargs):
         """
         See http://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.count
         """
-        return self._collection.count(filter, **kwargs)
+        return mongo_count(self._collection, filter=filter, **kwargs)

     @mongo_retry
     def aggregate(self, pipeline, **kwargs):

arctic/store/version_store.py (+5, -5)

@@ -6,7 +6,7 @@
 import pymongo
 from pymongo.errors import OperationFailure, AutoReconnect, DuplicateKeyError

-from .._util import indent, enable_sharding
+from .._util import indent, enable_sharding, mongo_count
 from ..date import mktz, datetime_to_ms, ms_to_datetime
 from ..decorators import mongo_retry
 from ..exceptions import NoDataFoundException, DuplicateSnapshotException, \

@@ -41,7 +41,7 @@ class VersionStore(object):
     def initialize_library(cls, arctic_lib, hashed=True, **kwargs):
         c = arctic_lib.get_top_level_collection()

-        if '%s.changes' % c.name not in mongo_retry(c.database.collection_names)():
+        if '%s.changes' % c.name not in mongo_retry(c.database.list_collection_names)():
             # 32MB buffer for change notifications
             mongo_retry(c.database.create_collection)('%s.changes' % c.name, capped=True, size=32 * 1024 * 1024)

@@ -88,7 +88,7 @@ def _reset(self):
         self._snapshots = self._collection.snapshots
         self._versions = self._collection.versions
         self._version_nums = self._collection.version_nums
-        self._publish_changes = '%s.changes' % self._collection.name in self._collection.database.collection_names()
+        self._publish_changes = '%s.changes' % self._collection.name in self._collection.database.list_collection_names()
         if self._publish_changes:
             self._changes = self._collection.changes

@@ -1034,7 +1034,7 @@ def _cleanup_orphaned_chunks(self, dry_run):
         if len(leaked_versions):
             logger.info("%s leaked %d versions" % (symbol, len(leaked_versions)))
             for x in leaked_versions:
-                chunk_count = chunks_coll.find({'symbol': symbol, 'parent': x}).count()
+                chunk_count = mongo_count(chunks_coll, filter={'symbol': symbol, 'parent': x})
                 logger.info("%s: Missing Version %s (%s) ; %s chunks ref'd" % (symbol,
                                                                                x.generation_time,
                                                                                x,

@@ -1078,7 +1078,7 @@ def _cleanup_orphaned_versions(self, dry_run):
         if len(leaked_snaps):
             logger.info("leaked %d snapshots" % (len(leaked_snaps)))
             for x in leaked_snaps:
-                ver_count = versions_coll.find({'parent': x}).count()
+                ver_count = mongo_count(versions_coll, filter={'parent': x})
                 logger.info("Missing Snapshot %s (%s) ; %s versions ref'd" % (x.generation_time,
                                                                               x,
                                                                               ver_count
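
Unrelated to the deprecations, the context in initialize_library above shows how VersionStore provisions its change-notification feed: a capped collection, which MongoDB truncates automatically once it hits its size bound. A standalone sketch, with illustrative names and the same 32MB size:

    import pymongo

    db = pymongo.MongoClient('localhost').illustrative_db

    if 'my_lib.changes' not in db.list_collection_names():
        # Capped collections act as fixed-size ring buffers: old documents
        # are evicted as new ones arrive once the size limit is reached.
        db.create_collection('my_lib.changes', capped=True, size=32 * 1024 * 1024)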

tests/integration/chunkstore/test_chunkstore.py (+5, -3)

@@ -2,6 +2,8 @@
 from datetime import datetime as dt
 from datetime import timedelta
 from pandas.util.testing import assert_frame_equal, assert_series_equal
+
+from arctic._util import mongo_count
 from arctic.date import DateRange
 from arctic.exceptions import NoDataFoundException
 import pandas as pd

@@ -943,7 +945,7 @@ def test_delete_range_segment(chunkstore_lib):
     chunkstore_lib.delete('test_df', chunk_range=pd.date_range(dt(2016, 1, 1), dt(2016, 1, 1)))
     read_df = chunkstore_lib.read('test_df')
     assert(read_df.equals(dg))
-    assert(chunkstore_lib._collection.count({'sy': 'test_df'}) == 1)
+    assert(mongo_count(chunkstore_lib._collection, {'sy': 'test_df'}) == 1)


 def test_size_chunk_update(chunkstore_lib):

@@ -960,7 +962,7 @@ def test_size_chunk_update(chunkstore_lib):
     read_df = chunkstore_lib.read('test_df')

     assert_frame_equal(dh, read_df)
-    assert(chunkstore_lib._collection.count({'sy': 'test_df'}) == 1)
+    assert mongo_count(chunkstore_lib._collection, filter={'sy': 'test_df'}) == 1


 def test_size_chunk_multiple_update(chunkstore_lib):

@@ -976,7 +978,7 @@ def test_size_chunk_multiple_update(chunkstore_lib):
     expected = pd.concat([df_large, df_small]).reset_index(drop=True)

     assert_frame_equal(expected, read_df)
-    assert(chunkstore_lib._collection.count({'sy': 'test_df'}) == 3)
+    assert mongo_count(chunkstore_lib._collection, filter={'sy': 'test_df'}) == 3


 def test_get_chunk_range(chunkstore_lib):
