Skip to content

REF: use fused types for the rest of libjoin #23214

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Oct 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
360 changes: 356 additions & 4 deletions pandas/_libs/join.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ cdef double nan = NaN
from pandas._libs.algos import groupsort_indexer, ensure_platform_int
from pandas.core.algorithms import take_nd

include "join_func_helper.pxi"


def inner_join(ndarray[int64_t] left, ndarray[int64_t] right,
Py_ssize_t max_groups):
Expand Down Expand Up @@ -309,8 +307,8 @@ left_join_indexer_unique_int64 = left_join_indexer_unique["int64_t"]
left_join_indexer_unique_uint64 = left_join_indexer_unique["uint64_t"]


# @cython.wraparound(False)
# @cython.boundscheck(False)
@cython.wraparound(False)
@cython.boundscheck(False)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at the git history my impression is that this may have been commented-out unintentionally. Maybe the CI will tell us otherwise.

def left_join_indexer(ndarray[join_t] left, ndarray[join_t] right):
"""
Two-pass algorithm for monotonic indexes. Handles many-to-one merges
Expand Down Expand Up @@ -656,3 +654,357 @@ outer_join_indexer_object = outer_join_indexer["object"]
outer_join_indexer_int32 = outer_join_indexer["int32_t"]
outer_join_indexer_int64 = outer_join_indexer["int64_t"]
outer_join_indexer_uint64 = outer_join_indexer["uint64_t"]


# ----------------------------------------------------------------------
# asof_join_by
# ----------------------------------------------------------------------

from hashtable cimport (
HashTable, PyObjectHashTable, UInt64HashTable, Int64HashTable)

ctypedef fused asof_t:
uint8_t
uint16_t
uint32_t
uint64_t
int8_t
int16_t
int32_t
int64_t
float
double

ctypedef fused by_t:
object
int64_t
uint64_t


def asof_join_backward_on_X_by_Y(ndarray[asof_t] left_values,
ndarray[asof_t] right_values,
ndarray[by_t] left_by_values,
ndarray[by_t] right_by_values,
bint allow_exact_matches=1,
tolerance=None):

cdef:
Py_ssize_t left_pos, right_pos, left_size, right_size, found_right_pos
ndarray[int64_t] left_indexer, right_indexer
bint has_tolerance = 0
asof_t tolerance_ = 0
asof_t diff = 0
HashTable hash_table
by_t by_value

# if we are using tolerance, set our objects
if tolerance is not None:
has_tolerance = 1
tolerance_ = tolerance

left_size = len(left_values)
right_size = len(right_values)

left_indexer = np.empty(left_size, dtype=np.int64)
right_indexer = np.empty(left_size, dtype=np.int64)

if by_t is object:
hash_table = PyObjectHashTable(right_size)
elif by_t is int64_t:
hash_table = Int64HashTable(right_size)
elif by_t is uint64_t:
hash_table = UInt64HashTable(right_size)

right_pos = 0
for left_pos in range(left_size):
# restart right_pos if it went negative in a previous iteration
if right_pos < 0:
right_pos = 0

# find last position in right whose value is less than left's
if allow_exact_matches:
while (right_pos < right_size and
right_values[right_pos] <= left_values[left_pos]):
hash_table.set_item(right_by_values[right_pos], right_pos)
right_pos += 1
else:
while (right_pos < right_size and
right_values[right_pos] < left_values[left_pos]):
hash_table.set_item(right_by_values[right_pos], right_pos)
right_pos += 1
right_pos -= 1

# save positions as the desired index
by_value = left_by_values[left_pos]
found_right_pos = (hash_table.get_item(by_value)
if by_value in hash_table else -1)
left_indexer[left_pos] = left_pos
right_indexer[left_pos] = found_right_pos

# if needed, verify that tolerance is met
if has_tolerance and found_right_pos != -1:
diff = left_values[left_pos] - right_values[found_right_pos]
if diff > tolerance_:
right_indexer[left_pos] = -1

return left_indexer, right_indexer


def asof_join_forward_on_X_by_Y(ndarray[asof_t] left_values,
ndarray[asof_t] right_values,
ndarray[by_t] left_by_values,
ndarray[by_t] right_by_values,
bint allow_exact_matches=1,
tolerance=None):

cdef:
Py_ssize_t left_pos, right_pos, left_size, right_size, found_right_pos
ndarray[int64_t] left_indexer, right_indexer
bint has_tolerance = 0
asof_t tolerance_ = 0
asof_t diff = 0
HashTable hash_table
by_t by_value

# if we are using tolerance, set our objects
if tolerance is not None:
has_tolerance = 1
tolerance_ = tolerance

left_size = len(left_values)
right_size = len(right_values)

left_indexer = np.empty(left_size, dtype=np.int64)
right_indexer = np.empty(left_size, dtype=np.int64)

if by_t is object:
hash_table = PyObjectHashTable(right_size)
elif by_t is int64_t:
hash_table = Int64HashTable(right_size)
elif by_t is uint64_t:
hash_table = UInt64HashTable(right_size)

right_pos = right_size - 1
for left_pos in range(left_size - 1, -1, -1):
# restart right_pos if it went over in a previous iteration
if right_pos == right_size:
right_pos = right_size - 1

# find first position in right whose value is greater than left's
if allow_exact_matches:
while (right_pos >= 0 and
right_values[right_pos] >= left_values[left_pos]):
hash_table.set_item(right_by_values[right_pos], right_pos)
right_pos -= 1
else:
while (right_pos >= 0 and
right_values[right_pos] > left_values[left_pos]):
hash_table.set_item(right_by_values[right_pos], right_pos)
right_pos -= 1
right_pos += 1

# save positions as the desired index
by_value = left_by_values[left_pos]
found_right_pos = (hash_table.get_item(by_value)
if by_value in hash_table else -1)
left_indexer[left_pos] = left_pos
right_indexer[left_pos] = found_right_pos

# if needed, verify that tolerance is met
if has_tolerance and found_right_pos != -1:
diff = right_values[found_right_pos] - left_values[left_pos]
if diff > tolerance_:
right_indexer[left_pos] = -1

return left_indexer, right_indexer


def asof_join_nearest_on_X_by_Y(ndarray[asof_t] left_values,
ndarray[asof_t] right_values,
ndarray[by_t] left_by_values,
ndarray[by_t] right_by_values,
bint allow_exact_matches=1,
tolerance=None):

cdef:
Py_ssize_t left_size, right_size, i
ndarray[int64_t] left_indexer, right_indexer, bli, bri, fli, fri
asof_t bdiff, fdiff

left_size = len(left_values)
right_size = len(right_values)

left_indexer = np.empty(left_size, dtype=np.int64)
right_indexer = np.empty(left_size, dtype=np.int64)

# search both forward and backward
bli, bri = asof_join_backward_on_X_by_Y(left_values,
right_values,
left_by_values,
right_by_values,
allow_exact_matches,
tolerance)
fli, fri = asof_join_forward_on_X_by_Y(left_values,
right_values,
left_by_values,
right_by_values,
allow_exact_matches,
tolerance)

for i in range(len(bri)):
# choose timestamp from right with smaller difference
if bri[i] != -1 and fri[i] != -1:
bdiff = left_values[bli[i]] - right_values[bri[i]]
fdiff = right_values[fri[i]] - left_values[fli[i]]
right_indexer[i] = bri[i] if bdiff <= fdiff else fri[i]
else:
right_indexer[i] = bri[i] if bri[i] != -1 else fri[i]
left_indexer[i] = bli[i]

return left_indexer, right_indexer


# ----------------------------------------------------------------------
# asof_join
# ----------------------------------------------------------------------

def asof_join_backward(ndarray[asof_t] left_values,
ndarray[asof_t] right_values,
bint allow_exact_matches=1,
tolerance=None):

cdef:
Py_ssize_t left_pos, right_pos, left_size, right_size
ndarray[int64_t] left_indexer, right_indexer
bint has_tolerance = 0
asof_t tolerance_ = 0
asof_t diff = 0

# if we are using tolerance, set our objects
if tolerance is not None:
has_tolerance = 1
tolerance_ = tolerance

left_size = len(left_values)
right_size = len(right_values)

left_indexer = np.empty(left_size, dtype=np.int64)
right_indexer = np.empty(left_size, dtype=np.int64)

right_pos = 0
for left_pos in range(left_size):
# restart right_pos if it went negative in a previous iteration
if right_pos < 0:
right_pos = 0

# find last position in right whose value is less than left's
if allow_exact_matches:
while (right_pos < right_size and
right_values[right_pos] <= left_values[left_pos]):
right_pos += 1
else:
while (right_pos < right_size and
right_values[right_pos] < left_values[left_pos]):
right_pos += 1
right_pos -= 1

# save positions as the desired index
left_indexer[left_pos] = left_pos
right_indexer[left_pos] = right_pos

# if needed, verify that tolerance is met
if has_tolerance and right_pos != -1:
diff = left_values[left_pos] - right_values[right_pos]
if diff > tolerance_:
right_indexer[left_pos] = -1

return left_indexer, right_indexer


def asof_join_forward(ndarray[asof_t] left_values,
ndarray[asof_t] right_values,
bint allow_exact_matches=1,
tolerance=None):

cdef:
Py_ssize_t left_pos, right_pos, left_size, right_size
ndarray[int64_t] left_indexer, right_indexer
bint has_tolerance = 0
asof_t tolerance_ = 0
asof_t diff = 0

# if we are using tolerance, set our objects
if tolerance is not None:
has_tolerance = 1
tolerance_ = tolerance

left_size = len(left_values)
right_size = len(right_values)

left_indexer = np.empty(left_size, dtype=np.int64)
right_indexer = np.empty(left_size, dtype=np.int64)

right_pos = right_size - 1
for left_pos in range(left_size - 1, -1, -1):
# restart right_pos if it went over in a previous iteration
if right_pos == right_size:
right_pos = right_size - 1

# find first position in right whose value is greater than left's
if allow_exact_matches:
while (right_pos >= 0 and
right_values[right_pos] >= left_values[left_pos]):
right_pos -= 1
else:
while (right_pos >= 0 and
right_values[right_pos] > left_values[left_pos]):
right_pos -= 1
right_pos += 1

# save positions as the desired index
left_indexer[left_pos] = left_pos
right_indexer[left_pos] = (right_pos
if right_pos != right_size else -1)

# if needed, verify that tolerance is met
if has_tolerance and right_pos != right_size:
diff = right_values[right_pos] - left_values[left_pos]
if diff > tolerance_:
right_indexer[left_pos] = -1

return left_indexer, right_indexer


def asof_join_nearest(ndarray[asof_t] left_values,
ndarray[asof_t] right_values,
bint allow_exact_matches=1,
tolerance=None):

cdef:
Py_ssize_t left_size, right_size, i
ndarray[int64_t] left_indexer, right_indexer, bli, bri, fli, fri
asof_t bdiff, fdiff

left_size = len(left_values)
right_size = len(right_values)

left_indexer = np.empty(left_size, dtype=np.int64)
right_indexer = np.empty(left_size, dtype=np.int64)

# search both forward and backward
bli, bri = asof_join_backward(left_values, right_values,
allow_exact_matches, tolerance)
fli, fri = asof_join_forward(left_values, right_values,
allow_exact_matches, tolerance)

for i in range(len(bri)):
# choose timestamp from right with smaller difference
if bri[i] != -1 and fri[i] != -1:
bdiff = left_values[bli[i]] - right_values[bri[i]]
fdiff = right_values[fri[i]] - left_values[fli[i]]
right_indexer[i] = bri[i] if bdiff <= fdiff else fri[i]
else:
right_indexer[i] = bri[i] if bri[i] != -1 else fri[i]
left_indexer[i] = bli[i]

return left_indexer, right_indexer
Loading