From 75157fcbc03ed95c99d2bd1e0326e2d27660156e Mon Sep 17 00:00:00 2001 From: "Christopher C. Aycock" Date: Thu, 1 Dec 2016 13:52:16 -0500 Subject: [PATCH 1/8] merge_asof() has type specializations and can take multiple 'by' parameters (#13936) --- asv_bench/benchmarks/join_merge.py | 45 +- pandas/src/joins_func_helper.pxi | 1518 ++++++++++++++++++++++++- pandas/src/joins_func_helper.pxi.in | 46 +- pandas/tools/merge.py | 80 +- pandas/tools/tests/test_merge_asof.py | 174 +++ 5 files changed, 1783 insertions(+), 80 deletions(-) diff --git a/asv_bench/benchmarks/join_merge.py b/asv_bench/benchmarks/join_merge.py index c98179c8950c5..f022f1f619889 100644 --- a/asv_bench/benchmarks/join_merge.py +++ b/asv_bench/benchmarks/join_merge.py @@ -310,6 +310,25 @@ def time_merge_asof_noby(self): merge_asof(self.df1, self.df2, on='time') +class merge_asof_int32_noby(object): + + def setup(self): + np.random.seed(0) + one_count = 200000 + two_count = 1000000 + self.df1 = pd.DataFrame({'time': np.random.randint(0, one_count/20, one_count), + 'value1': np.random.randn(one_count)}) + self.df1.time = np.int32(self.df1.time) + self.df2 = pd.DataFrame({'time': np.random.randint(0, two_count/20, two_count), + 'value2': np.random.randn(two_count)}) + self.df2.time = np.int32(self.df2.time) + self.df1 = self.df1.sort_values('time') + self.df2 = self.df2.sort_values('time') + + def time_merge_asof_int32_noby(self): + merge_asof(self.df1, self.df2, on='time') + + class merge_asof_by_object(object): def setup(self): @@ -318,10 +337,10 @@ def setup(self): one_count = 200000 two_count = 1000000 self.df1 = pd.DataFrame({'time': np.random.randint(0, one_count/20, one_count), - 'key': np.random.choice(list(string.uppercase), one_count), + 'key': np.random.choice(list(string.ascii_uppercase), one_count), 'value1': np.random.randn(one_count)}) self.df2 = pd.DataFrame({'time': np.random.randint(0, two_count/20, two_count), - 'key': np.random.choice(list(string.uppercase), two_count), + 'key': 
np.random.choice(list(string.ascii_uppercase), two_count), 'value2': np.random.randn(two_count)}) self.df1 = self.df1.sort_values('time') self.df2 = self.df2.sort_values('time') @@ -349,6 +368,28 @@ def time_merge_asof_by_int(self): merge_asof(self.df1, self.df2, on='time', by='key') +class merge_asof_multiby(object): + + def setup(self): + import string + np.random.seed(0) + one_count = 200000 + two_count = 1000000 + self.df1 = pd.DataFrame({'time': np.random.randint(0, one_count/20, one_count), + 'key1': np.random.choice(list(string.ascii_uppercase), one_count), + 'key2': np.random.choice(list(string.ascii_uppercase), one_count), + 'value1': np.random.randn(one_count)}) + self.df2 = pd.DataFrame({'time': np.random.randint(0, two_count/20, two_count), + 'key1': np.random.choice(list(string.ascii_uppercase), two_count), + 'key2': np.random.choice(list(string.ascii_uppercase), two_count), + 'value2': np.random.randn(two_count)}) + self.df1 = self.df1.sort_values('time') + self.df2 = self.df2.sort_values('time') + + def time_merge_asof_multiby(self): + merge_asof(self.df1, self.df2, on='time', by=['key1', 'key2']) + + class join_non_unique_equal(object): goal_time = 0.2 diff --git a/pandas/src/joins_func_helper.pxi b/pandas/src/joins_func_helper.pxi index 7a59da37c5ced..0a6f4c580c2ae 100644 --- a/pandas/src/joins_func_helper.pxi +++ b/pandas/src/joins_func_helper.pxi @@ -1,3 +1,4 @@ +# cython: boundscheck=False, wraparound=False """ Template for each `dtype` helper function for hashtable @@ -12,10 +13,10 @@ WARNING: DO NOT edit .pxi FILE directly, .pxi is generated from .pxi.in from hashtable cimport * -def asof_join_int64_t_by_object(ndarray[int64_t] left_values, - ndarray[int64_t] right_values, - ndarray[object] left_by_values, - ndarray[object] right_by_values, +def asof_join_uint8_t_by_int64_t(ndarray[uint8_t] left_values, + ndarray[uint8_t] right_values, + ndarray[int64_t] left_by_values, + ndarray[int64_t] right_by_values, bint allow_exact_matches=1, 
tolerance=None): @@ -23,9 +24,9 @@ def asof_join_int64_t_by_object(ndarray[int64_t] left_values, Py_ssize_t left_pos, right_pos, left_size, right_size, found_right_pos ndarray[int64_t] left_indexer, right_indexer bint has_tolerance = 0 - int64_t tolerance_ - PyObjectHashTable hash_table - object by_value + uint8_t tolerance_ + Int64HashTable hash_table + int64_t by_value # if we are using tolerance, set our objects if tolerance is not None: @@ -38,7 +39,7 @@ def asof_join_int64_t_by_object(ndarray[int64_t] left_values, left_indexer = np.empty(left_size, dtype=np.int64) right_indexer = np.empty(left_size, dtype=np.int64) - hash_table = PyObjectHashTable(right_size) + hash_table = Int64HashTable(right_size) right_pos = 0 for left_pos in range(left_size): @@ -75,10 +76,10 @@ def asof_join_int64_t_by_object(ndarray[int64_t] left_values, return left_indexer, right_indexer -def asof_join_double_by_object(ndarray[double] left_values, - ndarray[double] right_values, - ndarray[object] left_by_values, - ndarray[object] right_by_values, +def asof_join_uint16_t_by_int64_t(ndarray[uint16_t] left_values, + ndarray[uint16_t] right_values, + ndarray[int64_t] left_by_values, + ndarray[int64_t] right_by_values, bint allow_exact_matches=1, tolerance=None): @@ -86,9 +87,9 @@ def asof_join_double_by_object(ndarray[double] left_values, Py_ssize_t left_pos, right_pos, left_size, right_size, found_right_pos ndarray[int64_t] left_indexer, right_indexer bint has_tolerance = 0 - double tolerance_ - PyObjectHashTable hash_table - object by_value + uint16_t tolerance_ + Int64HashTable hash_table + int64_t by_value # if we are using tolerance, set our objects if tolerance is not None: @@ -101,7 +102,7 @@ def asof_join_double_by_object(ndarray[double] left_values, left_indexer = np.empty(left_size, dtype=np.int64) right_indexer = np.empty(left_size, dtype=np.int64) - hash_table = PyObjectHashTable(right_size) + hash_table = Int64HashTable(right_size) right_pos = 0 for left_pos in 
range(left_size): @@ -138,8 +139,8 @@ def asof_join_double_by_object(ndarray[double] left_values, return left_indexer, right_indexer -def asof_join_int64_t_by_int64_t(ndarray[int64_t] left_values, - ndarray[int64_t] right_values, +def asof_join_uint32_t_by_int64_t(ndarray[uint32_t] left_values, + ndarray[uint32_t] right_values, ndarray[int64_t] left_by_values, ndarray[int64_t] right_by_values, bint allow_exact_matches=1, @@ -149,7 +150,7 @@ def asof_join_int64_t_by_int64_t(ndarray[int64_t] left_values, Py_ssize_t left_pos, right_pos, left_size, right_size, found_right_pos ndarray[int64_t] left_indexer, right_indexer bint has_tolerance = 0 - int64_t tolerance_ + uint32_t tolerance_ Int64HashTable hash_table int64_t by_value @@ -201,8 +202,8 @@ def asof_join_int64_t_by_int64_t(ndarray[int64_t] left_values, return left_indexer, right_indexer -def asof_join_double_by_int64_t(ndarray[double] left_values, - ndarray[double] right_values, +def asof_join_uint64_t_by_int64_t(ndarray[uint64_t] left_values, + ndarray[uint64_t] right_values, ndarray[int64_t] left_by_values, ndarray[int64_t] right_by_values, bint allow_exact_matches=1, @@ -212,7 +213,7 @@ def asof_join_double_by_int64_t(ndarray[double] left_values, Py_ssize_t left_pos, right_pos, left_size, right_size, found_right_pos ndarray[int64_t] left_indexer, right_indexer bint has_tolerance = 0 - double tolerance_ + uint64_t tolerance_ Int64HashTable hash_table int64_t by_value @@ -264,21 +265,209 @@ def asof_join_double_by_int64_t(ndarray[double] left_values, return left_indexer, right_indexer -#---------------------------------------------------------------------- -# asof_join -#---------------------------------------------------------------------- +def asof_join_int8_t_by_int64_t(ndarray[int8_t] left_values, + ndarray[int8_t] right_values, + ndarray[int64_t] left_by_values, + ndarray[int64_t] right_by_values, + bint allow_exact_matches=1, + tolerance=None): + + cdef: + Py_ssize_t left_pos, right_pos, left_size, 
right_size, found_right_pos + ndarray[int64_t] left_indexer, right_indexer + bint has_tolerance = 0 + int8_t tolerance_ + Int64HashTable hash_table + int64_t by_value + # if we are using tolerance, set our objects + if tolerance is not None: + has_tolerance = 1 + tolerance_ = tolerance -def asof_join_int64_t(ndarray[int64_t] left_values, + left_size = len(left_values) + right_size = len(right_values) + + left_indexer = np.empty(left_size, dtype=np.int64) + right_indexer = np.empty(left_size, dtype=np.int64) + + hash_table = Int64HashTable(right_size) + + right_pos = 0 + for left_pos in range(left_size): + # restart right_pos if it went negative in a previous iteration + if right_pos < 0: + right_pos = 0 + + # find last position in right whose value is less than left's value + if allow_exact_matches: + while right_pos < right_size and\ + right_values[right_pos] <= left_values[left_pos]: + hash_table.set_item(right_by_values[right_pos], right_pos) + right_pos += 1 + else: + while right_pos < right_size and\ + right_values[right_pos] < left_values[left_pos]: + hash_table.set_item(right_by_values[right_pos], right_pos) + right_pos += 1 + right_pos -= 1 + + # save positions as the desired index + by_value = left_by_values[left_pos] + found_right_pos = hash_table.get_item(by_value)\ + if by_value in hash_table else -1 + left_indexer[left_pos] = left_pos + right_indexer[left_pos] = found_right_pos + + # if needed, verify that tolerance is met + if has_tolerance and found_right_pos != -1: + diff = left_values[left_pos] - right_values[found_right_pos] + if diff > tolerance_: + right_indexer[left_pos] = -1 + + return left_indexer, right_indexer + + +def asof_join_int16_t_by_int64_t(ndarray[int16_t] left_values, + ndarray[int16_t] right_values, + ndarray[int64_t] left_by_values, + ndarray[int64_t] right_by_values, + bint allow_exact_matches=1, + tolerance=None): + + cdef: + Py_ssize_t left_pos, right_pos, left_size, right_size, found_right_pos + ndarray[int64_t] left_indexer, 
right_indexer + bint has_tolerance = 0 + int16_t tolerance_ + Int64HashTable hash_table + int64_t by_value + + # if we are using tolerance, set our objects + if tolerance is not None: + has_tolerance = 1 + tolerance_ = tolerance + + left_size = len(left_values) + right_size = len(right_values) + + left_indexer = np.empty(left_size, dtype=np.int64) + right_indexer = np.empty(left_size, dtype=np.int64) + + hash_table = Int64HashTable(right_size) + + right_pos = 0 + for left_pos in range(left_size): + # restart right_pos if it went negative in a previous iteration + if right_pos < 0: + right_pos = 0 + + # find last position in right whose value is less than left's value + if allow_exact_matches: + while right_pos < right_size and\ + right_values[right_pos] <= left_values[left_pos]: + hash_table.set_item(right_by_values[right_pos], right_pos) + right_pos += 1 + else: + while right_pos < right_size and\ + right_values[right_pos] < left_values[left_pos]: + hash_table.set_item(right_by_values[right_pos], right_pos) + right_pos += 1 + right_pos -= 1 + + # save positions as the desired index + by_value = left_by_values[left_pos] + found_right_pos = hash_table.get_item(by_value)\ + if by_value in hash_table else -1 + left_indexer[left_pos] = left_pos + right_indexer[left_pos] = found_right_pos + + # if needed, verify that tolerance is met + if has_tolerance and found_right_pos != -1: + diff = left_values[left_pos] - right_values[found_right_pos] + if diff > tolerance_: + right_indexer[left_pos] = -1 + + return left_indexer, right_indexer + + +def asof_join_int32_t_by_int64_t(ndarray[int32_t] left_values, + ndarray[int32_t] right_values, + ndarray[int64_t] left_by_values, + ndarray[int64_t] right_by_values, + bint allow_exact_matches=1, + tolerance=None): + + cdef: + Py_ssize_t left_pos, right_pos, left_size, right_size, found_right_pos + ndarray[int64_t] left_indexer, right_indexer + bint has_tolerance = 0 + int32_t tolerance_ + Int64HashTable hash_table + int64_t by_value + 
+ # if we are using tolerance, set our objects + if tolerance is not None: + has_tolerance = 1 + tolerance_ = tolerance + + left_size = len(left_values) + right_size = len(right_values) + + left_indexer = np.empty(left_size, dtype=np.int64) + right_indexer = np.empty(left_size, dtype=np.int64) + + hash_table = Int64HashTable(right_size) + + right_pos = 0 + for left_pos in range(left_size): + # restart right_pos if it went negative in a previous iteration + if right_pos < 0: + right_pos = 0 + + # find last position in right whose value is less than left's value + if allow_exact_matches: + while right_pos < right_size and\ + right_values[right_pos] <= left_values[left_pos]: + hash_table.set_item(right_by_values[right_pos], right_pos) + right_pos += 1 + else: + while right_pos < right_size and\ + right_values[right_pos] < left_values[left_pos]: + hash_table.set_item(right_by_values[right_pos], right_pos) + right_pos += 1 + right_pos -= 1 + + # save positions as the desired index + by_value = left_by_values[left_pos] + found_right_pos = hash_table.get_item(by_value)\ + if by_value in hash_table else -1 + left_indexer[left_pos] = left_pos + right_indexer[left_pos] = found_right_pos + + # if needed, verify that tolerance is met + if has_tolerance and found_right_pos != -1: + diff = left_values[left_pos] - right_values[found_right_pos] + if diff > tolerance_: + right_indexer[left_pos] = -1 + + return left_indexer, right_indexer + + +def asof_join_int64_t_by_int64_t(ndarray[int64_t] left_values, ndarray[int64_t] right_values, + ndarray[int64_t] left_by_values, + ndarray[int64_t] right_by_values, bint allow_exact_matches=1, tolerance=None): cdef: - Py_ssize_t left_pos, right_pos, left_size, right_size + Py_ssize_t left_pos, right_pos, left_size, right_size, found_right_pos ndarray[int64_t] left_indexer, right_indexer bint has_tolerance = 0 int64_t tolerance_ + Int64HashTable hash_table + int64_t by_value # if we are using tolerance, set our objects if tolerance is not None: 
@@ -291,6 +480,8 @@ def asof_join_int64_t(ndarray[int64_t] left_values, left_indexer = np.empty(left_size, dtype=np.int64) right_indexer = np.empty(left_size, dtype=np.int64) + hash_table = Int64HashTable(right_size) + right_pos = 0 for left_pos in range(left_size): # restart right_pos if it went negative in a previous iteration @@ -301,36 +492,45 @@ def asof_join_int64_t(ndarray[int64_t] left_values, if allow_exact_matches: while right_pos < right_size and\ right_values[right_pos] <= left_values[left_pos]: + hash_table.set_item(right_by_values[right_pos], right_pos) right_pos += 1 else: while right_pos < right_size and\ right_values[right_pos] < left_values[left_pos]: + hash_table.set_item(right_by_values[right_pos], right_pos) right_pos += 1 right_pos -= 1 # save positions as the desired index + by_value = left_by_values[left_pos] + found_right_pos = hash_table.get_item(by_value)\ + if by_value in hash_table else -1 left_indexer[left_pos] = left_pos - right_indexer[left_pos] = right_pos + right_indexer[left_pos] = found_right_pos # if needed, verify that tolerance is met - if has_tolerance and right_pos != -1: - diff = left_values[left_pos] - right_values[right_pos] + if has_tolerance and found_right_pos != -1: + diff = left_values[left_pos] - right_values[found_right_pos] if diff > tolerance_: right_indexer[left_pos] = -1 return left_indexer, right_indexer -def asof_join_double(ndarray[double] left_values, - ndarray[double] right_values, +def asof_join_float_by_int64_t(ndarray[float] left_values, + ndarray[float] right_values, + ndarray[int64_t] left_by_values, + ndarray[int64_t] right_by_values, bint allow_exact_matches=1, tolerance=None): cdef: - Py_ssize_t left_pos, right_pos, left_size, right_size + Py_ssize_t left_pos, right_pos, left_size, right_size, found_right_pos ndarray[int64_t] left_indexer, right_indexer bint has_tolerance = 0 - double tolerance_ + float tolerance_ + Int64HashTable hash_table + int64_t by_value # if we are using tolerance, set our 
objects if tolerance is not None: @@ -343,6 +543,8 @@ def asof_join_double(ndarray[double] left_values, left_indexer = np.empty(left_size, dtype=np.int64) right_indexer = np.empty(left_size, dtype=np.int64) + hash_table = Int64HashTable(right_size) + right_pos = 0 for left_pos in range(left_size): # restart right_pos if it went negative in a previous iteration @@ -353,21 +555,1263 @@ def asof_join_double(ndarray[double] left_values, if allow_exact_matches: while right_pos < right_size and\ right_values[right_pos] <= left_values[left_pos]: + hash_table.set_item(right_by_values[right_pos], right_pos) right_pos += 1 else: while right_pos < right_size and\ right_values[right_pos] < left_values[left_pos]: + hash_table.set_item(right_by_values[right_pos], right_pos) right_pos += 1 right_pos -= 1 # save positions as the desired index + by_value = left_by_values[left_pos] + found_right_pos = hash_table.get_item(by_value)\ + if by_value in hash_table else -1 left_indexer[left_pos] = left_pos - right_indexer[left_pos] = right_pos + right_indexer[left_pos] = found_right_pos # if needed, verify that tolerance is met - if has_tolerance and right_pos != -1: - diff = left_values[left_pos] - right_values[right_pos] + if has_tolerance and found_right_pos != -1: + diff = left_values[left_pos] - right_values[found_right_pos] if diff > tolerance_: right_indexer[left_pos] = -1 return left_indexer, right_indexer + + +def asof_join_double_by_int64_t(ndarray[double] left_values, + ndarray[double] right_values, + ndarray[int64_t] left_by_values, + ndarray[int64_t] right_by_values, + bint allow_exact_matches=1, + tolerance=None): + + cdef: + Py_ssize_t left_pos, right_pos, left_size, right_size, found_right_pos + ndarray[int64_t] left_indexer, right_indexer + bint has_tolerance = 0 + double tolerance_ + Int64HashTable hash_table + int64_t by_value + + # if we are using tolerance, set our objects + if tolerance is not None: + has_tolerance = 1 + tolerance_ = tolerance + + left_size = 
len(left_values) + right_size = len(right_values) + + left_indexer = np.empty(left_size, dtype=np.int64) + right_indexer = np.empty(left_size, dtype=np.int64) + + hash_table = Int64HashTable(right_size) + + right_pos = 0 + for left_pos in range(left_size): + # restart right_pos if it went negative in a previous iteration + if right_pos < 0: + right_pos = 0 + + # find last position in right whose value is less than left's value + if allow_exact_matches: + while right_pos < right_size and\ + right_values[right_pos] <= left_values[left_pos]: + hash_table.set_item(right_by_values[right_pos], right_pos) + right_pos += 1 + else: + while right_pos < right_size and\ + right_values[right_pos] < left_values[left_pos]: + hash_table.set_item(right_by_values[right_pos], right_pos) + right_pos += 1 + right_pos -= 1 + + # save positions as the desired index + by_value = left_by_values[left_pos] + found_right_pos = hash_table.get_item(by_value)\ + if by_value in hash_table else -1 + left_indexer[left_pos] = left_pos + right_indexer[left_pos] = found_right_pos + + # if needed, verify that tolerance is met + if has_tolerance and found_right_pos != -1: + diff = left_values[left_pos] - right_values[found_right_pos] + if diff > tolerance_: + right_indexer[left_pos] = -1 + + return left_indexer, right_indexer + + +def asof_join_uint8_t_by_object(ndarray[uint8_t] left_values, + ndarray[uint8_t] right_values, + ndarray[object] left_by_values, + ndarray[object] right_by_values, + bint allow_exact_matches=1, + tolerance=None): + + cdef: + Py_ssize_t left_pos, right_pos, left_size, right_size, found_right_pos + ndarray[int64_t] left_indexer, right_indexer + bint has_tolerance = 0 + uint8_t tolerance_ + dict hash_table + object by_value + + # if we are using tolerance, set our objects + if tolerance is not None: + has_tolerance = 1 + tolerance_ = tolerance + + left_size = len(left_values) + right_size = len(right_values) + + left_indexer = np.empty(left_size, dtype=np.int64) + right_indexer = 
np.empty(left_size, dtype=np.int64) + + hash_table = {} + + right_pos = 0 + for left_pos in range(left_size): + # restart right_pos if it went negative in a previous iteration + if right_pos < 0: + right_pos = 0 + + # find last position in right whose value is less than left's value + if allow_exact_matches: + while right_pos < right_size and\ + right_values[right_pos] <= left_values[left_pos]: + hash_table[right_by_values[right_pos]] = right_pos + right_pos += 1 + else: + while right_pos < right_size and\ + right_values[right_pos] < left_values[left_pos]: + hash_table[right_by_values[right_pos]] = right_pos + right_pos += 1 + right_pos -= 1 + + # save positions as the desired index + by_value = left_by_values[left_pos] + found_right_pos = hash_table[by_value]\ + if by_value in hash_table else -1 + left_indexer[left_pos] = left_pos + right_indexer[left_pos] = found_right_pos + + # if needed, verify that tolerance is met + if has_tolerance and found_right_pos != -1: + diff = left_values[left_pos] - right_values[found_right_pos] + if diff > tolerance_: + right_indexer[left_pos] = -1 + + return left_indexer, right_indexer + + +def asof_join_uint16_t_by_object(ndarray[uint16_t] left_values, + ndarray[uint16_t] right_values, + ndarray[object] left_by_values, + ndarray[object] right_by_values, + bint allow_exact_matches=1, + tolerance=None): + + cdef: + Py_ssize_t left_pos, right_pos, left_size, right_size, found_right_pos + ndarray[int64_t] left_indexer, right_indexer + bint has_tolerance = 0 + uint16_t tolerance_ + dict hash_table + object by_value + + # if we are using tolerance, set our objects + if tolerance is not None: + has_tolerance = 1 + tolerance_ = tolerance + + left_size = len(left_values) + right_size = len(right_values) + + left_indexer = np.empty(left_size, dtype=np.int64) + right_indexer = np.empty(left_size, dtype=np.int64) + + hash_table = {} + + right_pos = 0 + for left_pos in range(left_size): + # restart right_pos if it went negative in a previous 
iteration + if right_pos < 0: + right_pos = 0 + + # find last position in right whose value is less than left's value + if allow_exact_matches: + while right_pos < right_size and\ + right_values[right_pos] <= left_values[left_pos]: + hash_table[right_by_values[right_pos]] = right_pos + right_pos += 1 + else: + while right_pos < right_size and\ + right_values[right_pos] < left_values[left_pos]: + hash_table[right_by_values[right_pos]] = right_pos + right_pos += 1 + right_pos -= 1 + + # save positions as the desired index + by_value = left_by_values[left_pos] + found_right_pos = hash_table[by_value]\ + if by_value in hash_table else -1 + left_indexer[left_pos] = left_pos + right_indexer[left_pos] = found_right_pos + + # if needed, verify that tolerance is met + if has_tolerance and found_right_pos != -1: + diff = left_values[left_pos] - right_values[found_right_pos] + if diff > tolerance_: + right_indexer[left_pos] = -1 + + return left_indexer, right_indexer + + +def asof_join_uint32_t_by_object(ndarray[uint32_t] left_values, + ndarray[uint32_t] right_values, + ndarray[object] left_by_values, + ndarray[object] right_by_values, + bint allow_exact_matches=1, + tolerance=None): + + cdef: + Py_ssize_t left_pos, right_pos, left_size, right_size, found_right_pos + ndarray[int64_t] left_indexer, right_indexer + bint has_tolerance = 0 + uint32_t tolerance_ + dict hash_table + object by_value + + # if we are using tolerance, set our objects + if tolerance is not None: + has_tolerance = 1 + tolerance_ = tolerance + + left_size = len(left_values) + right_size = len(right_values) + + left_indexer = np.empty(left_size, dtype=np.int64) + right_indexer = np.empty(left_size, dtype=np.int64) + + hash_table = {} + + right_pos = 0 + for left_pos in range(left_size): + # restart right_pos if it went negative in a previous iteration + if right_pos < 0: + right_pos = 0 + + # find last position in right whose value is less than left's value + if allow_exact_matches: + while right_pos < 
right_size and\ + right_values[right_pos] <= left_values[left_pos]: + hash_table[right_by_values[right_pos]] = right_pos + right_pos += 1 + else: + while right_pos < right_size and\ + right_values[right_pos] < left_values[left_pos]: + hash_table[right_by_values[right_pos]] = right_pos + right_pos += 1 + right_pos -= 1 + + # save positions as the desired index + by_value = left_by_values[left_pos] + found_right_pos = hash_table[by_value]\ + if by_value in hash_table else -1 + left_indexer[left_pos] = left_pos + right_indexer[left_pos] = found_right_pos + + # if needed, verify that tolerance is met + if has_tolerance and found_right_pos != -1: + diff = left_values[left_pos] - right_values[found_right_pos] + if diff > tolerance_: + right_indexer[left_pos] = -1 + + return left_indexer, right_indexer + + +def asof_join_uint64_t_by_object(ndarray[uint64_t] left_values, + ndarray[uint64_t] right_values, + ndarray[object] left_by_values, + ndarray[object] right_by_values, + bint allow_exact_matches=1, + tolerance=None): + + cdef: + Py_ssize_t left_pos, right_pos, left_size, right_size, found_right_pos + ndarray[int64_t] left_indexer, right_indexer + bint has_tolerance = 0 + uint64_t tolerance_ + dict hash_table + object by_value + + # if we are using tolerance, set our objects + if tolerance is not None: + has_tolerance = 1 + tolerance_ = tolerance + + left_size = len(left_values) + right_size = len(right_values) + + left_indexer = np.empty(left_size, dtype=np.int64) + right_indexer = np.empty(left_size, dtype=np.int64) + + hash_table = {} + + right_pos = 0 + for left_pos in range(left_size): + # restart right_pos if it went negative in a previous iteration + if right_pos < 0: + right_pos = 0 + + # find last position in right whose value is less than left's value + if allow_exact_matches: + while right_pos < right_size and\ + right_values[right_pos] <= left_values[left_pos]: + hash_table[right_by_values[right_pos]] = right_pos + right_pos += 1 + else: + while right_pos < 
right_size and\ + right_values[right_pos] < left_values[left_pos]: + hash_table[right_by_values[right_pos]] = right_pos + right_pos += 1 + right_pos -= 1 + + # save positions as the desired index + by_value = left_by_values[left_pos] + found_right_pos = hash_table[by_value]\ + if by_value in hash_table else -1 + left_indexer[left_pos] = left_pos + right_indexer[left_pos] = found_right_pos + + # if needed, verify that tolerance is met + if has_tolerance and found_right_pos != -1: + diff = left_values[left_pos] - right_values[found_right_pos] + if diff > tolerance_: + right_indexer[left_pos] = -1 + + return left_indexer, right_indexer + + +def asof_join_int8_t_by_object(ndarray[int8_t] left_values, + ndarray[int8_t] right_values, + ndarray[object] left_by_values, + ndarray[object] right_by_values, + bint allow_exact_matches=1, + tolerance=None): + + cdef: + Py_ssize_t left_pos, right_pos, left_size, right_size, found_right_pos + ndarray[int64_t] left_indexer, right_indexer + bint has_tolerance = 0 + int8_t tolerance_ + dict hash_table + object by_value + + # if we are using tolerance, set our objects + if tolerance is not None: + has_tolerance = 1 + tolerance_ = tolerance + + left_size = len(left_values) + right_size = len(right_values) + + left_indexer = np.empty(left_size, dtype=np.int64) + right_indexer = np.empty(left_size, dtype=np.int64) + + hash_table = {} + + right_pos = 0 + for left_pos in range(left_size): + # restart right_pos if it went negative in a previous iteration + if right_pos < 0: + right_pos = 0 + + # find last position in right whose value is less than left's value + if allow_exact_matches: + while right_pos < right_size and\ + right_values[right_pos] <= left_values[left_pos]: + hash_table[right_by_values[right_pos]] = right_pos + right_pos += 1 + else: + while right_pos < right_size and\ + right_values[right_pos] < left_values[left_pos]: + hash_table[right_by_values[right_pos]] = right_pos + right_pos += 1 + right_pos -= 1 + + # save positions 
as the desired index + by_value = left_by_values[left_pos] + found_right_pos = hash_table[by_value]\ + if by_value in hash_table else -1 + left_indexer[left_pos] = left_pos + right_indexer[left_pos] = found_right_pos + + # if needed, verify that tolerance is met + if has_tolerance and found_right_pos != -1: + diff = left_values[left_pos] - right_values[found_right_pos] + if diff > tolerance_: + right_indexer[left_pos] = -1 + + return left_indexer, right_indexer + + +def asof_join_int16_t_by_object(ndarray[int16_t] left_values, + ndarray[int16_t] right_values, + ndarray[object] left_by_values, + ndarray[object] right_by_values, + bint allow_exact_matches=1, + tolerance=None): + + cdef: + Py_ssize_t left_pos, right_pos, left_size, right_size, found_right_pos + ndarray[int64_t] left_indexer, right_indexer + bint has_tolerance = 0 + int16_t tolerance_ + dict hash_table + object by_value + + # if we are using tolerance, set our objects + if tolerance is not None: + has_tolerance = 1 + tolerance_ = tolerance + + left_size = len(left_values) + right_size = len(right_values) + + left_indexer = np.empty(left_size, dtype=np.int64) + right_indexer = np.empty(left_size, dtype=np.int64) + + hash_table = {} + + right_pos = 0 + for left_pos in range(left_size): + # restart right_pos if it went negative in a previous iteration + if right_pos < 0: + right_pos = 0 + + # find last position in right whose value is less than left's value + if allow_exact_matches: + while right_pos < right_size and\ + right_values[right_pos] <= left_values[left_pos]: + hash_table[right_by_values[right_pos]] = right_pos + right_pos += 1 + else: + while right_pos < right_size and\ + right_values[right_pos] < left_values[left_pos]: + hash_table[right_by_values[right_pos]] = right_pos + right_pos += 1 + right_pos -= 1 + + # save positions as the desired index + by_value = left_by_values[left_pos] + found_right_pos = hash_table[by_value]\ + if by_value in hash_table else -1 + left_indexer[left_pos] = 
left_pos + right_indexer[left_pos] = found_right_pos + + # if needed, verify that tolerance is met + if has_tolerance and found_right_pos != -1: + diff = left_values[left_pos] - right_values[found_right_pos] + if diff > tolerance_: + right_indexer[left_pos] = -1 + + return left_indexer, right_indexer + + +def asof_join_int32_t_by_object(ndarray[int32_t] left_values, + ndarray[int32_t] right_values, + ndarray[object] left_by_values, + ndarray[object] right_by_values, + bint allow_exact_matches=1, + tolerance=None): + + cdef: + Py_ssize_t left_pos, right_pos, left_size, right_size, found_right_pos + ndarray[int64_t] left_indexer, right_indexer + bint has_tolerance = 0 + int32_t tolerance_ + dict hash_table + object by_value + + # if we are using tolerance, set our objects + if tolerance is not None: + has_tolerance = 1 + tolerance_ = tolerance + + left_size = len(left_values) + right_size = len(right_values) + + left_indexer = np.empty(left_size, dtype=np.int64) + right_indexer = np.empty(left_size, dtype=np.int64) + + hash_table = {} + + right_pos = 0 + for left_pos in range(left_size): + # restart right_pos if it went negative in a previous iteration + if right_pos < 0: + right_pos = 0 + + # find last position in right whose value is less than left's value + if allow_exact_matches: + while right_pos < right_size and\ + right_values[right_pos] <= left_values[left_pos]: + hash_table[right_by_values[right_pos]] = right_pos + right_pos += 1 + else: + while right_pos < right_size and\ + right_values[right_pos] < left_values[left_pos]: + hash_table[right_by_values[right_pos]] = right_pos + right_pos += 1 + right_pos -= 1 + + # save positions as the desired index + by_value = left_by_values[left_pos] + found_right_pos = hash_table[by_value]\ + if by_value in hash_table else -1 + left_indexer[left_pos] = left_pos + right_indexer[left_pos] = found_right_pos + + # if needed, verify that tolerance is met + if has_tolerance and found_right_pos != -1: + diff = 
left_values[left_pos] - right_values[found_right_pos] + if diff > tolerance_: + right_indexer[left_pos] = -1 + + return left_indexer, right_indexer + + +def asof_join_int64_t_by_object(ndarray[int64_t] left_values, + ndarray[int64_t] right_values, + ndarray[object] left_by_values, + ndarray[object] right_by_values, + bint allow_exact_matches=1, + tolerance=None): + + cdef: + Py_ssize_t left_pos, right_pos, left_size, right_size, found_right_pos + ndarray[int64_t] left_indexer, right_indexer + bint has_tolerance = 0 + int64_t tolerance_ + dict hash_table + object by_value + + # if we are using tolerance, set our objects + if tolerance is not None: + has_tolerance = 1 + tolerance_ = tolerance + + left_size = len(left_values) + right_size = len(right_values) + + left_indexer = np.empty(left_size, dtype=np.int64) + right_indexer = np.empty(left_size, dtype=np.int64) + + hash_table = {} + + right_pos = 0 + for left_pos in range(left_size): + # restart right_pos if it went negative in a previous iteration + if right_pos < 0: + right_pos = 0 + + # find last position in right whose value is less than left's value + if allow_exact_matches: + while right_pos < right_size and\ + right_values[right_pos] <= left_values[left_pos]: + hash_table[right_by_values[right_pos]] = right_pos + right_pos += 1 + else: + while right_pos < right_size and\ + right_values[right_pos] < left_values[left_pos]: + hash_table[right_by_values[right_pos]] = right_pos + right_pos += 1 + right_pos -= 1 + + # save positions as the desired index + by_value = left_by_values[left_pos] + found_right_pos = hash_table[by_value]\ + if by_value in hash_table else -1 + left_indexer[left_pos] = left_pos + right_indexer[left_pos] = found_right_pos + + # if needed, verify that tolerance is met + if has_tolerance and found_right_pos != -1: + diff = left_values[left_pos] - right_values[found_right_pos] + if diff > tolerance_: + right_indexer[left_pos] = -1 + + return left_indexer, right_indexer + + +def 
def asof_join_float_by_object(ndarray[float] left_values,
                              ndarray[float] right_values,
                              ndarray[object] left_by_values,
                              ndarray[object] right_by_values,
                              bint allow_exact_matches=1,
                              tolerance=None):
    """Backward as-of join of sorted float32 arrays, matched within 'by' groups.

    Returns (left_indexer, right_indexer) as int64 arrays; a right entry is
    -1 when the group has no right value at/below the left value, or when
    the match is farther away than `tolerance`.
    """

    cdef:
        Py_ssize_t left_pos, right_pos, left_size, right_size, found_right_pos
        ndarray[int64_t] left_indexer, right_indexer
        bint has_tolerance = 0
        float tolerance_, diff  # typed `diff` keeps the check in C
        dict hash_table
        object by_value

    # if we are using tolerance, set our objects
    if tolerance is not None:
        has_tolerance = 1
        tolerance_ = tolerance

    left_size = len(left_values)
    right_size = len(right_values)

    left_indexer = np.empty(left_size, dtype=np.int64)
    right_indexer = np.empty(left_size, dtype=np.int64)

    hash_table = {}

    right_pos = 0
    for left_pos in range(left_size):
        # restart right_pos if it went negative in a previous iteration
        if right_pos < 0:
            right_pos = 0

        # find last position in right whose value is less than left's value,
        # remembering the most recent right position seen for each 'by' key
        if allow_exact_matches:
            while right_pos < right_size and\
                    right_values[right_pos] <= left_values[left_pos]:
                hash_table[right_by_values[right_pos]] = right_pos
                right_pos += 1
        else:
            while right_pos < right_size and\
                    right_values[right_pos] < left_values[left_pos]:
                hash_table[right_by_values[right_pos]] = right_pos
                right_pos += 1
        right_pos -= 1

        # save positions as the desired index; a single .get() replaces the
        # original `in` test followed by a second __getitem__ lookup
        by_value = left_by_values[left_pos]
        found_right_pos = hash_table.get(by_value, -1)
        left_indexer[left_pos] = left_pos
        right_indexer[left_pos] = found_right_pos

        # if needed, verify that tolerance is met
        if has_tolerance and found_right_pos != -1:
            diff = left_values[left_pos] - right_values[found_right_pos]
            if diff > tolerance_:
                right_indexer[left_pos] = -1

    return left_indexer, right_indexer
def asof_join_double_by_object(ndarray[double] left_values,
                               ndarray[double] right_values,
                               ndarray[object] left_by_values,
                               ndarray[object] right_by_values,
                               bint allow_exact_matches=1,
                               tolerance=None):
    """Backward as-of join of sorted float64 arrays, matched within 'by' groups.

    Returns (left_indexer, right_indexer) as int64 arrays; a right entry is
    -1 when the group has no right value at/below the left value, or when
    the match is farther away than `tolerance`.
    """

    cdef:
        Py_ssize_t left_pos, right_pos, left_size, right_size, found_right_pos
        ndarray[int64_t] left_indexer, right_indexer
        bint has_tolerance = 0
        double tolerance_, diff  # typed `diff` keeps the check in C
        dict hash_table
        object by_value

    # if we are using tolerance, set our objects
    if tolerance is not None:
        has_tolerance = 1
        tolerance_ = tolerance

    left_size = len(left_values)
    right_size = len(right_values)

    left_indexer = np.empty(left_size, dtype=np.int64)
    right_indexer = np.empty(left_size, dtype=np.int64)

    hash_table = {}

    right_pos = 0
    for left_pos in range(left_size):
        # restart right_pos if it went negative in a previous iteration
        if right_pos < 0:
            right_pos = 0

        # find last position in right whose value is less than left's value,
        # remembering the most recent right position seen for each 'by' key
        if allow_exact_matches:
            while right_pos < right_size and\
                    right_values[right_pos] <= left_values[left_pos]:
                hash_table[right_by_values[right_pos]] = right_pos
                right_pos += 1
        else:
            while right_pos < right_size and\
                    right_values[right_pos] < left_values[left_pos]:
                hash_table[right_by_values[right_pos]] = right_pos
                right_pos += 1
        right_pos -= 1

        # save positions as the desired index; a single .get() replaces the
        # original `in` test followed by a second __getitem__ lookup
        by_value = left_by_values[left_pos]
        found_right_pos = hash_table.get(by_value, -1)
        left_indexer[left_pos] = left_pos
        right_indexer[left_pos] = found_right_pos

        # if needed, verify that tolerance is met
        if has_tolerance and found_right_pos != -1:
            diff = left_values[left_pos] - right_values[found_right_pos]
            if diff > tolerance_:
                right_indexer[left_pos] = -1

    return left_indexer, right_indexer
def asof_join_uint8_t(ndarray[uint8_t] left_values,
                      ndarray[uint8_t] right_values,
                      bint allow_exact_matches=1,
                      tolerance=None):
    """Backward as-of join of two sorted uint8 arrays (no 'by' grouping).

    Returns (left_indexer, right_indexer); right_indexer holds -1 where no
    right value is at/below the left value, or none within `tolerance`.
    """

    cdef:
        Py_ssize_t left_pos, right_pos, left_size, right_size
        ndarray[int64_t] left_indexer, right_indexer
        bint has_tolerance = 0
        uint8_t tolerance_, diff  # typed `diff` keeps the check in C

    # if we are using tolerance, set our objects
    if tolerance is not None:
        has_tolerance = 1
        tolerance_ = tolerance

    left_size = len(left_values)
    right_size = len(right_values)

    left_indexer = np.empty(left_size, dtype=np.int64)
    right_indexer = np.empty(left_size, dtype=np.int64)

    right_pos = 0
    for left_pos in range(left_size):
        # restart right_pos if it went negative in a previous iteration
        if right_pos < 0:
            right_pos = 0

        # find last position in right whose value is less than left's value
        if allow_exact_matches:
            while right_pos < right_size and\
                    right_values[right_pos] <= left_values[left_pos]:
                right_pos += 1
        else:
            while right_pos < right_size and\
                    right_values[right_pos] < left_values[left_pos]:
                right_pos += 1
        right_pos -= 1

        # save positions as the desired index
        left_indexer[left_pos] = left_pos
        right_indexer[left_pos] = right_pos

        # if needed, verify that tolerance is met
        if has_tolerance and right_pos != -1:
            diff = left_values[left_pos] - right_values[right_pos]
            if diff > tolerance_:
                right_indexer[left_pos] = -1

    return left_indexer, right_indexer


def asof_join_uint16_t(ndarray[uint16_t] left_values,
                       ndarray[uint16_t] right_values,
                       bint allow_exact_matches=1,
                       tolerance=None):
    """Backward as-of join of two sorted uint16 arrays (no 'by' grouping)."""

    cdef:
        Py_ssize_t left_pos, right_pos, left_size, right_size
        ndarray[int64_t] left_indexer, right_indexer
        bint has_tolerance = 0
        uint16_t tolerance_, diff  # typed `diff` keeps the check in C

    # if we are using tolerance, set our objects
    if tolerance is not None:
        has_tolerance = 1
        tolerance_ = tolerance

    left_size = len(left_values)
    right_size = len(right_values)

    left_indexer = np.empty(left_size, dtype=np.int64)
    right_indexer = np.empty(left_size, dtype=np.int64)

    right_pos = 0
    for left_pos in range(left_size):
        # restart right_pos if it went negative in a previous iteration
        if right_pos < 0:
            right_pos = 0

        # find last position in right whose value is less than left's value
        if allow_exact_matches:
            while right_pos < right_size and\
                    right_values[right_pos] <= left_values[left_pos]:
                right_pos += 1
        else:
            while right_pos < right_size and\
                    right_values[right_pos] < left_values[left_pos]:
                right_pos += 1
        right_pos -= 1

        # save positions as the desired index
        left_indexer[left_pos] = left_pos
        right_indexer[left_pos] = right_pos

        # if needed, verify that tolerance is met
        if has_tolerance and right_pos != -1:
            diff = left_values[left_pos] - right_values[right_pos]
            if diff > tolerance_:
                right_indexer[left_pos] = -1

    return left_indexer, right_indexer


def asof_join_uint32_t(ndarray[uint32_t] left_values,
                       ndarray[uint32_t] right_values,
                       bint allow_exact_matches=1,
                       tolerance=None):
    """Backward as-of join of two sorted uint32 arrays (no 'by' grouping)."""

    cdef:
        Py_ssize_t left_pos, right_pos, left_size, right_size
        ndarray[int64_t] left_indexer, right_indexer
        bint has_tolerance = 0
        uint32_t tolerance_, diff  # typed `diff` keeps the check in C

    # if we are using tolerance, set our objects
    if tolerance is not None:
        has_tolerance = 1
        tolerance_ = tolerance

    left_size = len(left_values)
    right_size = len(right_values)

    left_indexer = np.empty(left_size, dtype=np.int64)
    right_indexer = np.empty(left_size, dtype=np.int64)

    right_pos = 0
    for left_pos in range(left_size):
        # restart right_pos if it went negative in a previous iteration
        if right_pos < 0:
            right_pos = 0

        # find last position in right whose value is less than left's value
        if allow_exact_matches:
            while right_pos < right_size and\
                    right_values[right_pos] <= left_values[left_pos]:
                right_pos += 1
        else:
            while right_pos < right_size and\
                    right_values[right_pos] < left_values[left_pos]:
                right_pos += 1
        right_pos -= 1

        # save positions as the desired index
        left_indexer[left_pos] = left_pos
        right_indexer[left_pos] = right_pos

        # if needed, verify that tolerance is met
        if has_tolerance and right_pos != -1:
            diff = left_values[left_pos] - right_values[right_pos]
            if diff > tolerance_:
                right_indexer[left_pos] = -1

    return left_indexer, right_indexer
def asof_join_uint64_t(ndarray[uint64_t] left_values,
                       ndarray[uint64_t] right_values,
                       bint allow_exact_matches=1,
                       tolerance=None):
    """Backward as-of join of two sorted uint64 arrays (no 'by' grouping).

    Returns (left_indexer, right_indexer); right_indexer holds -1 where no
    right value is at/below the left value, or none within `tolerance`.
    """

    cdef:
        Py_ssize_t left_pos, right_pos, left_size, right_size
        ndarray[int64_t] left_indexer, right_indexer
        bint has_tolerance = 0
        uint64_t tolerance_, diff  # typed `diff` keeps the check in C

    # if we are using tolerance, set our objects
    if tolerance is not None:
        has_tolerance = 1
        tolerance_ = tolerance

    left_size = len(left_values)
    right_size = len(right_values)

    left_indexer = np.empty(left_size, dtype=np.int64)
    right_indexer = np.empty(left_size, dtype=np.int64)

    right_pos = 0
    for left_pos in range(left_size):
        # restart right_pos if it went negative in a previous iteration
        if right_pos < 0:
            right_pos = 0

        # find last position in right whose value is less than left's value
        if allow_exact_matches:
            while right_pos < right_size and\
                    right_values[right_pos] <= left_values[left_pos]:
                right_pos += 1
        else:
            while right_pos < right_size and\
                    right_values[right_pos] < left_values[left_pos]:
                right_pos += 1
        right_pos -= 1

        # save positions as the desired index
        left_indexer[left_pos] = left_pos
        right_indexer[left_pos] = right_pos

        # if needed, verify that tolerance is met
        if has_tolerance and right_pos != -1:
            diff = left_values[left_pos] - right_values[right_pos]
            if diff > tolerance_:
                right_indexer[left_pos] = -1

    return left_indexer, right_indexer


def asof_join_int8_t(ndarray[int8_t] left_values,
                     ndarray[int8_t] right_values,
                     bint allow_exact_matches=1,
                     tolerance=None):
    """Backward as-of join of two sorted int8 arrays (no 'by' grouping)."""

    cdef:
        Py_ssize_t left_pos, right_pos, left_size, right_size
        ndarray[int64_t] left_indexer, right_indexer
        bint has_tolerance = 0
        int8_t tolerance_, diff  # typed `diff` keeps the check in C

    # if we are using tolerance, set our objects
    if tolerance is not None:
        has_tolerance = 1
        tolerance_ = tolerance

    left_size = len(left_values)
    right_size = len(right_values)

    left_indexer = np.empty(left_size, dtype=np.int64)
    right_indexer = np.empty(left_size, dtype=np.int64)

    right_pos = 0
    for left_pos in range(left_size):
        # restart right_pos if it went negative in a previous iteration
        if right_pos < 0:
            right_pos = 0

        # find last position in right whose value is less than left's value
        if allow_exact_matches:
            while right_pos < right_size and\
                    right_values[right_pos] <= left_values[left_pos]:
                right_pos += 1
        else:
            while right_pos < right_size and\
                    right_values[right_pos] < left_values[left_pos]:
                right_pos += 1
        right_pos -= 1

        # save positions as the desired index
        left_indexer[left_pos] = left_pos
        right_indexer[left_pos] = right_pos

        # if needed, verify that tolerance is met
        if has_tolerance and right_pos != -1:
            diff = left_values[left_pos] - right_values[right_pos]
            if diff > tolerance_:
                right_indexer[left_pos] = -1

    return left_indexer, right_indexer


def asof_join_int16_t(ndarray[int16_t] left_values,
                      ndarray[int16_t] right_values,
                      bint allow_exact_matches=1,
                      tolerance=None):
    """Backward as-of join of two sorted int16 arrays (no 'by' grouping)."""

    cdef:
        Py_ssize_t left_pos, right_pos, left_size, right_size
        ndarray[int64_t] left_indexer, right_indexer
        bint has_tolerance = 0
        int16_t tolerance_, diff  # typed `diff` keeps the check in C

    # if we are using tolerance, set our objects
    if tolerance is not None:
        has_tolerance = 1
        tolerance_ = tolerance

    left_size = len(left_values)
    right_size = len(right_values)

    left_indexer = np.empty(left_size, dtype=np.int64)
    right_indexer = np.empty(left_size, dtype=np.int64)

    right_pos = 0
    for left_pos in range(left_size):
        # restart right_pos if it went negative in a previous iteration
        if right_pos < 0:
            right_pos = 0

        # find last position in right whose value is less than left's value
        if allow_exact_matches:
            while right_pos < right_size and\
                    right_values[right_pos] <= left_values[left_pos]:
                right_pos += 1
        else:
            while right_pos < right_size and\
                    right_values[right_pos] < left_values[left_pos]:
                right_pos += 1
        right_pos -= 1

        # save positions as the desired index
        left_indexer[left_pos] = left_pos
        right_indexer[left_pos] = right_pos

        # if needed, verify that tolerance is met
        if has_tolerance and right_pos != -1:
            diff = left_values[left_pos] - right_values[right_pos]
            if diff > tolerance_:
                right_indexer[left_pos] = -1

    return left_indexer, right_indexer
def asof_join_int32_t(ndarray[int32_t] left_values,
                      ndarray[int32_t] right_values,
                      bint allow_exact_matches=1,
                      tolerance=None):
    """Backward as-of join of two sorted int32 arrays (no 'by' grouping).

    Returns (left_indexer, right_indexer); right_indexer holds -1 where no
    right value is at/below the left value, or none within `tolerance`.
    """

    cdef:
        Py_ssize_t left_pos, right_pos, left_size, right_size
        ndarray[int64_t] left_indexer, right_indexer
        bint has_tolerance = 0
        int32_t tolerance_, diff  # typed `diff` keeps the check in C

    # if we are using tolerance, set our objects
    if tolerance is not None:
        has_tolerance = 1
        tolerance_ = tolerance

    left_size = len(left_values)
    right_size = len(right_values)

    left_indexer = np.empty(left_size, dtype=np.int64)
    right_indexer = np.empty(left_size, dtype=np.int64)

    right_pos = 0
    for left_pos in range(left_size):
        # restart right_pos if it went negative in a previous iteration
        if right_pos < 0:
            right_pos = 0

        # find last position in right whose value is less than left's value
        if allow_exact_matches:
            while right_pos < right_size and\
                    right_values[right_pos] <= left_values[left_pos]:
                right_pos += 1
        else:
            while right_pos < right_size and\
                    right_values[right_pos] < left_values[left_pos]:
                right_pos += 1
        right_pos -= 1

        # save positions as the desired index
        left_indexer[left_pos] = left_pos
        right_indexer[left_pos] = right_pos

        # if needed, verify that tolerance is met
        if has_tolerance and right_pos != -1:
            diff = left_values[left_pos] - right_values[right_pos]
            if diff > tolerance_:
                right_indexer[left_pos] = -1

    return left_indexer, right_indexer


def asof_join_int64_t(ndarray[int64_t] left_values,
                      ndarray[int64_t] right_values,
                      bint allow_exact_matches=1,
                      tolerance=None):
    """Backward as-of join of two sorted int64 arrays (no 'by' grouping)."""

    cdef:
        Py_ssize_t left_pos, right_pos, left_size, right_size
        ndarray[int64_t] left_indexer, right_indexer
        bint has_tolerance = 0
        int64_t tolerance_, diff  # typed `diff` keeps the check in C

    # if we are using tolerance, set our objects
    if tolerance is not None:
        has_tolerance = 1
        tolerance_ = tolerance

    left_size = len(left_values)
    right_size = len(right_values)

    left_indexer = np.empty(left_size, dtype=np.int64)
    right_indexer = np.empty(left_size, dtype=np.int64)

    right_pos = 0
    for left_pos in range(left_size):
        # restart right_pos if it went negative in a previous iteration
        if right_pos < 0:
            right_pos = 0

        # find last position in right whose value is less than left's value
        if allow_exact_matches:
            while right_pos < right_size and\
                    right_values[right_pos] <= left_values[left_pos]:
                right_pos += 1
        else:
            while right_pos < right_size and\
                    right_values[right_pos] < left_values[left_pos]:
                right_pos += 1
        right_pos -= 1

        # save positions as the desired index
        left_indexer[left_pos] = left_pos
        right_indexer[left_pos] = right_pos

        # if needed, verify that tolerance is met
        if has_tolerance and right_pos != -1:
            diff = left_values[left_pos] - right_values[right_pos]
            if diff > tolerance_:
                right_indexer[left_pos] = -1

    return left_indexer, right_indexer


def asof_join_float(ndarray[float] left_values,
                    ndarray[float] right_values,
                    bint allow_exact_matches=1,
                    tolerance=None):
    """Backward as-of join of two sorted float32 arrays (no 'by' grouping)."""

    cdef:
        Py_ssize_t left_pos, right_pos, left_size, right_size
        ndarray[int64_t] left_indexer, right_indexer
        bint has_tolerance = 0
        float tolerance_, diff  # typed `diff` keeps the check in C

    # if we are using tolerance, set our objects
    if tolerance is not None:
        has_tolerance = 1
        tolerance_ = tolerance

    left_size = len(left_values)
    right_size = len(right_values)

    left_indexer = np.empty(left_size, dtype=np.int64)
    right_indexer = np.empty(left_size, dtype=np.int64)

    right_pos = 0
    for left_pos in range(left_size):
        # restart right_pos if it went negative in a previous iteration
        if right_pos < 0:
            right_pos = 0

        # find last position in right whose value is less than left's value
        if allow_exact_matches:
            while right_pos < right_size and\
                    right_values[right_pos] <= left_values[left_pos]:
                right_pos += 1
        else:
            while right_pos < right_size and\
                    right_values[right_pos] < left_values[left_pos]:
                right_pos += 1
        right_pos -= 1

        # save positions as the desired index
        left_indexer[left_pos] = left_pos
        right_indexer[left_pos] = right_pos

        # if needed, verify that tolerance is met
        if has_tolerance and right_pos != -1:
            diff = left_values[left_pos] - right_values[right_pos]
            if diff > tolerance_:
                right_indexer[left_pos] = -1

    return left_indexer, right_indexer
def asof_join_double(ndarray[double] left_values,
                     ndarray[double] right_values,
                     bint allow_exact_matches=1,
                     tolerance=None):
    """Backward as-of join of two sorted float64 arrays (no 'by' grouping).

    Returns (left_indexer, right_indexer); right_indexer holds -1 where no
    right value is at/below the left value, or none within `tolerance`.
    """

    cdef:
        Py_ssize_t left_pos, right_pos, left_size, right_size
        ndarray[int64_t] left_indexer, right_indexer
        bint has_tolerance = 0
        double tolerance_, diff  # typed `diff` keeps the check in C

    # if we are using tolerance, set our objects
    if tolerance is not None:
        has_tolerance = 1
        tolerance_ = tolerance

    left_size = len(left_values)
    right_size = len(right_values)

    left_indexer = np.empty(left_size, dtype=np.int64)
    right_indexer = np.empty(left_size, dtype=np.int64)

    right_pos = 0
    for left_pos in range(left_size):
        # restart right_pos if it went negative in a previous iteration
        if right_pos < 0:
            right_pos = 0

        # find last position in right whose value is less than left's value
        if allow_exact_matches:
            while right_pos < right_size and\
                    right_values[right_pos] <= left_values[left_pos]:
                right_pos += 1
        else:
            while right_pos < right_size and\
                    right_values[right_pos] < left_values[left_pos]:
                right_pos += 1
        right_pos -= 1

        # save positions as the desired index
        left_indexer[left_pos] = left_pos
        right_indexer[left_pos] = right_pos

        # if needed, verify that tolerance is met
        if has_tolerance and right_pos != -1:
            diff = left_values[left_pos] - right_values[right_pos]
            if diff > tolerance_:
                right_indexer[left_pos] = -1

    return left_indexer, right_indexer


#----------------------------------------------------------------------
# stringify
#----------------------------------------------------------------------

def stringify(ndarray[object, ndim=2] xt):
    """Collapse each row of a 2-d object array into one bytes key.

    Used by merge_asof to combine multiple 'by' columns into a single
    hashable value per row.  NOTE(review): .tostring() serializes the raw
    object pointers, so two rows compare equal only when their elements
    are the *same* objects — confirm callers guarantee this.
    """
    cdef:
        Py_ssize_t i, n  # declare the loop index so it stays a C integer
        ndarray[object] result

    n = len(xt)
    # plain `object`: the np.object alias is deprecated in modern numpy
    result = np.empty(n, dtype=object)

    for i in range(n):
        result[i] = xt[i].tostring()

    return result
= 0 for left_pos in range(left_size): @@ -63,18 +71,18 @@ def asof_join_{{on_dtype}}_by_{{by_dtype}}(ndarray[{{on_dtype}}] left_values, if allow_exact_matches: while right_pos < right_size and\ right_values[right_pos] <= left_values[left_pos]: - hash_table.set_item(right_by_values[right_pos], right_pos) + hash_table{{s1}}right_by_values[right_pos]{{s2}}right_pos{{s3}} right_pos += 1 else: while right_pos < right_size and\ right_values[right_pos] < left_values[left_pos]: - hash_table.set_item(right_by_values[right_pos], right_pos) + hash_table{{s1}}right_by_values[right_pos]{{s2}}right_pos{{s3}} right_pos += 1 right_pos -= 1 # save positions as the desired index by_value = left_by_values[left_pos] - found_right_pos = hash_table.get_item(by_value)\ + found_right_pos = hash_table{{g1}}by_value{{g2}}\ if by_value in hash_table else -1 left_indexer[left_pos] = left_pos right_indexer[left_pos] = found_right_pos @@ -98,7 +106,9 @@ def asof_join_{{on_dtype}}_by_{{by_dtype}}(ndarray[{{on_dtype}}] left_values, {{py: # on_dtype -dtypes = ['int64_t', 'double'] +dtypes = ['uint8_t', 'uint16_t', 'uint32_t', 'uint64_t', + 'int8_t', 'int16_t', 'int32_t', 'int64_t', + 'float', 'double'] }} @@ -158,3 +168,21 @@ def asof_join_{{on_dtype}}(ndarray[{{on_dtype}}] left_values, {{endfor}} + +#---------------------------------------------------------------------- +# stringify +#---------------------------------------------------------------------- + +def stringify(ndarray[object, ndim=2] xt): + cdef: + Py_ssize_t n + ndarray[object] result + + n = len(xt) + result = np.empty(n, dtype=np.object) + + for i in range(n): + result[i] = xt[i].tostring() + + return result + diff --git a/pandas/tools/merge.py b/pandas/tools/merge.py index d2060185c3246..d057a0a52b94d 100644 --- a/pandas/tools/merge.py +++ b/pandas/tools/merge.py @@ -28,7 +28,8 @@ is_list_like, _ensure_int64, _ensure_float64, - _ensure_object) + _ensure_object, + _get_dtype) from pandas.types.missing import na_value_for_dtype from 
pandas.core.generic import NDFrame @@ -926,17 +927,13 @@ def get_result(self): return result -_asof_functions = { - 'int64_t': _join.asof_join_int64_t, - 'double': _join.asof_join_double, -} +def _asof_function(on_type): + return getattr(_join, 'asof_join_%s' % on_type, None) + + +def _asof_by_function(on_type, by_type): + return getattr(_join, 'asof_join_%s_by_%s' % (on_type, by_type), None) -_asof_by_functions = { - ('int64_t', 'int64_t'): _join.asof_join_int64_t_by_int64_t, - ('double', 'int64_t'): _join.asof_join_double_by_int64_t, - ('int64_t', 'object'): _join.asof_join_int64_t_by_object, - ('double', 'object'): _join.asof_join_double_by_object, -} _type_casters = { 'int64_t': _ensure_int64, @@ -944,9 +941,30 @@ def get_result(self): 'object': _ensure_object, } +_cyton_types = { + 'uint8': 'uint8_t', + 'uint32': 'uint32_t', + 'uint16': 'uint16_t', + 'uint64': 'uint64_t', + 'int8': 'int8_t', + 'int32': 'int32_t', + 'int16': 'int16_t', + 'int64': 'int64_t', + 'float16': 'float', + 'float32': 'float', + 'float64': 'double', +} + def _get_cython_type(dtype): - """ Given a dtype, return 'int64_t', 'double', or 'object' """ + """ Given a dtype, return a C name like 'int64_t' or 'double' """ + type_name = _get_dtype(dtype).name + ctype = _cyton_types.get(type_name, 'object') + return ctype + + +def _get_cython_type_upcast(dtype): + """ Upcast a dtype to 'int64_t', 'double', or 'object' """ if is_integer_dtype(dtype): return 'int64_t' elif is_float_dtype(dtype): @@ -990,9 +1008,6 @@ def _validate_specification(self): if not is_list_like(self.by): self.by = [self.by] - if len(self.by) != 1: - raise MergeError("can only asof by a single key") - self.left_on = self.by + list(self.left_on) self.right_on = self.by + list(self.right_on) @@ -1046,6 +1061,11 @@ def _get_merge_keys(self): def _get_join_indexers(self): """ return the join indexers """ + def flip_stringify(xs): + """ flip an array of arrays and string-ify contents """ + xt = np.transpose(xs) + return 
_join.stringify(_ensure_object(xt)) + # values to compare left_values = self.left_join_keys[-1] right_values = self.right_join_keys[-1] @@ -1067,22 +1087,23 @@ def _get_join_indexers(self): # a "by" parameter requires special handling if self.by is not None: - left_by_values = self.left_join_keys[0] - right_by_values = self.right_join_keys[0] - - # choose appropriate function by type - on_type = _get_cython_type(left_values.dtype) - by_type = _get_cython_type(left_by_values.dtype) + if len(self.left_join_keys) > 2: + # get string representation of values if more than one + left_by_values = flip_stringify(self.left_join_keys[0:-1]) + right_by_values = flip_stringify(self.right_join_keys[0:-1]) + else: + left_by_values = self.left_join_keys[0] + right_by_values = self.right_join_keys[0] - on_type_caster = _type_casters[on_type] + # upcast 'by' parameter because HashTable is limited + by_type = _get_cython_type_upcast(left_by_values.dtype) by_type_caster = _type_casters[by_type] - func = _asof_by_functions[(on_type, by_type)] - - left_values = on_type_caster(left_values) - right_values = on_type_caster(right_values) left_by_values = by_type_caster(left_by_values) right_by_values = by_type_caster(right_by_values) + # choose appropriate function by type + on_type = _get_cython_type(left_values.dtype) + func = _asof_by_function(on_type, by_type) return func(left_values, right_values, left_by_values, @@ -1092,12 +1113,7 @@ def _get_join_indexers(self): else: # choose appropriate function by type on_type = _get_cython_type(left_values.dtype) - type_caster = _type_casters[on_type] - func = _asof_functions[on_type] - - left_values = type_caster(left_values) - right_values = type_caster(right_values) - + func = _asof_function(on_type) return func(left_values, right_values, self.allow_exact_matches, diff --git a/pandas/tools/tests/test_merge_asof.py b/pandas/tools/tests/test_merge_asof.py index f413618624592..25015b3d3bf7c 100644 --- a/pandas/tools/tests/test_merge_asof.py +++ 
b/pandas/tools/tests/test_merge_asof.py @@ -130,6 +130,117 @@ def test_missing_right_by(self): expected.loc[expected.ticker == 'MSFT', ['bid', 'ask']] = np.nan assert_frame_equal(result, expected) + def test_multiby(self): + # GH13936 + trades = pd.DataFrame({ + 'time': pd.to_datetime(['20160525 13:30:00.023', + '20160525 13:30:00.023', + '20160525 13:30:00.046', + '20160525 13:30:00.048', + '20160525 13:30:00.050']), + 'ticker': ['MSFT', 'MSFT', + 'GOOG', 'GOOG', 'AAPL'], + 'exch': ['ARCA', 'NSDQ', 'NSDQ', 'BATS', 'NSDQ'], + 'price': [51.95, 51.95, + 720.77, 720.92, 98.00], + 'quantity': [75, 155, + 100, 100, 100]}, + columns=['time', 'ticker', 'exch', + 'price', 'quantity']) + + quotes = pd.DataFrame({ + 'time': pd.to_datetime(['20160525 13:30:00.023', + '20160525 13:30:00.023', + '20160525 13:30:00.030', + '20160525 13:30:00.041', + '20160525 13:30:00.045', + '20160525 13:30:00.049']), + 'ticker': ['GOOG', 'MSFT', 'MSFT', + 'MSFT', 'GOOG', 'AAPL'], + 'exch': ['BATS', 'NSDQ', 'ARCA', 'ARCA', + 'NSDQ', 'ARCA'], + 'bid': [720.51, 51.95, 51.97, 51.99, + 720.50, 97.99], + 'ask': [720.92, 51.96, 51.98, 52.00, + 720.93, 98.01]}, + columns=['time', 'ticker', 'exch', 'bid', 'ask']) + + expected = pd.DataFrame({ + 'time': pd.to_datetime(['20160525 13:30:00.023', + '20160525 13:30:00.023', + '20160525 13:30:00.046', + '20160525 13:30:00.048', + '20160525 13:30:00.050']), + 'ticker': ['MSFT', 'MSFT', + 'GOOG', 'GOOG', 'AAPL'], + 'exch': ['ARCA', 'NSDQ', 'NSDQ', 'BATS', 'NSDQ'], + 'price': [51.95, 51.95, + 720.77, 720.92, 98.00], + 'quantity': [75, 155, + 100, 100, 100], + 'bid': [np.nan, 51.95, 720.50, 720.51, np.nan], + 'ask': [np.nan, 51.96, 720.93, 720.92, np.nan]}, + columns=['time', 'ticker', 'exch', + 'price', 'quantity', 'bid', 'ask']) + + result = pd.merge_asof(trades, quotes, on='time', + by=['ticker', 'exch']) + assert_frame_equal(result, expected) + + def test_multiby_heterogeneous_types(self): + # GH13936 + trades = pd.DataFrame({ + 'time': 
pd.to_datetime(['20160525 13:30:00.023', + '20160525 13:30:00.023', + '20160525 13:30:00.046', + '20160525 13:30:00.048', + '20160525 13:30:00.050']), + 'ticker': [0, 0, 1, 1, 2], + 'exch': ['ARCA', 'NSDQ', 'NSDQ', 'BATS', 'NSDQ'], + 'price': [51.95, 51.95, + 720.77, 720.92, 98.00], + 'quantity': [75, 155, + 100, 100, 100]}, + columns=['time', 'ticker', 'exch', + 'price', 'quantity']) + + quotes = pd.DataFrame({ + 'time': pd.to_datetime(['20160525 13:30:00.023', + '20160525 13:30:00.023', + '20160525 13:30:00.030', + '20160525 13:30:00.041', + '20160525 13:30:00.045', + '20160525 13:30:00.049']), + 'ticker': [1, 0, 0, 0, 1, 2], + 'exch': ['BATS', 'NSDQ', 'ARCA', 'ARCA', + 'NSDQ', 'ARCA'], + 'bid': [720.51, 51.95, 51.97, 51.99, + 720.50, 97.99], + 'ask': [720.92, 51.96, 51.98, 52.00, + 720.93, 98.01]}, + columns=['time', 'ticker', 'exch', 'bid', 'ask']) + + expected = pd.DataFrame({ + 'time': pd.to_datetime(['20160525 13:30:00.023', + '20160525 13:30:00.023', + '20160525 13:30:00.046', + '20160525 13:30:00.048', + '20160525 13:30:00.050']), + 'ticker': [0, 0, 1, 1, 2], + 'exch': ['ARCA', 'NSDQ', 'NSDQ', 'BATS', 'NSDQ'], + 'price': [51.95, 51.95, + 720.77, 720.92, 98.00], + 'quantity': [75, 155, + 100, 100, 100], + 'bid': [np.nan, 51.95, 720.50, 720.51, np.nan], + 'ask': [np.nan, 51.96, 720.93, 720.92, np.nan]}, + columns=['time', 'ticker', 'exch', + 'price', 'quantity', 'bid', 'ask']) + + result = pd.merge_asof(trades, quotes, on='time', + by=['ticker', 'exch']) + assert_frame_equal(result, expected) + def test_basic2(self): expected = self.read_data('asof2.csv') @@ -428,6 +539,69 @@ def test_on_float(self): assert_frame_equal(result, expected) + def test_on_specialized_type(self): + # GH13936 + for dtype in [np.uint16, np.uint32, np.uint64, + np.int16, np.int32, np.int64, + np.float32, np.float64]: + df1 = pd.DataFrame({ + 'value': [5, 2, 25, 300, 78, 1040, 79], + 'symbol': list("ABCDEFG")}, + columns=['symbol', 'value']) + df1.value = dtype(df1.value) + + df2 = 
pd.DataFrame({ + 'value': [0, 100, 1000, 10000], + 'result': list('xyzw')}, + columns=['value', 'result']) + df2.value = dtype(df2.value) + + df1 = df1.sort_values('value').reset_index(drop=True) + + result = pd.merge_asof(df1, df2, on='value') + + expected = pd.DataFrame({ + 'symbol': list("BACEGDF"), + 'value': [2, 5, 25, 78, 79, 300, 1040], + 'result': list('xxxxxyz')}, + columns=['symbol', 'value', 'result']) + expected.value = dtype(expected.value) + + assert_frame_equal(result, expected) + + def test_on_specialized_type_by_int(self): + # GH13936 + for dtype in [np.uint16, np.uint32, np.uint64, + np.int16, np.int32, np.int64, + np.float32, np.float64]: + df1 = pd.DataFrame({ + 'value': [5, 2, 25, 300, 78, 1040, 79], + 'key': [1, 2, 3, 2, 3, 1, 2], + 'symbol': list("ABCDEFG")}, + columns=['symbol', 'key', 'value']) + df1.value = dtype(df1.value) + + df2 = pd.DataFrame({ + 'value': [0, 100, 1000, 10000], + 'key': [1, 2, 2, 3], + 'result': list('xyzw')}, + columns=['value', 'key', 'result']) + df2.value = dtype(df2.value) + + df1 = df1.sort_values('value').reset_index(drop=True) + + result = pd.merge_asof(df1, df2, on='value', by='key') + + expected = pd.DataFrame({ + 'symbol': list("BACEGDF"), + 'key': [2, 1, 3, 3, 2, 2, 1], + 'value': [2, 5, 25, 78, 79, 300, 1040], + 'result': [np.nan, 'x', np.nan, np.nan, np.nan, 'y', 'x']}, + columns=['symbol', 'key', 'value', 'result']) + expected.value = dtype(expected.value) + + assert_frame_equal(result, expected) + def test_on_float_by_int(self): # type specialize both "by" and "on" parameters df1 = pd.DataFrame({ From 46cc3099a07b9a0a0dfcb3ec13079ee9f2b33d4d Mon Sep 17 00:00:00 2001 From: "Christopher C. 
Aycock" Date: Thu, 1 Dec 2016 14:12:55 -0500 Subject: [PATCH 2/8] Update documentation --- doc/source/whatsnew/v0.19.2.txt | 12 ++++++++++++ pandas/tools/merge.py | 9 ++++----- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/doc/source/whatsnew/v0.19.2.txt b/doc/source/whatsnew/v0.19.2.txt index cafbdb731f494..0e668e9dcf1d2 100644 --- a/doc/source/whatsnew/v0.19.2.txt +++ b/doc/source/whatsnew/v0.19.2.txt @@ -71,3 +71,15 @@ Bug Fixes - Explicit check in ``to_stata`` and ``StataWriter`` for out-of-range values when writing doubles (:issue:`14618`) + + + + +.. _whatsnew_0192.enhancements.other: + +Other enhancements +^^^^^^^^^^^^^^^^^^ + +- ``pd.merge_asof()`` can take multiple columns in ``by`` parameter and has specialized types (:issue:`13936`) + + diff --git a/pandas/tools/merge.py b/pandas/tools/merge.py index ca71b9cca2839..1669f99d4783d 100644 --- a/pandas/tools/merge.py +++ b/pandas/tools/merge.py @@ -271,8 +271,8 @@ def merge_asof(left, right, on=None, DataFrame whose 'on' key is less than or equal to the left's key. Both DataFrames must be sorted by the key. - Optionally perform group-wise merge. This searches for the nearest match - on the 'on' key within the same group according to 'by'. + Optionally match on equivalent keys with 'by' before searching for nearest + match with 'on'. .. versionadded:: 0.19.0 @@ -289,9 +289,8 @@ def merge_asof(left, right, on=None, Field name to join on in left DataFrame. right_on : label Field name to join on in right DataFrame. - by : column name - Group both the left and right DataFrames by the group column; perform - the merge operation on these pieces and recombine. + by : column name or list of column names + Match on these columns before performing merge operation. suffixes : 2-length sequence (tuple, list, ...) Suffix to apply to overlapping column names in the left and right side, respectively From fafbb02265de0c9725e5e0baa2cd11240127be64 Mon Sep 17 00:00:00 2001 From: "Christopher C. 
Aycock" Date: Mon, 12 Dec 2016 11:26:45 -0500 Subject: [PATCH 3/8] Updated benchmarks to reflect new ASV setup --- asv_bench/benchmarks/join_merge.py | 76 +++++------------------------- 1 file changed, 13 insertions(+), 63 deletions(-) diff --git a/asv_bench/benchmarks/join_merge.py b/asv_bench/benchmarks/join_merge.py index 875a0c73d0652..d9c631fa92efd 100644 --- a/asv_bench/benchmarks/join_merge.py +++ b/asv_bench/benchmarks/join_merge.py @@ -239,42 +239,6 @@ def time_merge_dataframe_integer_key(self): merge(self.df, self.df2, on='key1') -class merge_asof_int32_noby(object): - - def setup(self): - np.random.seed(0) - one_count = 200000 - two_count = 1000000 - self.df1 = pd.DataFrame({'time': np.random.randint(0, one_count/20, one_count), - 'value1': np.random.randn(one_count)}) - self.df1.time = np.int32(self.df1.time) - self.df2 = pd.DataFrame({'time': np.random.randint(0, two_count/20, two_count), - 'value2': np.random.randn(two_count)}) - self.df2.time = np.int32(self.df2.time) - self.df1 = self.df1.sort_values('time') - self.df2 = self.df2.sort_values('time') - - def time_merge_asof_int32_noby(self): - merge_asof(self.df1, self.df2, on='time') - - -class merge_asof_by_object(object): - - def setup(self): - import string - np.random.seed(0) - one_count = 200000 - two_count = 1000000 - self.df1 = pd.DataFrame({'time': np.random.randint(0, one_count/20, one_count), - 'key': np.random.choice(list(string.ascii_uppercase), one_count), - 'value1': np.random.randn(one_count)}) - self.df2 = pd.DataFrame({'time': np.random.randint(0, two_count/20, two_count), - 'key': np.random.choice(list(string.ascii_uppercase), two_count), - 'value2': np.random.randn(two_count)}) - self.df1 = self.df1.sort_values('time') - self.df2 = self.df2.sort_values('time') - - class i8merge(object): goal_time = 0.2 @@ -306,35 +270,8 @@ def setup(self): 'key' : np.tile(np.arange(0, 10000, 2), 10), 'lvalue': np.random.randn(50000)}) -<<<<<<< HEAD -class merge_asof_multiby(object): - - def 
setup(self): - import string - np.random.seed(0) - one_count = 200000 - two_count = 1000000 - self.df1 = pd.DataFrame({'time': np.random.randint(0, one_count/20, one_count), - 'key1': np.random.choice(list(string.ascii_uppercase), one_count), - 'key2': np.random.choice(list(string.ascii_uppercase), one_count), - 'value1': np.random.randn(one_count)}) - self.df2 = pd.DataFrame({'time': np.random.randint(0, two_count/20, two_count), - 'key1': np.random.choice(list(string.ascii_uppercase), two_count), - 'key2': np.random.choice(list(string.ascii_uppercase), two_count), - 'value2': np.random.randn(two_count)}) - self.df1 = self.df1.sort_values('time') - self.df2 = self.df2.sort_values('time') - - def time_merge_asof_multiby(self): - merge_asof(self.df1, self.df2, on='time', by=['key1', 'key2']) - - -class join_non_unique_equal(object): - goal_time = 0.2 -======= self.right = pd.DataFrame({'key' : np.arange(10000), 'rvalue' : np.random.randn(10000)}) ->>>>>>> master def time_merge_ordered(self): merge_ordered(self.left, self.right, on='key', left_by='group') @@ -365,12 +302,19 @@ def setup(self): self.df1 = self.df1.sort_values('time') self.df2 = self.df2.sort_values('time') + self.df1['time32'] = np.int32(self.df1.time) + self.df2['time32'] = np.int32(self.df2.time) + self.df1a = self.df1[['time', 'value1']] self.df2a = self.df2[['time', 'value2']] self.df1b = self.df1[['time', 'key', 'value1']] self.df2b = self.df2[['time', 'key', 'value2']] self.df1c = self.df1[['time', 'key2', 'value1']] self.df2c = self.df2[['time', 'key2', 'value2']] + self.df1d = self.df1[['time32', 'value1']] + self.df2d = self.df2[['time32', 'value2']] + self.df1e = self.df1[['time', 'key', 'key2', 'value1']] + self.df2e = self.df2[['time', 'key', 'key2', 'value2']] def time_noby(self): merge_asof(self.df1a, self.df2a, on='time') @@ -381,6 +325,12 @@ def time_by_object(self): def time_by_int(self): merge_asof(self.df1c, self.df2c, on='time', by='key2') + def time_on_int32(self): + 
merge_asof(self.df1d, self.df2d, on='time32') + + def time_multiby(self): + merge_asof(self.df1e, self.df2e, on='time', by=['key', 'key2']) + #---------------------------------------------------------------------- # data alignment From 2bce3cc3cc6ba7e50d24191898909b96b8a9bba8 Mon Sep 17 00:00:00 2001 From: "Christopher C. Aycock" Date: Mon, 12 Dec 2016 14:17:58 -0500 Subject: [PATCH 4/8] Revert dict back to PyObjectHashTable in response to code review --- pandas/src/joins_func_helper.pxi.in | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/pandas/src/joins_func_helper.pxi.in b/pandas/src/joins_func_helper.pxi.in index 114ea0255c5f6..7aa737b4e74fc 100644 --- a/pandas/src/joins_func_helper.pxi.in +++ b/pandas/src/joins_func_helper.pxi.in @@ -11,13 +11,8 @@ WARNING: DO NOT edit .pxi FILE directly, .pxi is generated from .pxi.in {{py: -# by_dtype, table_type, init_table, s1, s2, s3, g1, g2 -by_dtypes = [('int64_t', 'Int64HashTable', 'Int64HashTable(right_size)', - '.set_item(', ', ', ')', - '.get_item(', ')'), - ('object', 'dict', '{}', - '[', '] = ', '', - '[', ']')] +# table_type, by_dtype +by_dtypes = [('PyObjectHashTable', 'object'), ('Int64HashTable', 'int64_t')] # on_dtype on_dtypes = ['uint8_t', 'uint16_t', 'uint32_t', 'uint64_t', @@ -29,7 +24,7 @@ on_dtypes = ['uint8_t', 'uint16_t', 'uint32_t', 'uint64_t', from hashtable cimport * -{{for by_dtype, table_type, init_table, s1, s2, s3, g1, g2 in by_dtypes}} +{{for table_type, by_dtype in by_dtypes}} {{for on_dtype in on_dtypes}} @@ -59,7 +54,7 @@ def asof_join_{{on_dtype}}_by_{{by_dtype}}(ndarray[{{on_dtype}}] left_values, left_indexer = np.empty(left_size, dtype=np.int64) right_indexer = np.empty(left_size, dtype=np.int64) - hash_table = {{init_table}} + hash_table = {{table_type}}(right_size) right_pos = 0 for left_pos in range(left_size): @@ -71,18 +66,18 @@ def asof_join_{{on_dtype}}_by_{{by_dtype}}(ndarray[{{on_dtype}}] left_values, if allow_exact_matches: while right_pos 
< right_size and\ right_values[right_pos] <= left_values[left_pos]: - hash_table{{s1}}right_by_values[right_pos]{{s2}}right_pos{{s3}} + hash_table.set_item(right_by_values[right_pos], right_pos) right_pos += 1 else: while right_pos < right_size and\ right_values[right_pos] < left_values[left_pos]: - hash_table{{s1}}right_by_values[right_pos]{{s2}}right_pos{{s3}} + hash_table.set_item(right_by_values[right_pos], right_pos) right_pos += 1 right_pos -= 1 # save positions as the desired index by_value = left_by_values[left_pos] - found_right_pos = hash_table{{g1}}by_value{{g2}}\ + found_right_pos = hash_table.get_item(by_value)\ if by_value in hash_table else -1 left_indexer[left_pos] = left_pos right_indexer[left_pos] = found_right_pos From 0ad1687dfdafc790851b60c88c0711f095f683a1 Mon Sep 17 00:00:00 2001 From: "Christopher C. Aycock" Date: Mon, 12 Dec 2016 15:46:25 -0500 Subject: [PATCH 5/8] Fixed whatsnew --- doc/source/whatsnew/v0.19.2.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/source/whatsnew/v0.19.2.txt b/doc/source/whatsnew/v0.19.2.txt index 1435d48117b32..8f23ff81f29b4 100644 --- a/doc/source/whatsnew/v0.19.2.txt +++ b/doc/source/whatsnew/v0.19.2.txt @@ -87,6 +87,6 @@ Bug Fixes Other enhancements ^^^^^^^^^^^^^^^^^^ -- ``pd.merge_asof()`` can take multiple columns in ``by`` parameter and has specialized types (:issue:`13936`) +- ``pd.merge_asof()`` can take multiple columns in ``by`` parameter and has specialized dtypes for better performance (:issue:`13936`) From 89256f01a593618f63b1ea9988c8d52014ca9870 Mon Sep 17 00:00:00 2001 From: "Christopher C.
Aycock" Date: Mon, 12 Dec 2016 17:40:48 -0500 Subject: [PATCH 6/8] Test 8-bit integers and raise error on 16-bit floats; add comments --- pandas/src/joins_func_helper.pxi.in | 1 + pandas/tools/merge.py | 5 ++++- pandas/tools/tests/test_merge_asof.py | 20 ++++++++++---------- 3 files changed, 15 insertions(+), 11 deletions(-) diff --git a/pandas/src/joins_func_helper.pxi.in b/pandas/src/joins_func_helper.pxi.in index 7aa737b4e74fc..5ef3dda398217 100644 --- a/pandas/src/joins_func_helper.pxi.in +++ b/pandas/src/joins_func_helper.pxi.in @@ -169,6 +169,7 @@ def asof_join_{{on_dtype}}(ndarray[{{on_dtype}}] left_values, #---------------------------------------------------------------------- def stringify(ndarray[object, ndim=2] xt): + """ Invokes .tostring() on each array entry within a 2D array. """ cdef: Py_ssize_t n ndarray[object] result diff --git a/pandas/tools/merge.py b/pandas/tools/merge.py index f18294fc823cd..26c9afea1896e 100644 --- a/pandas/tools/merge.py +++ b/pandas/tools/merge.py @@ -949,7 +949,7 @@ def _asof_by_function(on_type, by_type): 'int32': 'int32_t', 'int16': 'int16_t', 'int64': 'int64_t', - 'float16': 'float', + 'float16': 'error', 'float32': 'float', 'float64': 'double', } @@ -959,6 +959,8 @@ def _get_cython_type(dtype): """ Given a dtype, return a C name like 'int64_t' or 'double' """ type_name = _get_dtype(dtype).name ctype = _cyton_types.get(type_name, 'object') + if ctype == 'error': + raise MergeError('unsupported type: ' + type_name) return ctype @@ -1063,6 +1065,7 @@ def _get_join_indexers(self): def flip_stringify(xs): """ flip an array of arrays and string-ify contents """ xt = np.transpose(xs) + # numpy arrays aren't hashable, so we convert to a string return _join.stringify(_ensure_object(xt)) # values to compare diff --git a/pandas/tools/tests/test_merge_asof.py b/pandas/tools/tests/test_merge_asof.py index 48a2857fb30eb..e2128d7fa6d8b 100644 --- a/pandas/tools/tests/test_merge_asof.py +++ b/pandas/tools/tests/test_merge_asof.py @@ 
-565,17 +565,17 @@ def test_on_float(self): def test_on_specialized_type(self): # GH13936 - for dtype in [np.uint16, np.uint32, np.uint64, - np.int16, np.int32, np.int64, + for dtype in [np.uint8, np.uint16, np.uint32, np.uint64, + np.int8, np.int16, np.int32, np.int64, np.float32, np.float64]: df1 = pd.DataFrame({ - 'value': [5, 2, 25, 300, 78, 1040, 79], + 'value': [5, 2, 25, 100, 78, 120, 79], 'symbol': list("ABCDEFG")}, columns=['symbol', 'value']) df1.value = dtype(df1.value) df2 = pd.DataFrame({ - 'value': [0, 100, 1000, 10000], + 'value': [0, 80, 120, 125], 'result': list('xyzw')}, columns=['value', 'result']) df2.value = dtype(df2.value) @@ -586,7 +586,7 @@ def test_on_specialized_type(self): expected = pd.DataFrame({ 'symbol': list("BACEGDF"), - 'value': [2, 5, 25, 78, 79, 300, 1040], + 'value': [2, 5, 25, 78, 79, 100, 120], 'result': list('xxxxxyz')}, columns=['symbol', 'value', 'result']) expected.value = dtype(expected.value) @@ -595,18 +595,18 @@ def test_on_specialized_type(self): def test_on_specialized_type_by_int(self): # GH13936 - for dtype in [np.uint16, np.uint32, np.uint64, - np.int16, np.int32, np.int64, + for dtype in [np.uint8, np.uint16, np.uint32, np.uint64, + np.int8, np.int16, np.int32, np.int64, np.float32, np.float64]: df1 = pd.DataFrame({ - 'value': [5, 2, 25, 300, 78, 1040, 79], + 'value': [5, 2, 25, 100, 78, 120, 79], 'key': [1, 2, 3, 2, 3, 1, 2], 'symbol': list("ABCDEFG")}, columns=['symbol', 'key', 'value']) df1.value = dtype(df1.value) df2 = pd.DataFrame({ - 'value': [0, 100, 1000, 10000], + 'value': [0, 80, 120, 125], 'key': [1, 2, 2, 3], 'result': list('xyzw')}, columns=['value', 'key', 'result']) @@ -619,7 +619,7 @@ def test_on_specialized_type_by_int(self): expected = pd.DataFrame({ 'symbol': list("BACEGDF"), 'key': [2, 1, 3, 3, 2, 2, 1], - 'value': [2, 5, 25, 78, 79, 300, 1040], + 'value': [2, 5, 25, 78, 79, 100, 120], 'result': [np.nan, 'x', np.nan, np.nan, np.nan, 'y', 'x']}, columns=['symbol', 'key', 'value', 'result']) 
expected.value = dtype(expected.value) From 1f208a8179589bdb1a7f1b961de6d6d1cd20fb2e Mon Sep 17 00:00:00 2001 From: "Christopher C. Aycock" Date: Wed, 14 Dec 2016 17:26:57 -0500 Subject: [PATCH 7/8] Use tuple representation instead of strings --- doc/source/whatsnew/v0.19.2.txt | 2 -- pandas/src/joins_func_helper.pxi.in | 19 ------------------- pandas/tools/merge.py | 23 +++++++++++++---------- 3 files changed, 13 insertions(+), 31 deletions(-) diff --git a/doc/source/whatsnew/v0.19.2.txt b/doc/source/whatsnew/v0.19.2.txt index 9c78a6274e06a..d16697a00a689 100644 --- a/doc/source/whatsnew/v0.19.2.txt +++ b/doc/source/whatsnew/v0.19.2.txt @@ -90,5 +90,3 @@ Bug Fixes - Explicit check in ``to_stata`` and ``StataWriter`` for out-of-range values when writing doubles (:issue:`14618`) - Bug in ``unstack()`` if called with a list of column(s) as an argument, regardless of the dtypes of all columns, they get coerced to ``object`` (:issue:`11847`) - - diff --git a/pandas/src/joins_func_helper.pxi.in b/pandas/src/joins_func_helper.pxi.in index 5ef3dda398217..33926a23f7f41 100644 --- a/pandas/src/joins_func_helper.pxi.in +++ b/pandas/src/joins_func_helper.pxi.in @@ -163,22 +163,3 @@ def asof_join_{{on_dtype}}(ndarray[{{on_dtype}}] left_values, {{endfor}} - -#---------------------------------------------------------------------- -# stringify -#---------------------------------------------------------------------- - -def stringify(ndarray[object, ndim=2] xt): - """ Invokes .tostring() on each array entry within a 2D array. 
""" - cdef: - Py_ssize_t n - ndarray[object] result - - n = len(xt) - result = np.empty(n, dtype=np.object) - - for i in range(n): - result[i] = xt[i].tostring() - - return result - diff --git a/pandas/tools/merge.py b/pandas/tools/merge.py index a8a7cc0a7917d..07e2b3bf3875a 100644 --- a/pandas/tools/merge.py +++ b/pandas/tools/merge.py @@ -5,6 +5,8 @@ import copy import warnings +import string + import numpy as np from pandas.compat import range, lrange, lzip, zip, map, filter import pandas.compat as compat @@ -303,12 +305,12 @@ def merge_asof(left, right, on=None, by : column name or list of column names Match on these columns before performing merge operation. left_by : column name - Field name to match on in the left DataFrame. + Field names to match on in the left DataFrame. .. versionadded:: 0.19.2 right_by : column name - Field name to match on in the right DataFrame. + Field names to match on in the right DataFrame. .. versionadded:: 0.19.2 @@ -1156,11 +1158,12 @@ def _get_merge_keys(self): def _get_join_indexers(self): """ return the join indexers """ - def flip_stringify(xs): - """ flip an array of arrays and string-ify contents """ - xt = np.transpose(xs) - # numpy arrays aren't hashable, so we convert to a string - return _join.stringify(_ensure_object(xt)) + def flip(xs): + """ unlike np.transpose, this returns an array of tuples """ + labels = list(string.ascii_lowercase[:len(xs)]) + dtypes = [x.dtype for x in xs] + labeled_dtypes = list(zip(labels, dtypes)) + return np.array(lzip(*xs), labeled_dtypes) # values to compare left_values = (self.left.index.values if self.left_index else @@ -1186,9 +1189,9 @@ def flip_stringify(xs): # a "by" parameter requires special handling if self.left_by is not None: if len(self.left_join_keys) > 2: - # get string representation of values if more than one - left_by_values = flip_stringify(self.left_join_keys[0:-1]) - right_by_values = flip_stringify(self.right_join_keys[0:-1]) + # get tuple representation of values if 
more than one + left_by_values = flip(self.left_join_keys[0:-1]) + right_by_values = flip(self.right_join_keys[0:-1]) else: left_by_values = self.left_join_keys[0] right_by_values = self.right_join_keys[0] From ffcf0c23fe20d58635ef577c4f82346084c4e4b1 Mon Sep 17 00:00:00 2001 From: "Christopher C. Aycock" Date: Fri, 16 Dec 2016 10:39:28 -0500 Subject: [PATCH 8/8] Added test to reject float16; fixed typos --- pandas/tools/merge.py | 4 +-- pandas/tools/tests/test_merge_asof.py | 50 ++++++++++++++++----------- 2 files changed, 31 insertions(+), 23 deletions(-) diff --git a/pandas/tools/merge.py b/pandas/tools/merge.py index 07e2b3bf3875a..efae7c63a9d0e 100644 --- a/pandas/tools/merge.py +++ b/pandas/tools/merge.py @@ -1013,7 +1013,7 @@ def _asof_by_function(on_type, by_type): 'object': _ensure_object, } -_cyton_types = { +_cython_types = { 'uint8': 'uint8_t', 'uint32': 'uint32_t', 'uint16': 'uint16_t', @@ -1031,7 +1031,7 @@ def _asof_by_function(on_type, by_type): def _get_cython_type(dtype): """ Given a dtype, return a C name like 'int64_t' or 'double' """ type_name = _get_dtype(dtype).name - ctype = _cyton_types.get(type_name, 'object') + ctype = _cython_types.get(type_name, 'object') if ctype == 'error': raise MergeError('unsupported type: ' + type_name) return ctype diff --git a/pandas/tools/tests/test_merge_asof.py b/pandas/tools/tests/test_merge_asof.py index d4a5faad87643..f3e8bc913ad67 100644 --- a/pandas/tools/tests/test_merge_asof.py +++ b/pandas/tools/tests/test_merge_asof.py @@ -657,7 +657,7 @@ def test_on_specialized_type(self): # GH13936 for dtype in [np.uint8, np.uint16, np.uint32, np.uint64, np.int8, np.int16, np.int32, np.int64, - np.float32, np.float64]: + np.float16, np.float32, np.float64]: df1 = pd.DataFrame({ 'value': [5, 2, 25, 100, 78, 120, 79], 'symbol': list("ABCDEFG")}, @@ -672,22 +672,26 @@ def test_on_specialized_type(self): df1 = df1.sort_values('value').reset_index(drop=True) - result = pd.merge_asof(df1, df2, on='value') + if dtype == 
np.float16: + with self.assertRaises(MergeError): + pd.merge_asof(df1, df2, on='value') + else: + result = pd.merge_asof(df1, df2, on='value') - expected = pd.DataFrame({ - 'symbol': list("BACEGDF"), - 'value': [2, 5, 25, 78, 79, 100, 120], - 'result': list('xxxxxyz')}, - columns=['symbol', 'value', 'result']) - expected.value = dtype(expected.value) + expected = pd.DataFrame({ + 'symbol': list("BACEGDF"), + 'value': [2, 5, 25, 78, 79, 100, 120], + 'result': list('xxxxxyz')}, + columns=['symbol', 'value', 'result']) + expected.value = dtype(expected.value) - assert_frame_equal(result, expected) + assert_frame_equal(result, expected) def test_on_specialized_type_by_int(self): # GH13936 for dtype in [np.uint8, np.uint16, np.uint32, np.uint64, np.int8, np.int16, np.int32, np.int64, - np.float32, np.float64]: + np.float16, np.float32, np.float64]: df1 = pd.DataFrame({ 'value': [5, 2, 25, 100, 78, 120, 79], 'key': [1, 2, 3, 2, 3, 1, 2], @@ -704,17 +708,21 @@ def test_on_specialized_type_by_int(self): df1 = df1.sort_values('value').reset_index(drop=True) - result = pd.merge_asof(df1, df2, on='value', by='key') - - expected = pd.DataFrame({ - 'symbol': list("BACEGDF"), - 'key': [2, 1, 3, 3, 2, 2, 1], - 'value': [2, 5, 25, 78, 79, 100, 120], - 'result': [np.nan, 'x', np.nan, np.nan, np.nan, 'y', 'x']}, - columns=['symbol', 'key', 'value', 'result']) - expected.value = dtype(expected.value) - - assert_frame_equal(result, expected) + if dtype == np.float16: + with self.assertRaises(MergeError): + pd.merge_asof(df1, df2, on='value', by='key') + else: + result = pd.merge_asof(df1, df2, on='value', by='key') + + expected = pd.DataFrame({ + 'symbol': list("BACEGDF"), + 'key': [2, 1, 3, 3, 2, 2, 1], + 'value': [2, 5, 25, 78, 79, 100, 120], + 'result': [np.nan, 'x', np.nan, np.nan, np.nan, 'y', 'x']}, + columns=['symbol', 'key', 'value', 'result']) + expected.value = dtype(expected.value) + + assert_frame_equal(result, expected) def test_on_float_by_int(self): # type specialize 
both "by" and "on" parameters