
BUG: Fix edge cases in merge_asof() by comparing factorized keys (#13709) #13836


Merged (1 commit, Aug 1, 2016)
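Background: the "factorized keys" in the title refers to encoding the join keys as integer codes before the Cython kernel runs, so that any key dtype reduces to an int64 comparison. A minimal illustration of the idea with pd.factorize (a sketch for orientation, not code from this PR):

import pandas as pd

keys = pd.Series(['AAPL', 'MSFT', 'AAPL', 'GOOG'])
codes, uniques = pd.factorize(keys)
# equal keys map to equal integer codes, so a join kernel can compare
# plain integer labels regardless of the original key dtype
print(codes)    # [0 1 0 2]
print(uniques)  # Index(['AAPL', 'MSFT', 'GOOG'], dtype='object')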
37 changes: 37 additions & 0 deletions asv_bench/benchmarks/join_merge.py
@@ -293,6 +293,43 @@ def time_join_dataframe_integer_key(self):
        merge(self.df, self.df2, on='key1')


+class merge_asof_noby(object):
+
+    def setup(self):
+        np.random.seed(0)
+        one_count = 200000
+        two_count = 1000000
+        # integer "times" with roughly 20 rows per distinct value
+        self.df1 = pd.DataFrame(
+            {'time': np.random.randint(0, one_count // 20, one_count),
+             'value1': np.random.randn(one_count)})
+        self.df2 = pd.DataFrame(
+            {'time': np.random.randint(0, two_count // 20, two_count),
+             'value2': np.random.randn(two_count)})
+        # merge_asof requires both inputs to be sorted on the 'on' key
+        self.df1 = self.df1.sort_values('time')
+        self.df2 = self.df2.sort_values('time')
+
+    def time_merge_asof_noby(self):
+        merge_asof(self.df1, self.df2, on='time')
+
+
+class merge_asof_by(object):
+
+    def setup(self):
+        import string
+        np.random.seed(0)
+        one_count = 200000
+        two_count = 1000000
+        self.df1 = pd.DataFrame(
+            {'time': np.random.randint(0, one_count // 20, one_count),
+             'key': np.random.choice(list(string.ascii_uppercase), one_count),
+             'value1': np.random.randn(one_count)})
+        self.df2 = pd.DataFrame(
+            {'time': np.random.randint(0, two_count // 20, two_count),
+             'key': np.random.choice(list(string.ascii_uppercase), two_count),
+             'value2': np.random.randn(two_count)})
+        self.df1 = self.df1.sort_values('time')
+        self.df2 = self.df2.sort_values('time')
+
+    def time_merge_asof_by(self):
+        merge_asof(self.df1, self.df2, on='time', by='key')


class join_non_unique_equal(object):
    goal_time = 0.2
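These two classes follow the asv benchmark conventions used elsewhere in join_merge.py: setup() builds the inputs and each time_* method is what asv times. A quick manual sanity check outside of asv might look like the following (sketch; it assumes the module-level imports of join_merge.py, i.e. np, pd and merge_asof, are in scope):

bench = merge_asof_noby()
bench.setup()
bench.time_merge_asof_noby()  # asv itself times repeated calls of this method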
2 changes: 1 addition & 1 deletion doc/source/whatsnew/v0.19.0.txt
@@ -47,7 +47,7 @@ The following are now part of this API:
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

A long-time requested feature has been added through the :func:`merge_asof` function, to
-support asof style joining of time-series. (:issue:`1870`, :issue:`13695`). Full documentation is
+support asof style joining of time-series. (:issue:`1870`, :issue:`13695`, :issue:`13709`). Full documentation is
:ref:`here <merging.merge_asof>`

The :func:`merge_asof` performs an asof merge, which is similar to a left-join
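For orientation (not part of this diff), a minimal :func:`merge_asof` call matches each left row against the most recent right row (a sketch; column names are illustrative):

import pandas as pd

left = pd.DataFrame({'time': [1, 5, 10], 'left_val': ['a', 'b', 'c']})
right = pd.DataFrame({'time': [1, 2, 3, 6, 7], 'right_val': [1, 2, 3, 6, 7]})

# each left 'time' takes the last right 'time' at or before it:
# 1 -> 1, 5 -> 3, 10 -> 7
print(pd.merge_asof(left, right, on='time'))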
170 changes: 38 additions & 132 deletions pandas/src/join.pyx
@@ -126,150 +126,56 @@ def left_outer_join(ndarray[int64_t] left, ndarray[int64_t] right,


def left_outer_asof_join(ndarray[int64_t] left, ndarray[int64_t] right,
-                         Py_ssize_t max_groups, sort=True,
+                         Py_ssize_t max_groups,  # ignored
                         bint allow_exact_matches=1,
-                         left_distance=None,
-                         right_distance=None,
+                         left_values=None,
+                         right_values=None,
                         tolerance=None):

    cdef:
-        Py_ssize_t i, j, k, count = 0
-        Py_ssize_t loc, left_pos, right_pos, position
-        Py_ssize_t offset
-        ndarray[int64_t] left_count, right_count
-        ndarray left_sorter, right_sorter, rev
+        Py_ssize_t left_pos, right_pos, left_size, right_size
        ndarray[int64_t] left_indexer, right_indexer
-        int64_t lc, rc, tol, left_val, right_val, diff, indexer
-        ndarray[int64_t] ld, rd
-        bint has_tol = 0
+        bint has_tolerance = 0
+        ndarray[int64_t] left_values_, right_values_
+        int64_t tolerance_

    # if we are using tolerance, set our objects
-    if left_distance is not None and right_distance is not None and tolerance is not None:
-        has_tol = 1
-        ld = left_distance
-        rd = right_distance
-        tol = tolerance
+    if left_values is not None and right_values is not None and tolerance is not None:
+        has_tolerance = 1
+        left_values_ = left_values
+        right_values_ = right_values
+        tolerance_ = tolerance

-    # NA group in location 0
-    left_sorter, left_count = groupsort_indexer(left, max_groups)
-    right_sorter, right_count = groupsort_indexer(right, max_groups)
+    left_size = len(left)
+    right_size = len(right)

-    # First pass, determine size of result set, do not use the NA group
-    for i in range(1, max_groups + 1):
-        if right_count[i] > 0:
-            count += left_count[i] * right_count[i]
-        else:
-            count += left_count[i]
+    left_indexer = np.empty(left_size, dtype=np.int64)
+    right_indexer = np.empty(left_size, dtype=np.int64)

-    # group 0 is the NA group
-    left_pos = 0
    right_pos = 0
-    position = 0
-
-    # exclude the NA group
-    left_pos = left_count[0]
-    right_pos = right_count[0]
-
-    left_indexer = np.empty(count, dtype=np.int64)
-    right_indexer = np.empty(count, dtype=np.int64)
-
-    for i in range(1, max_groups + 1):
-        lc = left_count[i]
-        rc = right_count[i]
-
-        if rc == 0:
-            for j in range(lc):
-                indexer = position + j
-                left_indexer[indexer] = left_pos + j
-
-                # take the most recent value
-                # if we are not the first
-                if right_pos:
-
-                    if has_tol:
-
-                        left_val = ld[left_pos + j]
-                        right_val = rd[right_pos - 1]
-                        diff = left_val - right_val
-
-                        # do we allow exact matches
-                        if allow_exact_matches:
-                            if diff > tol:
-                                right_indexer[indexer] = -1
-                                continue
-                        elif not allow_exact_matches:
-                            if diff >= tol or lc == rc:
-                                right_indexer[indexer] = -1
-                                continue
-
-                    right_indexer[indexer] = right_pos - 1
-                else:
-                    right_indexer[indexer] = -1
-            position += lc
+    for left_pos in range(left_size):
+        # restart right_pos if it went negative in a previous iteration
+        if right_pos < 0:
+            right_pos = 0
+
+        # find last position in right whose value is less than left's value
+        if allow_exact_matches:
+            while right_pos < right_size and right[right_pos] <= left[left_pos]:
+                right_pos += 1
        else:
-            for j in range(lc):
-                offset = position + j * rc
-                for k in range(rc):
-
-                    indexer = offset + k
-                    left_indexer[indexer] = left_pos + j
-
-                    if has_tol:
-
-                        left_val = ld[left_pos + j]
-                        right_val = rd[right_pos + k]
-                        diff = left_val - right_val
-
-                        # do we allow exact matches
-                        if allow_exact_matches:
-                            if diff > tol:
-                                right_indexer[indexer] = -1
-                                continue
-
-                        # we don't allow exact matches
-                        elif not allow_exact_matches:
-                            if diff >= tol or lc == rc:
-                                right_indexer[indexer] = -1
-                            else:
-                                right_indexer[indexer] = right_pos - 1
-                            continue
-
-                    else:
-
-                        # do we allow exact matches
-                        if not allow_exact_matches:
-
-                            if right_pos:
-                                right_indexer[indexer] = right_pos - 1
-                            else:
-                                right_indexer[indexer] = -1
-                            continue
-
-                    right_indexer[indexer] = right_pos + k
-            position += lc * rc
-        left_pos += lc
-        right_pos += rc
-
-    left_indexer = _get_result_indexer(left_sorter, left_indexer)
-    right_indexer = _get_result_indexer(right_sorter, right_indexer)
-
-    if not sort:  # if not asked to sort, revert to original order
-        if len(left) == len(left_indexer):
-            # no multiple matches for any row on the left
-            # this is a short-cut to avoid groupsort_indexer
-            # otherwise, the `else` path also works in this case
-            if left_sorter.dtype != np.int_:
-                left_sorter = left_sorter.astype(np.int_)
-
-            rev = np.empty(len(left), dtype=np.int_)
-            rev.put(left_sorter, np.arange(len(left)))
-        else:
-            rev, _ = groupsort_indexer(left_indexer, len(left))
-
-        if rev.dtype != np.int_:
-            rev = rev.astype(np.int_)
-        right_indexer = right_indexer.take(rev)
-        left_indexer = left_indexer.take(rev)
+            while right_pos < right_size and right[right_pos] < left[left_pos]:
+                right_pos += 1
+        right_pos -= 1
+
+        # save positions as the desired index
+        left_indexer[left_pos] = left_pos
+        right_indexer[left_pos] = right_pos
+
+        # if needed, verify that tolerance is met
+        if has_tolerance and right_pos != -1:
+            diff = left_values[left_pos] - right_values[right_pos]
+            if diff > tolerance_:
+                right_indexer[left_pos] = -1

    return left_indexer, right_indexer
Review comment (Contributor): really simplified!
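For intuition, the rewritten kernel is a single forward scan with one cursor per side. Below is a plain-Python sketch of the same logic (illustrative only; the real kernel is the Cython above, which takes separate left_values/right_values arrays for the tolerance check because the join keys may be factorized codes; here, for brevity, tolerance is measured on the keys directly):

import numpy as np

def asof_join_sketch(left, right, allow_exact_matches=True, tolerance=None):
    # left and right are sorted 1-D integer arrays; for each left position,
    # return the position of the last qualifying right entry, or -1 for none
    left_indexer = np.empty(len(left), dtype=np.int64)
    right_indexer = np.empty(len(left), dtype=np.int64)
    right_pos = 0
    for left_pos in range(len(left)):
        if right_pos < 0:  # went past the front in the previous iteration
            right_pos = 0
        if allow_exact_matches:  # advance past every right value <= left value
            while right_pos < len(right) and right[right_pos] <= left[left_pos]:
                right_pos += 1
        else:  # strictly less-than
            while right_pos < len(right) and right[right_pos] < left[left_pos]:
                right_pos += 1
        right_pos -= 1  # step back to the last qualifying position
        left_indexer[left_pos] = left_pos
        right_indexer[left_pos] = right_pos
        if tolerance is not None and right_pos != -1:
            if left[left_pos] - right[right_pos] > tolerance:
                right_indexer[left_pos] = -1  # nearest match is too far away
    return left_indexer, right_indexer

Since both cursors only ever move forward (right_pos backs up by at most one step per row), the scan is O(len(left) + len(right)), and the output has exactly one row per left row.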


25 changes: 4 additions & 21 deletions pandas/tools/merge.py
@@ -258,8 +258,7 @@ def merge_asof(left, right, on=None,
               by=None,
               suffixes=('_x', '_y'),
               tolerance=None,
-               allow_exact_matches=True,
-               check_duplicates=True):
+               allow_exact_matches=True):
    """Perform an asof merge. This is similar to a left-join except that we
    match on nearest key rather than equal keys.
@@ -304,14 +303,6 @@
        - If False, don't match the same 'on' value
          (i.e., strictly less-than)

-    check_duplicates : boolean, default True
-        - If True, check and remove duplicates for the right
-          DataFrame, on the [by, on] combination, keeping the last value.
-        - If False, no check for duplicates. If you *know* that
-          you don't have duplicates, then turning off the check for duplicates
-          can be more performant.

    Returns
    -------
    merged : DataFrame
@@ -436,7 +427,7 @@ def _merger(x, y):
    if by is not None:
        result, groupby = _groupby_and_merge(by, on, left, right,
                                             lambda x, y: _merger(x, y),
-                                            check_duplicates=check_duplicates)
+                                            check_duplicates=False)
Review comment (Contributor, PR author): Note that check_duplicates is always False now, since it is no longer needed.


        # we want to preserve the original order
        # we had grouped, so need to reverse this

@@ -446,20 +437,12 @@ def _merger(x, y):
        sorter = _ensure_platform_int(
            np.concatenate([groupby.indices[g] for g, _ in groupby]))
        if len(result) != len(sorter):
-            if check_duplicates:
-                raise AssertionError("invalid reverse grouping")
            return result

        rev = np.empty(len(sorter), dtype=np.int_)
        rev.put(sorter, np.arange(len(sorter)))
        return result.take(rev).reset_index(drop=True)

-    if check_duplicates:
-        if on is None:
-            on = []
-        elif not isinstance(on, (list, tuple)):
-            on = [on]
-
-        if right.duplicated(on).any():
-            right = right.drop_duplicates(on, keep='last')

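Aside: the rev.put(sorter, np.arange(len(sorter))) idiom used above builds the inverse of a permutation. A small worked example (sketch):

import numpy as np

sorter = np.array([2, 0, 1])             # where each original row ended up
rev = np.empty(len(sorter), dtype=np.int_)
rev.put(sorter, np.arange(len(sorter)))  # rev[sorter[i]] = i, the inverse map
print(rev)                               # [1 2 0]
print(sorter.take(rev))                  # [0 1 2]: original order restored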
@@ -1067,8 +1050,8 @@ def _get_join_indexers(self):
            lt = lt.view('i8')
            t = t.value
            rt = rt.view('i8')
-            kwargs['left_distance'] = lt
-            kwargs['right_distance'] = rt
+            kwargs['left_values'] = lt
+            kwargs['right_values'] = rt
            kwargs['tolerance'] = t

        return _get_join_indexers(self.left_join_keys,
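Context for the rename above: when the 'on' key is datetime-like, the values are reinterpreted as int64 nanoseconds and a Timedelta tolerance as its nanosecond count, so the kernel only ever compares integers. A sketch of that conversion (illustrative, using public pandas/NumPy APIs):

import pandas as pd

times = pd.to_datetime(['2016-07-15 13:30:00.000',
                        '2016-07-15 13:30:00.030']).values  # datetime64[ns]
as_i8 = times.view('i8')          # int64 nanoseconds since the epoch
tol = pd.Timedelta('10ms').value  # tolerance in nanoseconds: 10000000
print(as_i8[1] - as_i8[0] > tol)  # True: these rows are 30ms apart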
55 changes: 20 additions & 35 deletions pandas/tools/tests/test_merge_asof.py
@@ -184,43 +184,8 @@ def test_with_duplicates(self):
        expected = self.read_data('asof.csv')
        assert_frame_equal(result, expected)

-        result = merge_asof(self.trades, q,
-                            on='time',
-                            by='ticker',
-                            check_duplicates=False)
-        expected = self.read_data('asof.csv')
-        expected = pd.concat([expected, expected]).sort_values(
-            ['time', 'ticker']).reset_index(drop=True)
-
-        # the results are not ordered in a meaningful way
-        # nor are the exact matches duplicated, so comparisons
-        # are pretty tricky here, however the uniques are the same
-
-        def aligner(x, ticker):
-            return (x[x.ticker == ticker]
-                    .sort_values(['time', 'ticker', 'quantity', 'price',
-                                  'marketCenter', 'bid', 'ask'])
-                    .drop_duplicates(keep='last')
-                    .reset_index(drop=True)
-                    )
-
-        for ticker in expected.ticker.unique():
-            r = aligner(result, ticker)
-            e = aligner(expected, ticker)
-            assert_frame_equal(r, e)

    def test_with_duplicates_no_on(self):

-        df1 = pd.DataFrame({'key': [1, 1, 3],
-                            'left_val': [1, 2, 3]})
-        df2 = pd.DataFrame({'key': [1, 3, 3],
-                            'right_val': [1, 2, 3]})
-        result = merge_asof(df1, df2, on='key', check_duplicates=False)
-        expected = pd.DataFrame({'key': [1, 1, 3, 3],
-                                 'left_val': [1, 2, 3, 3],
-                                 'right_val': [1, 1, 2, 3]})
-        assert_frame_equal(result, expected)
-
        df1 = pd.DataFrame({'key': [1, 1, 3],
                            'left_val': [1, 2, 3]})
        df2 = pd.DataFrame({'key': [1, 2, 2],
@@ -379,6 +344,26 @@ def test_allow_exact_matches_and_tolerance2(self):
            'version': [np.nan]})
        assert_frame_equal(result, expected)

+    def test_allow_exact_matches_and_tolerance3(self):
+        # GH 13709
+        df1 = pd.DataFrame({
+            'time': pd.to_datetime(['2016-07-15 13:30:00.030',
+                                    '2016-07-15 13:30:00.030']),
+            'username': ['bob', 'charlie']})
+        df2 = pd.DataFrame({
+            'time': pd.to_datetime(['2016-07-15 13:30:00.000',
+                                    '2016-07-15 13:30:00.030']),
+            'version': [1, 2]})
+
+        result = pd.merge_asof(df1, df2, on='time', allow_exact_matches=False,
+                               tolerance=pd.Timedelta('10ms'))
+        # both right rows are rejected: the .030 row is an exact match
+        # (disallowed), and the .000 row is 30ms away, beyond the 10ms tolerance
+        expected = pd.DataFrame({
+            'time': pd.to_datetime(['2016-07-15 13:30:00.030',
+                                    '2016-07-15 13:30:00.030']),
+            'username': ['bob', 'charlie'],
+            'version': [np.nan, np.nan]})
+        assert_frame_equal(result, expected)


if __name__ == '__main__':
    nose.runmodule(argv=[__file__, '-vvs', '-x', '--pdb', '--pdb-failure'],