Skip to content

PERF: GH10213 kth_smallest GIL release #10927

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 2, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion asv_bench/benchmarks/gil.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,4 +298,25 @@ def take_1d_pg2_int64(self):

@test_parallel(num_threads=2)
def take_1d_pg2_float64(self):
com.take_1d(self.df.float64.values, self.indexer)
com.take_1d(self.df.float64.values, self.indexer)


class nogil_kth_smallest(object):
    """Benchmark ``algos.kth_smallest`` running on two threads at once."""

    number = 1
    repeat = 5

    def setup(self):
        """Build one random array per worker thread.

        Raises
        ------
        NotImplementedError
            When ``have_real_test_parallel`` is false.
            NOTE(review): presumably this is how the benchmark runner is
            told to skip the benchmark -- confirm against the asv docs.
        """
        if not have_real_test_parallel:
            raise NotImplementedError

        # Fixed seed so every run selects from the same data.
        np.random.seed(1234)
        self.N = 10000000
        self.k = 500000
        self.a = np.random.randn(self.N)
        self.b = self.a.copy()
        # One kwargs dict per thread, each pointing at its own array.
        self.kwargs_list = [{'arr': data} for data in (self.a, self.b)]

    def time_nogil_kth_smallest(self):
        """Run ``kth_smallest`` on both arrays in parallel threads."""
        @test_parallel(num_threads=2, kwargs_list=self.kwargs_list)
        def select_kth(arr):
            algos.kth_smallest(arr, self.k)

        select_kth()
1 change: 1 addition & 0 deletions asv_bench/benchmarks/pandas_vb_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import pandas.util.testing as tm
import random
import numpy as np
import threading
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`threading` is imported here so that the first `test_parallel` call doesn't have to do the initialization.

try:
from pandas.compat import range
except ImportError:
Expand Down
5 changes: 3 additions & 2 deletions doc/source/whatsnew/v0.17.0.txt
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,15 @@ Releasing the GIL

We are releasing the global-interpreter-lock (GIL) on some cython operations.
This will allow other threads to run simultaneously during computation, potentially allowing performance improvements
from multi-threading. Notably ``groupby`` and some indexing operations are a benefit from this. (:issue:`8882`)
from multi-threading. Notably ``groupby``, ``nsmallest`` and some indexing operations benefit from this. (:issue:`8882`)

For example the groupby expression in the following code will have the GIL released during the factorization step, e.g. ``df.groupby('key')``
as well as the ``.sum()`` operation.

.. code-block:: python

N = 1e6
N = 1000000
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`1e6` is a float and should be avoided where an integer size is expected; NumPy will soon raise an error for it.

ngroups = 10
df = DataFrame({'key' : np.random.randint(0,ngroups,size=N),
'data' : np.random.randn(N) })
df.groupby('key')['data'].sum()
Expand Down
44 changes: 22 additions & 22 deletions pandas/algos.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,7 @@ ctypedef fused numeric:
float64_t


cdef inline Py_ssize_t swap(numeric *a, numeric *b) except -1:
cdef inline Py_ssize_t swap(numeric *a, numeric *b) nogil except -1:
cdef numeric t

# cython doesn't allow pointer dereference so use array syntax
Expand All @@ -756,27 +756,27 @@ cpdef numeric kth_smallest(numeric[:] a, Py_ssize_t k):
cdef:
Py_ssize_t i, j, l, m, n = a.size
numeric x

l = 0
m = n - 1

while l < m:
x = a[k]
i = l
j = m

while 1:
while a[i] < x: i += 1
while x < a[j]: j -= 1
if i <= j:
swap(&a[i], &a[j])
i += 1; j -= 1

if i > j: break

if j < k: l = i
if k < i: m = j
return a[k]
with nogil:
l = 0
m = n - 1

while l < m:
x = a[k]
i = l
j = m

while 1:
while a[i] < x: i += 1
while x < a[j]: j -= 1
if i <= j:
swap(&a[i], &a[j])
i += 1; j -= 1

if i > j: break

if j < k: l = i
if k < i: m = j
return a[k]


cdef inline kth_smallest_c(float64_t* a, Py_ssize_t k, Py_ssize_t n):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think kth_smallest_c is called anywhere in the codebase, see if you can take it out

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This calls `group_median` (here), which calls `_median_linear`, which in turn calls `kth_smallest_c`.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, good start on this.....

Expand Down
17 changes: 14 additions & 3 deletions pandas/util/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2044,14 +2044,16 @@ def use_numexpr(use, min_elements=expr._MIN_ELEMENTS):
if inspect.isfunction(obj) and name.startswith('assert'):
setattr(TestCase, name, staticmethod(obj))

def test_parallel(num_threads=2):

def test_parallel(num_threads=2, kwargs_list=None):
"""Decorator to run the same function multiple times in parallel.

Parameters
----------
num_threads : int, optional
The number of times the function is run in parallel.

kwargs_list : list of dicts, optional
The list of kwargs to update original function kwargs on different threads.
Notes
-----
This decorator does not pass the return value of the decorated function.
Expand All @@ -2061,14 +2063,23 @@ def test_parallel(num_threads=2):
"""

assert num_threads > 0
has_kwargs_list = kwargs_list is not None
if has_kwargs_list:
assert len(kwargs_list) == num_threads
import threading

def wrapper(func):
@wraps(func)
def inner(*args, **kwargs):
if has_kwargs_list:
update_kwargs = lambda i: dict(kwargs, **kwargs_list[i])
else:
update_kwargs = lambda i: kwargs
threads = []
for i in range(num_threads):
thread = threading.Thread(target=func, args=args, kwargs=kwargs)
updated_kwargs = update_kwargs(i)
thread = threading.Thread(target=func, args=args,
kwargs=updated_kwargs)
threads.append(thread)
for thread in threads:
thread.start()
Expand Down