11
11
from ..date import mktz , datetime_to_ms , ms_to_datetime
12
12
from ..decorators import mongo_retry
13
13
from ..exceptions import NoDataFoundException , DuplicateSnapshotException , \
14
- OptimisticLockException , ArcticException
14
+ OptimisticLockException
15
15
from ..hooks import log_exception
16
16
from ._pickle_store import PickleStore
17
17
from ._version_store_utils import cleanup
@@ -105,7 +105,7 @@ def __str__(self):
105
105
106
106
def __repr__ (self ):
107
107
return str (self )
108
-
108
+
109
109
def _read_preference (self , allow_secondary ):
110
110
""" Return the mongo read preference given an 'allow_secondary' argument
111
111
"""
@@ -136,7 +136,7 @@ def list_symbols(self, all_symbols=False, snapshot=None, regex=None, **kwargs):
136
136
"""
137
137
query = {}
138
138
if regex is not None :
139
- query ['symbol' ] = {'$regex' : regex }
139
+ query ['symbol' ] = {'$regex' : regex }
140
140
if kwargs :
141
141
for k , v in six .iteritems (kwargs ):
142
142
query ['metadata.' + k ] = v
@@ -154,19 +154,19 @@ def list_symbols(self, all_symbols=False, snapshot=None, regex=None, **kwargs):
154
154
# Match based on user criteria first
155
155
pipeline .append ({'$match' : query })
156
156
pipeline .extend ([
157
- # Id is by insert time which matches version order
158
- {'$sort' : {'_id' :- 1 }},
159
- # Group by 'symbol'
160
- {'$group' : {'_id' : '$symbol' ,
161
- 'deleted' : {'$first' : '$metadata.deleted' },
162
- },
163
- },
164
- # Don't include symbols which are part of some snapshot, but really deleted...
165
- {'$match' : {'deleted' : {'$ne' : True }}},
166
- {'$project' : {'_id' : 0 ,
167
- 'symbol' : '$_id' ,
168
- }
169
- }])
157
+ # Id is by insert time which matches version order
158
+ {'$sort' : {'_id' :- 1 }},
159
+ # Group by 'symbol'
160
+ {'$group' : {'_id' : '$symbol' ,
161
+ 'deleted' : {'$first' : '$metadata.deleted' },
162
+ },
163
+ },
164
+ # Don't include symbols which are part of some snapshot, but really deleted...
165
+ {'$match' : {'deleted' : {'$ne' : True }}},
166
+ {'$project' : {'_id' : 0 ,
167
+ 'symbol' : '$_id' ,
168
+ }
169
+ }])
170
170
171
171
results = self ._versions .aggregate (pipeline )
172
172
return sorted ([x ['symbol' ] for x in results ])
@@ -371,8 +371,6 @@ def get_info(self, symbol, as_of=None):
371
371
return handler .get_info (version )
372
372
return {}
373
373
374
-
375
-
376
374
def _do_read (self , symbol , version , from_version = None , ** kwargs ):
377
375
if version .get ('deleted' ):
378
376
raise NoDataFoundException ("No data found for %s in library %s" % (symbol , self ._arctic_lib .get_name ()))
@@ -429,8 +427,8 @@ def _read_metadata(self, symbol, as_of=None, read_preference=None):
429
427
if not as_of .tzinfo :
430
428
as_of = as_of .replace (tzinfo = mktz ())
431
429
_version = versions_coll .find_one ({'symbol' : symbol ,
432
- '_id' : {'$lt' : bson .ObjectId .from_datetime (as_of + timedelta (seconds = 1 ))}},
433
- sort = [('_id' , pymongo .DESCENDING )])
430
+ '_id' : {'$lt' : bson .ObjectId .from_datetime (as_of + timedelta (seconds = 1 ))}},
431
+ sort = [('_id' , pymongo .DESCENDING )])
434
432
else :
435
433
# Backward compatibility - as of is a version number
436
434
_version = versions_coll .find_one ({'symbol' : symbol , 'version' : as_of })
@@ -484,7 +482,7 @@ def append(self, symbol, data, metadata=None, prune_previous_version=True, upser
484
482
# If the version numbers aren't in line, then we've lost some data.
485
483
next_ver = self ._version_nums .find_one ({'symbol' : symbol })['version' ]
486
484
if next_ver != previous_version ['version' ]:
487
- logger .error ('''version_nums is out of sync with previous version document.
485
+ logger .error ('''version_nums is out of sync with previous version document.
488
486
This probably means that either a version document write has previously failed, or the previous version has been deleted.
489
487
There will be a gap in the data.''' )
490
488
@@ -508,8 +506,8 @@ def append(self, symbol, data, metadata=None, prune_previous_version=True, upser
508
506
509
507
# Get the next version number - check there hasn't been a concurrent write
510
508
next_ver = self ._version_nums .find_one_and_update ({'symbol' : symbol , 'version' : next_ver },
511
- {'$inc' : {'version' : 1 }},
512
- upsert = False , new = True )
509
+ {'$inc' : {'version' : 1 }},
510
+ upsert = False , new = True )
513
511
if next_ver is None :
514
512
raise OptimisticLockException ()
515
513
@@ -549,7 +547,7 @@ def write(self, symbol, data, metadata=None, prune_previous_version=True, **kwar
549
547
Default: True
550
548
kwargs :
551
549
passed through to the write handler
552
-
550
+
553
551
Returns
554
552
-------
555
553
VersionedItem named tuple containing the metadata and verison number
@@ -559,13 +557,13 @@ def write(self, symbol, data, metadata=None, prune_previous_version=True, **kwar
559
557
version = {'_id' : bson .ObjectId ()}
560
558
version ['symbol' ] = symbol
561
559
version ['version' ] = self ._version_nums .find_one_and_update ({'symbol' : symbol },
562
- {'$inc' : {'version' : 1 }},
563
- upsert = True , new = True )['version' ]
560
+ {'$inc' : {'version' : 1 }},
561
+ upsert = True , new = True )['version' ]
564
562
version ['metadata' ] = metadata
565
563
566
564
previous_version = self ._versions .find_one ({'symbol' : symbol , 'version' : {'$lt' : version ['version' ]}},
567
- sort = [('version' , pymongo .DESCENDING )],
568
- )
565
+ sort = [('version' , pymongo .DESCENDING )],
566
+ )
569
567
570
568
handler = self ._write_handler (version , symbol , data , ** kwargs )
571
569
mongo_retry (handler .write )(self ._arctic_lib , version , symbol , data , previous_version , ** kwargs )
@@ -590,34 +588,34 @@ def _prune_previous_versions(self, symbol, keep_mins=120):
590
588
# Find all non-snapshotted versions older than a version that's at least keep_mins minutes old
591
589
# Based on documents available on the secondary
592
590
versions_find = mongo_retry (self ._versions .with_options (read_preference = ReadPreference .SECONDARY_PREFERRED if keep_mins > 0 else
593
- ReadPreference .PRIMARY )
591
+ ReadPreference .PRIMARY )
594
592
.find )
595
593
versions = list (versions_find ({ # Find versions of this symbol
596
- 'symbol' : symbol ,
597
- # Not snapshotted
598
- '$or' : [{'parent' : {'$exists' : False }}, {'parent' : {'$size' : 0 }}],
599
- # At least 'keep_mins' old
600
- '_id' : {'$lt' : bson .ObjectId .from_datetime (
594
+ 'symbol' : symbol ,
595
+ # Not snapshotted
596
+ '$or' : [{'parent' : {'$exists' : False }}, {'parent' : {'$size' : 0 }}],
597
+ # At least 'keep_mins' old
598
+ '_id' : {'$lt' : bson .ObjectId .from_datetime (
601
599
dt .utcnow ()
602
- # Add one second as the ObjectId str has random fuzz
600
+ # Add one second as the ObjectId str has random fuzz
603
601
+ timedelta (seconds = 1 )
604
602
- timedelta (minutes = keep_mins ))
605
- }
606
- },
607
- # Using version number here instead of _id as there's a very unlikely case
608
- # where the versions are created on different hosts or processes at exactly
609
- # the same time.
610
- sort = [('version' , pymongo .DESCENDING )],
611
- # Keep one, that's at least 10 mins old, around
612
- # (cope with replication delay)
613
- skip = 1 ,
614
- projection = ['_id' , 'type' ],
615
- ))
603
+ }
604
+ },
605
+ # Using version number here instead of _id as there's a very unlikely case
606
+ # where the versions are created on different hosts or processes at exactly
607
+ # the same time.
608
+ sort = [('version' , pymongo .DESCENDING )],
609
+ # Keep one, that's at least 10 mins old, around
610
+ # (cope with replication delay)
611
+ skip = 1 ,
612
+ projection = ['_id' , 'type' ],
613
+ ))
616
614
if not versions :
617
615
return
618
616
version_ids = [v ['_id' ] for v in versions ]
619
617
620
- #Find any version_ids that are the basis of other, 'current' versions - don't prune these.
618
+ # Find any version_ids that are the basis of other, 'current' versions - don't prune these.
621
619
base_versions = set ([x ['base_version_id' ] for x in mongo_retry (self ._versions .find )({
622
620
'symbol' : symbol ,
623
621
'_id' : {'$nin' : version_ids },
@@ -728,8 +726,8 @@ def snapshot(self, snap_name, metadata=None, skip_symbols=None, versions=None):
728
726
snapshot = {'_id' : bson .ObjectId ()}
729
727
snapshot ['name' ] = snap_name
730
728
snapshot ['metadata' ] = metadata
731
-
732
- skip_symbols = set () if skip_symbols is None else set (skip_symbols )
729
+
730
+ skip_symbols = set () if skip_symbols is None else set (skip_symbols )
733
731
734
732
if versions is None :
735
733
versions = {sym : None for sym in set (self .list_symbols ()) - skip_symbols }
0 commit comments