From 6db459acf027bfbe4905d036e7f4d09f22717fea Mon Sep 17 00:00:00 2001 From: Ka Wo Chen Date: Sat, 29 Aug 2015 02:35:08 -0400 Subject: [PATCH] PERF: GH10213 kth_smallest GIL release --- asv_bench/benchmarks/gil.py | 23 ++++++++++++- asv_bench/benchmarks/pandas_vb_common.py | 1 + doc/source/whatsnew/v0.17.0.txt | 5 +-- pandas/algos.pyx | 44 ++++++++++++------------ pandas/util/testing.py | 17 +++++++-- 5 files changed, 62 insertions(+), 28 deletions(-) diff --git a/asv_bench/benchmarks/gil.py b/asv_bench/benchmarks/gil.py index 556dd2c364cdf..4b82781fc39d9 100644 --- a/asv_bench/benchmarks/gil.py +++ b/asv_bench/benchmarks/gil.py @@ -298,4 +298,25 @@ def take_1d_pg2_int64(self): @test_parallel(num_threads=2) def take_1d_pg2_float64(self): - com.take_1d(self.df.float64.values, self.indexer) \ No newline at end of file + com.take_1d(self.df.float64.values, self.indexer) + + +class nogil_kth_smallest(object): + number = 1 + repeat = 5 + + def setup(self): + if (not have_real_test_parallel): + raise NotImplementedError + np.random.seed(1234) + self.N = 10000000 + self.k = 500000 + self.a = np.random.randn(self.N) + self.b = self.a.copy() + self.kwargs_list = [{'arr': self.a}, {'arr': self.b}] + + def time_nogil_kth_smallest(self): + @test_parallel(num_threads=2, kwargs_list=self.kwargs_list) + def run(arr): + algos.kth_smallest(arr, self.k) + run() diff --git a/asv_bench/benchmarks/pandas_vb_common.py b/asv_bench/benchmarks/pandas_vb_common.py index a1326d63a112a..3370131929c22 100644 --- a/asv_bench/benchmarks/pandas_vb_common.py +++ b/asv_bench/benchmarks/pandas_vb_common.py @@ -7,6 +7,7 @@ import pandas.util.testing as tm import random import numpy as np +import threading try: from pandas.compat import range except ImportError: diff --git a/doc/source/whatsnew/v0.17.0.txt b/doc/source/whatsnew/v0.17.0.txt index e9d39e0441055..b9909c14b592f 100644 --- a/doc/source/whatsnew/v0.17.0.txt +++ b/doc/source/whatsnew/v0.17.0.txt @@ -69,14 +69,15 @@ Releasing the GIL We are releasing the global-interpreter-lock (GIL) on some cython operations. This will allow other threads to run simultaneously during computation, potentially allowing performance improvements -from multi-threading. Notably ``groupby`` and some indexing operations are a benefit from this. (:issue:`8882`) +from multi-threading. Notably ``groupby``, ``nsmallest`` and some indexing operations benefit from this. (:issue:`8882`) For example the groupby expression in the following code will have the GIL released during the factorization step, e.g. ``df.groupby('key')`` as well as the ``.sum()`` operation. .. code-block:: python - N = 1e6 + N = 1000000 + ngroups = 10 df = DataFrame({'key' : np.random.randint(0,ngroups,size=N), 'data' : np.random.randn(N) }) df.groupby('key')['data'].sum() diff --git a/pandas/algos.pyx b/pandas/algos.pyx index 9b6bdf57d4509..44b1996272356 100644 --- a/pandas/algos.pyx +++ b/pandas/algos.pyx @@ -740,7 +740,7 @@ ctypedef fused numeric: float64_t -cdef inline Py_ssize_t swap(numeric *a, numeric *b) except -1: +cdef inline Py_ssize_t swap(numeric *a, numeric *b) nogil except -1: cdef numeric t # cython doesn't allow pointer dereference so use array syntax @@ -756,27 +756,27 @@ cpdef numeric kth_smallest(numeric[:] a, Py_ssize_t k): cdef: Py_ssize_t i, j, l, m, n = a.size numeric x - - l = 0 - m = n - 1 - - while l < m: - x = a[k] - i = l - j = m - - while 1: - while a[i] < x: i += 1 - while x < a[j]: j -= 1 - if i <= j: - swap(&a[i], &a[j]) - i += 1; j -= 1 - - if i > j: break - - if j < k: l = i - if k < i: m = j - return a[k] + with nogil: + l = 0 + m = n - 1 + + while l < m: + x = a[k] + i = l + j = m + + while 1: + while a[i] < x: i += 1 + while x < a[j]: j -= 1 + if i <= j: + swap(&a[i], &a[j]) + i += 1; j -= 1 + + if i > j: break + + if j < k: l = i + if k < i: m = j + return a[k] cdef inline kth_smallest_c(float64_t* a, Py_ssize_t k, Py_ssize_t n): diff --git a/pandas/util/testing.py b/pandas/util/testing.py index e3633a1ec4360..aaa83da036c2f 100644 --- a/pandas/util/testing.py +++ b/pandas/util/testing.py @@ -2044,14 +2044,16 @@ def use_numexpr(use, min_elements=expr._MIN_ELEMENTS): if inspect.isfunction(obj) and name.startswith('assert'): setattr(TestCase, name, staticmethod(obj)) -def test_parallel(num_threads=2): + +def test_parallel(num_threads=2, kwargs_list=None): """Decorator to run the same function multiple times in parallel. Parameters ---------- num_threads : int, optional The number of times the function is run in parallel. - + kwargs_list : list of dicts, optional + The list of kwargs to update original function kwargs on different threads. Notes ----- This decorator does not pass the return value of the decorated function. @@ -2061,14 +2063,23 @@ def test_parallel(num_threads=2): """ assert num_threads > 0 + has_kwargs_list = kwargs_list is not None + if has_kwargs_list: + assert len(kwargs_list) == num_threads import threading def wrapper(func): @wraps(func) def inner(*args, **kwargs): + if has_kwargs_list: + update_kwargs = lambda i: dict(kwargs, **kwargs_list[i]) + else: + update_kwargs = lambda i: kwargs threads = [] for i in range(num_threads): - thread = threading.Thread(target=func, args=args, kwargs=kwargs) + updated_kwargs = update_kwargs(i) + thread = threading.Thread(target=func, args=args, + kwargs=updated_kwargs) threads.append(thread) for thread in threads: thread.start()