diff --git a/asv_bench/benchmarks/join_merge.py b/asv_bench/benchmarks/join_merge.py index dcd07911f2ff0..86d5f84cb9b36 100644 --- a/asv_bench/benchmarks/join_merge.py +++ b/asv_bench/benchmarks/join_merge.py @@ -293,6 +293,43 @@ def time_join_dataframe_integer_key(self): merge(self.df, self.df2, on='key1') +class merge_asof_noby(object): + + def setup(self): + np.random.seed(0) + one_count = 200000 + two_count = 1000000 + self.df1 = pd.DataFrame({'time': np.random.randint(0, one_count/20, one_count), + 'value1': np.random.randn(one_count)}) + self.df2 = pd.DataFrame({'time': np.random.randint(0, two_count/20, two_count), + 'value2': np.random.randn(two_count)}) + self.df1 = self.df1.sort_values('time') + self.df2 = self.df2.sort_values('time') + + def time_merge_asof_noby(self): + merge_asof(self.df1, self.df2, on='time') + + +class merge_asof_by(object): + + def setup(self): + import string + np.random.seed(0) + one_count = 200000 + two_count = 1000000 + self.df1 = pd.DataFrame({'time': np.random.randint(0, one_count/20, one_count), + 'key': np.random.choice(list(string.uppercase), one_count), + 'value1': np.random.randn(one_count)}) + self.df2 = pd.DataFrame({'time': np.random.randint(0, two_count/20, two_count), + 'key': np.random.choice(list(string.uppercase), two_count), + 'value2': np.random.randn(two_count)}) + self.df1 = self.df1.sort_values('time') + self.df2 = self.df2.sort_values('time') + + def time_merge_asof_by(self): + merge_asof(self.df1, self.df2, on='time', by='key') + + class join_non_unique_equal(object): goal_time = 0.2 diff --git a/doc/source/whatsnew/v0.19.0.txt b/doc/source/whatsnew/v0.19.0.txt index cdab02265aa5c..f0c941991bef5 100644 --- a/doc/source/whatsnew/v0.19.0.txt +++ b/doc/source/whatsnew/v0.19.0.txt @@ -47,7 +47,7 @@ The following are now part of this API: ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ A long-time requested feature has been added through the :func:`merge_asof` function, to -support asof style joining of 
time-series. (:issue:`1870`, :issue:`13695`). Full documentation is +support asof style joining of time-series. (:issue:`1870`, :issue:`13695`, :issue:`13709`). Full documentation is :ref:`here <merging.merge_asof>` The :func:`merge_asof` performs an asof merge, which is similar to a left-join diff --git a/pandas/src/join.pyx b/pandas/src/join.pyx index ad3b1d4e4a90e..fbbef8a31071f 100644 --- a/pandas/src/join.pyx +++ b/pandas/src/join.pyx @@ -126,150 +126,56 @@ def left_outer_join(ndarray[int64_t] left, ndarray[int64_t] right, def left_outer_asof_join(ndarray[int64_t] left, ndarray[int64_t] right, - Py_ssize_t max_groups, sort=True, + Py_ssize_t max_groups, # ignored bint allow_exact_matches=1, - left_distance=None, - right_distance=None, + left_values=None, + right_values=None, tolerance=None): cdef: - Py_ssize_t i, j, k, count = 0 - Py_ssize_t loc, left_pos, right_pos, position - Py_ssize_t offset - ndarray[int64_t] left_count, right_count - ndarray left_sorter, right_sorter, rev + Py_ssize_t left_pos, right_pos, left_size, right_size ndarray[int64_t] left_indexer, right_indexer - int64_t lc, rc, tol, left_val, right_val, diff, indexer - ndarray[int64_t] ld, rd - bint has_tol = 0 + bint has_tolerance = 0 + ndarray[int64_t] left_values_, right_values_ + int64_t tolerance_ # if we are using tolerance, set our objects - if left_distance is not None and right_distance is not None and tolerance is not None: - has_tol = 1 - ld = left_distance - rd = right_distance - tol = tolerance + if left_values is not None and right_values is not None and tolerance is not None: + has_tolerance = 1 + left_values_ = left_values + right_values_ = right_values + tolerance_ = tolerance - # NA group in location 0 - left_sorter, left_count = groupsort_indexer(left, max_groups) - right_sorter, right_count = groupsort_indexer(right, max_groups) + left_size = len(left) + right_size = len(right) - # First pass, determine size of result set, do not use the NA group - for i in range(1, max_groups + 1): - if 
right_count[i] > 0: - count += left_count[i] * right_count[i] - else: - count += left_count[i] + left_indexer = np.empty(left_size, dtype=np.int64) + right_indexer = np.empty(left_size, dtype=np.int64) - # group 0 is the NA group - left_pos = 0 right_pos = 0 - position = 0 - - # exclude the NA group - left_pos = left_count[0] - right_pos = right_count[0] - - left_indexer = np.empty(count, dtype=np.int64) - right_indexer = np.empty(count, dtype=np.int64) - - for i in range(1, max_groups + 1): - lc = left_count[i] - rc = right_count[i] - - if rc == 0: - for j in range(lc): - indexer = position + j - left_indexer[indexer] = left_pos + j - - # take the most recent value - # if we are not the first - if right_pos: - - if has_tol: - - left_val = ld[left_pos + j] - right_val = rd[right_pos - 1] - diff = left_val - right_val - - # do we allow exact matches - if allow_exact_matches: - if diff > tol: - right_indexer[indexer] = -1 - continue - elif not allow_exact_matches: - if diff >= tol or lc == rc: - right_indexer[indexer] = -1 - continue - - right_indexer[indexer] = right_pos - 1 - else: - right_indexer[indexer] = -1 - position += lc + for left_pos in range(left_size): + # restart right_pos if it went negative in a previous iteration + if right_pos < 0: + right_pos = 0 + + # find last position in right whose value is less than left's value + if allow_exact_matches: + while right_pos < right_size and right[right_pos] <= left[left_pos]: + right_pos += 1 else: - for j in range(lc): - offset = position + j * rc - for k in range(rc): - - indexer = offset + k - left_indexer[indexer] = left_pos + j - - if has_tol: - - left_val = ld[left_pos + j] - right_val = rd[right_pos + k] - diff = left_val - right_val - - # do we allow exact matches - if allow_exact_matches: - if diff > tol: - right_indexer[indexer] = -1 - continue - - # we don't allow exact matches - elif not allow_exact_matches: - if diff >= tol or lc == rc: - right_indexer[indexer] = -1 - else: - right_indexer[indexer] 
= right_pos - 1 - continue - - else: - - # do we allow exact matches - if not allow_exact_matches: - - if right_pos: - right_indexer[indexer] = right_pos - 1 - else: - right_indexer[indexer] = -1 - continue - - right_indexer[indexer] = right_pos + k - position += lc * rc - left_pos += lc - right_pos += rc - - left_indexer = _get_result_indexer(left_sorter, left_indexer) - right_indexer = _get_result_indexer(right_sorter, right_indexer) - - if not sort: # if not asked to sort, revert to original order - if len(left) == len(left_indexer): - # no multiple matches for any row on the left - # this is a short-cut to avoid groupsort_indexer - # otherwise, the `else` path also works in this case - if left_sorter.dtype != np.int_: - left_sorter = left_sorter.astype(np.int_) - - rev = np.empty(len(left), dtype=np.int_) - rev.put(left_sorter, np.arange(len(left))) - else: - rev, _ = groupsort_indexer(left_indexer, len(left)) - - if rev.dtype != np.int_: - rev = rev.astype(np.int_) - right_indexer = right_indexer.take(rev) - left_indexer = left_indexer.take(rev) + while right_pos < right_size and right[right_pos] < left[left_pos]: + right_pos += 1 + right_pos -= 1 + + # save positions as the desired index + left_indexer[left_pos] = left_pos + right_indexer[left_pos] = right_pos + + # if needed, verify that tolerance is met + if has_tolerance and right_pos != -1: + diff = left_values[left_pos] - right_values[right_pos] + if diff > tolerance_: + right_indexer[left_pos] = -1 return left_indexer, right_indexer diff --git a/pandas/tools/merge.py b/pandas/tools/merge.py index e7d165354ec6c..9f8e27c4d8176 100644 --- a/pandas/tools/merge.py +++ b/pandas/tools/merge.py @@ -258,8 +258,7 @@ def merge_asof(left, right, on=None, by=None, suffixes=('_x', '_y'), tolerance=None, - allow_exact_matches=True, - check_duplicates=True): + allow_exact_matches=True): """Perform an asof merge. This is similar to a left-join except that we match on nearest key rather than equal keys. 
@@ -304,14 +303,6 @@ def merge_asof(left, right, on=None, - If False, don't match the same 'on' value (i.e., stricly less-than) - check_duplicates : boolean, default True - - - If True, check and remove duplicates for the right - DataFrame, on the [by, on] combination, keeping the last value. - - If False, no check for duplicates. If you *know* that - you don't have duplicates, then turning off the check for duplicates - can be more performant. - Returns ------- merged : DataFrame @@ -436,7 +427,7 @@ def _merger(x, y): if by is not None: result, groupby = _groupby_and_merge(by, on, left, right, lambda x, y: _merger(x, y), - check_duplicates=check_duplicates) + check_duplicates=False) # we want to preserve the original order # we had grouped, so need to reverse this @@ -446,20 +437,12 @@ def _merger(x, y): sorter = _ensure_platform_int( np.concatenate([groupby.indices[g] for g, _ in groupby])) if len(result) != len(sorter): - if check_duplicates: - raise AssertionError("invalid reverse grouping") return result rev = np.empty(len(sorter), dtype=np.int_) rev.put(sorter, np.arange(len(sorter))) return result.take(rev).reset_index(drop=True) - if check_duplicates: - if on is None: - on = [] - elif not isinstance(on, (list, tuple)): - on = [on] - if right.duplicated(on).any(): right = right.drop_duplicates(on, keep='last') @@ -1067,8 +1050,8 @@ def _get_join_indexers(self): lt = lt.view('i8') t = t.value rt = rt.view('i8') - kwargs['left_distance'] = lt - kwargs['right_distance'] = rt + kwargs['left_values'] = lt + kwargs['right_values'] = rt kwargs['tolerance'] = t return _get_join_indexers(self.left_join_keys, diff --git a/pandas/tools/tests/test_merge_asof.py b/pandas/tools/tests/test_merge_asof.py index bcbb0f0fadb49..e0c50cf3baaf7 100644 --- a/pandas/tools/tests/test_merge_asof.py +++ b/pandas/tools/tests/test_merge_asof.py @@ -184,43 +184,8 @@ def test_with_duplicates(self): expected = self.read_data('asof.csv') assert_frame_equal(result, expected) - result = 
merge_asof(self.trades, q, - on='time', - by='ticker', - check_duplicates=False) - expected = self.read_data('asof.csv') - expected = pd.concat([expected, expected]).sort_values( - ['time', 'ticker']).reset_index(drop=True) - - # the results are not ordered in a meaningful way - # nor are the exact matches duplicated, so comparisons - # are pretty tricky here, however the uniques are the same - - def aligner(x, ticker): - return (x[x.ticker == ticker] - .sort_values(['time', 'ticker', 'quantity', 'price', - 'marketCenter', 'bid', 'ask']) - .drop_duplicates(keep='last') - .reset_index(drop=True) - ) - - for ticker in expected.ticker.unique(): - r = aligner(result, ticker) - e = aligner(expected, ticker) - assert_frame_equal(r, e) - def test_with_duplicates_no_on(self): - df1 = pd.DataFrame({'key': [1, 1, 3], - 'left_val': [1, 2, 3]}) - df2 = pd.DataFrame({'key': [1, 3, 3], - 'right_val': [1, 2, 3]}) - result = merge_asof(df1, df2, on='key', check_duplicates=False) - expected = pd.DataFrame({'key': [1, 1, 3, 3], - 'left_val': [1, 2, 3, 3], - 'right_val': [1, 1, 2, 3]}) - assert_frame_equal(result, expected) - df1 = pd.DataFrame({'key': [1, 1, 3], 'left_val': [1, 2, 3]}) df2 = pd.DataFrame({'key': [1, 2, 2], @@ -379,6 +344,26 @@ def test_allow_exact_matches_and_tolerance2(self): 'version': [np.nan]}) assert_frame_equal(result, expected) + def test_allow_exact_matches_and_tolerance3(self): + # GH 13709 + df1 = pd.DataFrame({ + 'time': pd.to_datetime(['2016-07-15 13:30:00.030', + '2016-07-15 13:30:00.030']), + 'username': ['bob', 'charlie']}) + df2 = pd.DataFrame({ + 'time': pd.to_datetime(['2016-07-15 13:30:00.000', + '2016-07-15 13:30:00.030']), + 'version': [1, 2]}) + + result = pd.merge_asof(df1, df2, on='time', allow_exact_matches=False, + tolerance=pd.Timedelta('10ms')) + expected = pd.DataFrame({ + 'time': pd.to_datetime(['2016-07-15 13:30:00.030', + '2016-07-15 13:30:00.030']), + 'username': ['bob', 'charlie'], + 'version': [np.nan, np.nan]}) + 
assert_frame_equal(result, expected) + if __name__ == '__main__': nose.runmodule(argv=[__file__, '-vvs', '-x', '--pdb', '--pdb-failure'],