diff --git a/asv_bench/benchmarks/join_merge.py b/asv_bench/benchmarks/join_merge.py
index 9eefe80c8e5e4..d9c631fa92efd 100644
--- a/asv_bench/benchmarks/join_merge.py
+++ b/asv_bench/benchmarks/join_merge.py
@@ -302,12 +302,19 @@ def setup(self):
         self.df1 = self.df1.sort_values('time')
         self.df2 = self.df2.sort_values('time')
 
+        self.df1['time32'] = np.int32(self.df1.time)
+        self.df2['time32'] = np.int32(self.df2.time)
+
         self.df1a = self.df1[['time', 'value1']]
         self.df2a = self.df2[['time', 'value2']]
         self.df1b = self.df1[['time', 'key', 'value1']]
         self.df2b = self.df2[['time', 'key', 'value2']]
         self.df1c = self.df1[['time', 'key2', 'value1']]
         self.df2c = self.df2[['time', 'key2', 'value2']]
+        self.df1d = self.df1[['time32', 'value1']]
+        self.df2d = self.df2[['time32', 'value2']]
+        self.df1e = self.df1[['time', 'key', 'key2', 'value1']]
+        self.df2e = self.df2[['time', 'key', 'key2', 'value2']]
 
     def time_noby(self):
         merge_asof(self.df1a, self.df2a, on='time')
@@ -318,6 +325,12 @@ def time_by_object(self):
     def time_by_int(self):
         merge_asof(self.df1c, self.df2c, on='time', by='key2')
 
+    def time_on_int32(self):
+        merge_asof(self.df1d, self.df2d, on='time32')
+
+    def time_multiby(self):
+        merge_asof(self.df1e, self.df2e, on='time', by=['key', 'key2'])
+
 #----------------------------------------------------------------------
 # data alignment
 
diff --git a/doc/source/whatsnew/v0.19.2.txt b/doc/source/whatsnew/v0.19.2.txt
index dabc6036fc9ba..d16697a00a689 100644
--- a/doc/source/whatsnew/v0.19.2.txt
+++ b/doc/source/whatsnew/v0.19.2.txt
@@ -29,6 +29,7 @@ Other enhancements
 ^^^^^^^^^^^^^^^^^^
 
 - ``pd.merge_asof()`` gained ``left_index``/``right_index`` and ``left_by``/``right_by`` arguments (:issue:`14253`)
+- ``pd.merge_asof()`` can take multiple columns in the ``by`` parameter and has specialized dtypes for better performance (:issue:`13936`)
 
diff --git a/pandas/src/joins_func_helper.pxi.in b/pandas/src/joins_func_helper.pxi.in
index 06c35cfb69e53..33926a23f7f41 100644
--- a/pandas/src/joins_func_helper.pxi.in
+++ b/pandas/src/joins_func_helper.pxi.in
@@ -1,3 +1,4 @@
+# cython: boundscheck=False, wraparound=False
 """
 Template for each `dtype` helper function for hashtable
 
@@ -14,7 +15,9 @@ WARNING: DO NOT edit .pxi FILE directly, .pxi is generated from .pxi.in
 by_dtypes = [('PyObjectHashTable', 'object'), ('Int64HashTable', 'int64_t')]
 
 # on_dtype
-on_dtypes = ['int64_t', 'double']
+on_dtypes = ['uint8_t', 'uint16_t', 'uint32_t', 'uint64_t',
+             'int8_t', 'int16_t', 'int32_t', 'int64_t',
+             'float', 'double']
 
 }}
 
@@ -98,7 +101,9 @@ def asof_join_{{on_dtype}}_by_{{by_dtype}}(ndarray[{{on_dtype}}] left_values,
 
 {{py:
 
 # on_dtype
-dtypes = ['int64_t', 'double']
+dtypes = ['uint8_t', 'uint16_t', 'uint32_t', 'uint64_t',
+          'int8_t', 'int16_t', 'int32_t', 'int64_t',
+          'float', 'double']
 
 }}
 
diff --git a/pandas/tools/merge.py b/pandas/tools/merge.py
index 198991531e0a7..efae7c63a9d0e 100644
--- a/pandas/tools/merge.py
+++ b/pandas/tools/merge.py
@@ -5,6 +5,8 @@
 import copy
 import warnings
 
+import string
+
 import numpy as np
 from pandas.compat import range, lrange, lzip, zip, map, filter
 import pandas.compat as compat
@@ -28,7 +30,8 @@
     is_list_like,
     _ensure_int64,
     _ensure_float64,
-    _ensure_object)
+    _ensure_object,
+    _get_dtype)
 from pandas.types.missing import na_value_for_dtype
 
 from pandas.core.generic import NDFrame
@@ -271,8 +274,8 @@ def merge_asof(left, right, on=None,
     DataFrame whose 'on' key is less than or equal to the left's key. Both
     DataFrames must be sorted by the key.
 
-    Optionally perform group-wise merge. This searches for the nearest match
-    on the 'on' key within the same group according to 'by'.
+    Optionally match on equivalent keys with 'by' before searching for the
+    nearest match with 'on'.
 
     .. versionadded:: 0.19.0
 
@@ -299,16 +302,15 @@ def merge_asof(left, right, on=None,
 
         .. versionadded:: 0.19.2
 
-    by : column name
-        Group both the left and right DataFrames by the group column; perform
-        the merge operation on these pieces and recombine.
+    by : column name or list of column names
+        Match on these columns before performing the merge operation.
 
     left_by : column name
-        Field name to group by in the left DataFrame.
+        Field names to match on in the left DataFrame.
 
         .. versionadded:: 0.19.2
 
     right_by : column name
-        Field name to group by in the right DataFrame.
+        Field names to match on in the right DataFrame.
 
         .. versionadded:: 0.19.2
 
@@ -997,17 +999,13 @@ def get_result(self):
 
         return result
 
 
-_asof_functions = {
-    'int64_t': _join.asof_join_int64_t,
-    'double': _join.asof_join_double,
-}
+def _asof_function(on_type):
+    return getattr(_join, 'asof_join_%s' % on_type, None)
+
+
+def _asof_by_function(on_type, by_type):
+    return getattr(_join, 'asof_join_%s_by_%s' % (on_type, by_type), None)
 
-_asof_by_functions = {
-    ('int64_t', 'int64_t'): _join.asof_join_int64_t_by_int64_t,
-    ('double', 'int64_t'): _join.asof_join_double_by_int64_t,
-    ('int64_t', 'object'): _join.asof_join_int64_t_by_object,
-    ('double', 'object'): _join.asof_join_double_by_object,
-}
 
 _type_casters = {
     'int64_t': _ensure_int64,
@@ -1015,9 +1013,32 @@ def get_result(self):
     'object': _ensure_object,
 }
 
+_cython_types = {
+    'uint8': 'uint8_t',
+    'uint32': 'uint32_t',
+    'uint16': 'uint16_t',
+    'uint64': 'uint64_t',
+    'int8': 'int8_t',
+    'int32': 'int32_t',
+    'int16': 'int16_t',
+    'int64': 'int64_t',
+    'float16': 'error',
+    'float32': 'float',
+    'float64': 'double',
+}
+
 
 def _get_cython_type(dtype):
-    """ Given a dtype, return 'int64_t', 'double', or 'object' """
+    """ Given a dtype, return a C name like 'int64_t' or 'double' """
+    type_name = _get_dtype(dtype).name
+    ctype = _cython_types.get(type_name, 'object')
+    if ctype == 'error':
+        raise MergeError('unsupported type: ' + type_name)
+    return ctype
+
+
+def _get_cython_type_upcast(dtype):
+    """ Upcast a dtype to 'int64_t', 'double', or 'object' """
     if is_integer_dtype(dtype):
         return 'int64_t'
     elif is_float_dtype(dtype):
@@ -1084,11 +1105,6 @@ def _validate_specification(self):
         if not is_list_like(self.right_by):
             self.right_by = [self.right_by]
 
-        if len(self.left_by) != 1:
-            raise MergeError("can only asof by a single key")
-        if len(self.right_by) != 1:
-            raise MergeError("can only asof by a single key")
-
         self.left_on = self.left_by + list(self.left_on)
         self.right_on = self.right_by + list(self.right_on)
 
@@ -1142,6 +1158,13 @@ def _get_merge_keys(self):
 
     def _get_join_indexers(self):
         """ return the join indexers """
 
+        def flip(xs):
+            """ unlike np.transpose, this returns an array of tuples """
+            labels = list(string.ascii_lowercase[:len(xs)])
+            dtypes = [x.dtype for x in xs]
+            labeled_dtypes = list(zip(labels, dtypes))
+            return np.array(lzip(*xs), labeled_dtypes)
+
         # values to compare
         left_values = (self.left.index.values if self.left_index
                        else self.left_join_keys[-1])
@@ -1165,22 +1188,23 @@ def _get_join_indexers(self):
 
         # a "by" parameter requires special handling
         if self.left_by is not None:
-            left_by_values = self.left_join_keys[0]
-            right_by_values = self.right_join_keys[0]
-
-            # choose appropriate function by type
-            on_type = _get_cython_type(left_values.dtype)
-            by_type = _get_cython_type(left_by_values.dtype)
+            if len(self.left_join_keys) > 2:
+                # get tuple representation of values if more than one
+                left_by_values = flip(self.left_join_keys[0:-1])
+                right_by_values = flip(self.right_join_keys[0:-1])
+            else:
+                left_by_values = self.left_join_keys[0]
+                right_by_values = self.right_join_keys[0]
 
-            on_type_caster = _type_casters[on_type]
+            # upcast 'by' parameter because HashTable is limited
+            by_type = _get_cython_type_upcast(left_by_values.dtype)
             by_type_caster = _type_casters[by_type]
-            func = _asof_by_functions[(on_type, by_type)]
-
-            left_values = on_type_caster(left_values)
-            right_values = on_type_caster(right_values)
             left_by_values = by_type_caster(left_by_values)
             right_by_values = by_type_caster(right_by_values)
 
+            # choose appropriate function by type
+            on_type = _get_cython_type(left_values.dtype)
+            func = _asof_by_function(on_type, by_type)
             return func(left_values,
                         right_values,
                         left_by_values,
@@ -1190,12 +1214,7 @@ def _get_join_indexers(self):
         else:
             # choose appropriate function by type
             on_type = _get_cython_type(left_values.dtype)
-            type_caster = _type_casters[on_type]
-            func = _asof_functions[on_type]
-
-            left_values = type_caster(left_values)
-            right_values = type_caster(right_values)
-
+            func = _asof_function(on_type)
             return func(left_values,
                         right_values,
                         self.allow_exact_matches,
diff --git a/pandas/tools/tests/test_merge_asof.py b/pandas/tools/tests/test_merge_asof.py
index d33ba30d7f032..f3e8bc913ad67 100644
--- a/pandas/tools/tests/test_merge_asof.py
+++ b/pandas/tools/tests/test_merge_asof.py
@@ -221,6 +221,117 @@ def test_missing_right_by(self):
         expected.loc[expected.ticker == 'MSFT', ['bid', 'ask']] = np.nan
         assert_frame_equal(result, expected)
 
+    def test_multiby(self):
+        # GH13936
+        trades = pd.DataFrame({
+            'time': pd.to_datetime(['20160525 13:30:00.023',
+                                    '20160525 13:30:00.023',
+                                    '20160525 13:30:00.046',
+                                    '20160525 13:30:00.048',
+                                    '20160525 13:30:00.050']),
+            'ticker': ['MSFT', 'MSFT',
+                       'GOOG', 'GOOG', 'AAPL'],
+            'exch': ['ARCA', 'NSDQ', 'NSDQ', 'BATS', 'NSDQ'],
+            'price': [51.95, 51.95,
+                      720.77, 720.92, 98.00],
+            'quantity': [75, 155,
+                         100, 100, 100]},
+            columns=['time', 'ticker', 'exch',
+                     'price', 'quantity'])
+
+        quotes = pd.DataFrame({
+            'time': pd.to_datetime(['20160525 13:30:00.023',
+                                    '20160525 13:30:00.023',
+                                    '20160525 13:30:00.030',
+                                    '20160525 13:30:00.041',
+                                    '20160525 13:30:00.045',
+                                    '20160525 13:30:00.049']),
+            'ticker': ['GOOG', 'MSFT', 'MSFT',
+                       'MSFT', 'GOOG', 'AAPL'],
+            'exch': ['BATS', 'NSDQ', 'ARCA', 'ARCA',
+                     'NSDQ', 'ARCA'],
+            'bid': [720.51, 51.95, 51.97, 51.99,
+                    720.50, 97.99],
+            'ask': [720.92, 51.96, 51.98, 52.00,
+                    720.93, 98.01]},
+            columns=['time', 'ticker', 'exch', 'bid', 'ask'])
+
+        expected = pd.DataFrame({
+            'time': pd.to_datetime(['20160525 13:30:00.023',
+                                    '20160525 13:30:00.023',
+                                    '20160525 13:30:00.046',
+                                    '20160525 13:30:00.048',
+                                    '20160525 13:30:00.050']),
+            'ticker': ['MSFT', 'MSFT',
+                       'GOOG', 'GOOG', 'AAPL'],
+            'exch': ['ARCA', 'NSDQ', 'NSDQ', 'BATS', 'NSDQ'],
+            'price': [51.95, 51.95,
+                      720.77, 720.92, 98.00],
+            'quantity': [75, 155,
+                         100, 100, 100],
+            'bid': [np.nan, 51.95, 720.50, 720.51, np.nan],
+            'ask': [np.nan, 51.96, 720.93, 720.92, np.nan]},
+            columns=['time', 'ticker', 'exch',
+                     'price', 'quantity', 'bid', 'ask'])
+
+        result = pd.merge_asof(trades, quotes, on='time',
+                               by=['ticker', 'exch'])
+        assert_frame_equal(result, expected)
+
+    def test_multiby_heterogeneous_types(self):
+        # GH13936
+        trades = pd.DataFrame({
+            'time': pd.to_datetime(['20160525 13:30:00.023',
+                                    '20160525 13:30:00.023',
+                                    '20160525 13:30:00.046',
+                                    '20160525 13:30:00.048',
+                                    '20160525 13:30:00.050']),
+            'ticker': [0, 0, 1, 1, 2],
+            'exch': ['ARCA', 'NSDQ', 'NSDQ', 'BATS', 'NSDQ'],
+            'price': [51.95, 51.95,
+                      720.77, 720.92, 98.00],
+            'quantity': [75, 155,
+                         100, 100, 100]},
+            columns=['time', 'ticker', 'exch',
+                     'price', 'quantity'])
+
+        quotes = pd.DataFrame({
+            'time': pd.to_datetime(['20160525 13:30:00.023',
+                                    '20160525 13:30:00.023',
+                                    '20160525 13:30:00.030',
+                                    '20160525 13:30:00.041',
+                                    '20160525 13:30:00.045',
+                                    '20160525 13:30:00.049']),
+            'ticker': [1, 0, 0, 0, 1, 2],
+            'exch': ['BATS', 'NSDQ', 'ARCA', 'ARCA',
+                     'NSDQ', 'ARCA'],
+            'bid': [720.51, 51.95, 51.97, 51.99,
+                    720.50, 97.99],
+            'ask': [720.92, 51.96, 51.98, 52.00,
+                    720.93, 98.01]},
+            columns=['time', 'ticker', 'exch', 'bid', 'ask'])
+
+        expected = pd.DataFrame({
+            'time': pd.to_datetime(['20160525 13:30:00.023',
+                                    '20160525 13:30:00.023',
+                                    '20160525 13:30:00.046',
+                                    '20160525 13:30:00.048',
+                                    '20160525 13:30:00.050']),
+            'ticker': [0, 0, 1, 1, 2],
+            'exch': ['ARCA', 'NSDQ', 'NSDQ', 'BATS', 'NSDQ'],
+            'price': [51.95, 51.95,
+                      720.77, 720.92, 98.00],
+            'quantity': [75, 155,
+                         100, 100, 100],
+            'bid': [np.nan, 51.95, 720.50, 720.51, np.nan],
+            'ask': [np.nan, 51.96, 720.93, 720.92, np.nan]},
+            columns=['time', 'ticker', 'exch',
+                     'price', 'quantity', 'bid', 'ask'])
+
+        result = pd.merge_asof(trades, quotes, on='time',
+                               by=['ticker', 'exch'])
+        assert_frame_equal(result, expected)
+
     def test_basic2(self):
 
         expected = self.read_data('asof2.csv')
@@ -542,6 +653,77 @@ def test_on_float(self):
 
         assert_frame_equal(result, expected)
 
+    def test_on_specialized_type(self):
+        # GH13936
+        for dtype in [np.uint8, np.uint16, np.uint32, np.uint64,
+                      np.int8, np.int16, np.int32, np.int64,
+                      np.float16, np.float32, np.float64]:
+            df1 = pd.DataFrame({
+                'value': [5, 2, 25, 100, 78, 120, 79],
+                'symbol': list("ABCDEFG")},
+                columns=['symbol', 'value'])
+            df1.value = dtype(df1.value)
+
+            df2 = pd.DataFrame({
+                'value': [0, 80, 120, 125],
+                'result': list('xyzw')},
+                columns=['value', 'result'])
+            df2.value = dtype(df2.value)
+
+            df1 = df1.sort_values('value').reset_index(drop=True)
+
+            if dtype == np.float16:
+                with self.assertRaises(MergeError):
+                    pd.merge_asof(df1, df2, on='value')
+            else:
+                result = pd.merge_asof(df1, df2, on='value')
+
+                expected = pd.DataFrame({
+                    'symbol': list("BACEGDF"),
+                    'value': [2, 5, 25, 78, 79, 100, 120],
+                    'result': list('xxxxxyz')},
+                    columns=['symbol', 'value', 'result'])
+                expected.value = dtype(expected.value)
+
+                assert_frame_equal(result, expected)
+
+    def test_on_specialized_type_by_int(self):
+        # GH13936
+        for dtype in [np.uint8, np.uint16, np.uint32, np.uint64,
+                      np.int8, np.int16, np.int32, np.int64,
+                      np.float16, np.float32, np.float64]:
+            df1 = pd.DataFrame({
+                'value': [5, 2, 25, 100, 78, 120, 79],
+                'key': [1, 2, 3, 2, 3, 1, 2],
+                'symbol': list("ABCDEFG")},
+                columns=['symbol', 'key', 'value'])
+            df1.value = dtype(df1.value)
+
+            df2 = pd.DataFrame({
+                'value': [0, 80, 120, 125],
+                'key': [1, 2, 2, 3],
+                'result': list('xyzw')},
+                columns=['value', 'key', 'result'])
+            df2.value = dtype(df2.value)
+
+            df1 = df1.sort_values('value').reset_index(drop=True)
+
+            if dtype == np.float16:
+                with self.assertRaises(MergeError):
+                    pd.merge_asof(df1, df2, on='value', by='key')
+            else:
+                result = pd.merge_asof(df1, df2, on='value', by='key')
+
+                expected = pd.DataFrame({
+                    'symbol': list("BACEGDF"),
+                    'key': [2, 1, 3, 3, 2, 2, 1],
+                    'value': [2, 5, 25, 78, 79, 100, 120],
+                    'result': [np.nan, 'x',
+                               np.nan, np.nan, np.nan, 'y', 'x']},
+                    columns=['symbol', 'key', 'value', 'result'])
+                expected.value = dtype(expected.value)
+
+                assert_frame_equal(result, expected)
+
     def test_on_float_by_int(self):
         # type specialize both "by" and "on" parameters
 
         df1 = pd.DataFrame({
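# ----------------------------------------------------------------------
# Illustrative usage sketch (not part of the patch): the two features
# this diff adds to pd.merge_asof() -- a list-valued 'by' parameter and
# type-specialized 'on' keys (GH13936). The data below is made up; the
# column names only mirror the tests above.

import numpy as np
import pandas as pd

left = pd.DataFrame({'time': np.int32([1, 5, 10]),
                     'ticker': ['A', 'A', 'B'],
                     'exch': ['X', 'Y', 'X'],
                     'price': [1.0, 2.0, 3.0]})
right = pd.DataFrame({'time': np.int32([2, 4, 9]),
                      'ticker': ['A', 'A', 'B'],
                      'exch': ['Y', 'X', 'X'],
                      'quote': [10.0, 20.0, 30.0]})

# 'by' now accepts a list: a right row is eligible only when both
# 'ticker' and 'exch' are equal, and the as-of search on 'time' then
# picks the last eligible row at or before the left row's time.  With
# this patch the int32 'on' key should dispatch to a specialized
# asof_join_int32_t_* routine instead of being upcast to int64 first.
result = pd.merge_asof(left, right, on='time', by=['ticker', 'exch'])
# (time=1, A, X) has no (A, X) quote at or before time 1 -> NaN;
# (time=5, A, Y) matches the (A, Y) quote at time 2 -> 10.0;
# (time=10, B, X) matches the (B, X) quote at time 9 -> 30.0.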
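# ----------------------------------------------------------------------
# Illustrative sketch (not part of the patch): how the new flip()
# helper in _get_join_indexers() packs several 'by' columns into one
# hashable key.  It zips the key arrays into a structured numpy array
# of tuples; a structured dtype is neither integer nor float, so
# _get_cython_type_upcast() falls through to 'object' and the tuples
# are hashed via PyObjectHashTable.  Standalone re-creation below,
# using list(zip(...)) in place of pandas.compat.lzip.

import string
import numpy as np


def flip(xs):
    """ unlike np.transpose, this returns an array of tuples """
    labels = list(string.ascii_lowercase[:len(xs)])
    dtypes = [x.dtype for x in xs]
    labeled_dtypes = list(zip(labels, dtypes))
    return np.array(list(zip(*xs)), labeled_dtypes)


ticker = np.array([0, 0, 1], dtype=np.int64)
exch = np.array(['ARCA', 'NSDQ', 'NSDQ'], dtype=object)
keys = flip([ticker, exch])
print(keys[0])      # (0, 'ARCA')
print(keys.dtype)   # [('a', '<i8'), ('b', 'O')]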