From 4db9a2f9115c006ab7ad058125dd075e1866aa9e Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Wed, 6 Dec 2017 22:58:48 -0800 Subject: [PATCH 1/3] CLN: ASV Gil benchmark --- asv_bench/benchmarks/attrs_caching.py | 4 +- asv_bench/benchmarks/gil.py | 440 ++++++++--------------- asv_bench/benchmarks/pandas_vb_common.py | 2 +- ci/lint.sh | 2 +- 4 files changed, 144 insertions(+), 304 deletions(-) diff --git a/asv_bench/benchmarks/attrs_caching.py b/asv_bench/benchmarks/attrs_caching.py index 3c091be7a8424..48f0b7d71144c 100644 --- a/asv_bench/benchmarks/attrs_caching.py +++ b/asv_bench/benchmarks/attrs_caching.py @@ -1,18 +1,18 @@ import numpy as np from pandas import DataFrame - try: from pandas.util import cache_readonly except ImportError: from pandas.util.decorators import cache_readonly +from .pandas_vb_common import setup # noqa + class DataFrameAttributes(object): goal_time = 0.2 def setup(self): - np.random.seed(1234) self.df = DataFrame(np.random.randn(10, 6)) self.cur_index = self.df.index diff --git a/asv_bench/benchmarks/gil.py b/asv_bench/benchmarks/gil.py index 78a94976e732d..654e5d3bfec0e 100644 --- a/asv_bench/benchmarks/gil.py +++ b/asv_bench/benchmarks/gil.py @@ -1,241 +1,135 @@ -from .pandas_vb_common import * - +import numpy as np +import pandas.util.testing as tm +from pandas import (DataFrame, Series, rolling_median, rolling_mean, + rolling_min, rolling_max, rolling_var, rolling_skew, + rolling_kurt, rolling_std, read_csv, factorize, date_range) from pandas.core.algorithms import take_1d - -try: - from cStringIO import StringIO -except ImportError: - from io import StringIO - try: from pandas._libs import algos except ImportError: from pandas import algos - try: from pandas.util.testing import test_parallel - have_real_test_parallel = True except ImportError: have_real_test_parallel = False - def test_parallel(num_threads=1): - def wrapper(fname): return fname - return wrapper +from .pandas_vb_common import BaseIO, setup # noqa -class NoGilGroupby(object): - goal_time = 0.2 - def setup(self): - self.N = 1000000 - self.ngroups = 1000 - np.random.seed(1234) - self.df = DataFrame({'key': np.random.randint(0, self.ngroups, size=self.N), 'data': np.random.randn(self.N), }) +class ParallelGroupbyMethods(object): - np.random.seed(1234) - self.size = 2 ** 22 - self.ngroups = 100 - self.data = Series(np.random.randint(0, self.ngroups, size=self.size)) + goal_time = 0.2 + params = ([2, 4, 8], ['count', 'last', 'max', 'mean', 'min', 'prod', + 'sum', 'var']) + param_names = ['threads', 'method'] - if (not have_real_test_parallel): + def setup(self, threads, method): + if not have_real_test_parallel: raise NotImplementedError + N = 10**6 + ngroups = 10**3 + df = DataFrame({'key': np.random.randint(0, ngroups, size=N), + 'data': np.random.randn(N)}) - @test_parallel(num_threads=2) - def _pg2_count(self): - self.df.groupby('key')['data'].count() - - def time_count_2(self): - self._pg2_count() - - @test_parallel(num_threads=2) - def _pg2_last(self): - self.df.groupby('key')['data'].last() - - def time_last_2(self): - self._pg2_last() - - @test_parallel(num_threads=2) - def _pg2_max(self): - self.df.groupby('key')['data'].max() - - def time_max_2(self): - self._pg2_max() - - @test_parallel(num_threads=2) - def _pg2_mean(self): - self.df.groupby('key')['data'].mean() - - def time_mean_2(self): - self._pg2_mean() - - @test_parallel(num_threads=2) - def _pg2_min(self): - self.df.groupby('key')['data'].min() - - def time_min_2(self): - self._pg2_min() - - @test_parallel(num_threads=2) - def 
_pg2_prod(self): - self.df.groupby('key')['data'].prod() - - def time_prod_2(self): - self._pg2_prod() - - @test_parallel(num_threads=2) - def _pg2_sum(self): - self.df.groupby('key')['data'].sum() - - def time_sum_2(self): - self._pg2_sum() - - @test_parallel(num_threads=4) - def _pg4_sum(self): - self.df.groupby('key')['data'].sum() - - def time_sum_4(self): - self._pg4_sum() - - def time_sum_4_notp(self): - for i in range(4): - self.df.groupby('key')['data'].sum() - - def _f_sum(self): - self.df.groupby('key')['data'].sum() - - @test_parallel(num_threads=8) - def _pg8_sum(self): - self._f_sum() - - def time_sum_8(self): - self._pg8_sum() - - def time_sum_8_notp(self): - for i in range(8): - self._f_sum() - - @test_parallel(num_threads=2) - def _pg2_var(self): - self.df.groupby('key')['data'].var() - - def time_var_2(self): - self._pg2_var() - - # get groups - - def _groups(self): - self.data.groupby(self.data).groups - - @test_parallel(num_threads=2) - def _pg2_groups(self): - self._groups() + @test_parallel(num_threads=threads) + def parallel(): + getattr(df.groupby('key')['data'], method)() + self.parallel = parallel - def time_groups_2(self): - self._pg2_groups() + def loop(): + getattr(df.groupby('key')['data'], method)() + self.loop = loop - @test_parallel(num_threads=4) - def _pg4_groups(self): - self._groups() + def time_parallel(self, threads, method): + self.parallel() - def time_groups_4(self): - self._pg4_groups() + def time_loop(self, threads, method): + for i in range(threads): + self.loop() - @test_parallel(num_threads=8) - def _pg8_groups(self): - self._groups() - def time_groups_8(self): - self._pg8_groups() +class ParallelGroups(object): - - -class nogil_take1d_float64(object): goal_time = 0.2 + params = [2, 4, 8] + param_names = ['threads'] - def setup(self): - self.N = 1000000 - self.ngroups = 1000 - np.random.seed(1234) - self.df = DataFrame({'key': np.random.randint(0, self.ngroups, size=self.N), 'data': np.random.randn(self.N), }) - if (not have_real_test_parallel): + def setup(self, threads): + if not have_real_test_parallel: raise NotImplementedError - self.N = 10000000.0 - self.df = DataFrame({'int64': np.arange(self.N, dtype='int64'), 'float64': np.arange(self.N, dtype='float64'), }) - self.indexer = np.arange(100, (len(self.df) - 100)) + size = 2**22 + ngroups = 10**3 + data = Series(np.random.randint(0, ngroups, size=size)) - def time_nogil_take1d_float64(self): - self.take_1d_pg2_int64() + @test_parallel(num_threads=threads) + def get_groups(): + data.groupby(data).groups + self.get_groups = get_groups - @test_parallel(num_threads=2) - def take_1d_pg2_int64(self): - take_1d(self.df.int64.values, self.indexer) + def time_get_groups(self, threads): + self.get_groups() - @test_parallel(num_threads=2) - def take_1d_pg2_float64(self): - take_1d(self.df.float64.values, self.indexer) +class ParallelTake1D(object): -class nogil_take1d_int64(object): goal_time = 0.2 + params = ['int64', 'float64'] + param_names = ['dtype'] - def setup(self): - self.N = 1000000 - self.ngroups = 1000 - np.random.seed(1234) - self.df = DataFrame({'key': np.random.randint(0, self.ngroups, size=self.N), 'data': np.random.randn(self.N), }) - if (not have_real_test_parallel): + def setup(self, dtype): + if not have_real_test_parallel: raise NotImplementedError - self.N = 10000000.0 - self.df = DataFrame({'int64': np.arange(self.N, dtype='int64'), 'float64': np.arange(self.N, dtype='float64'), }) - self.indexer = np.arange(100, (len(self.df) - 100)) + N = 10**6 + df = DataFrame({'col': 
np.arange(N, dtype=dtype)}) + indexer = np.arange(100, len(df) - 100) - def time_nogil_take1d_int64(self): - self.take_1d_pg2_float64() + @test_parallel(num_threads=2) + def parallel_take1d(): + take_1d(df['col'].values, indexer) + self.parallel_take1d = parallel_take1d - @test_parallel(num_threads=2) - def take_1d_pg2_int64(self): - take_1d(self.df.int64.values, self.indexer) + def time_take1d(self, dtype): + self.parallel_take1d() - @test_parallel(num_threads=2) - def take_1d_pg2_float64(self): - take_1d(self.df.float64.values, self.indexer) +class ParallelKth(object): -class nogil_kth_smallest(object): number = 1 repeat = 5 def setup(self): - if (not have_real_test_parallel): + if not have_real_test_parallel: raise NotImplementedError - np.random.seed(1234) - self.N = 10000000 - self.k = 500000 - self.a = np.random.randn(self.N) - self.b = self.a.copy() - self.kwargs_list = [{'arr': self.a}, {'arr': self.b}] + N = 10**7 + k = 5 * 10**5 + kwargs_list = [{'arr': np.random.randn(N)}, + {'arr': np.random.randn(N)}] - def time_nogil_kth_smallest(self): - @test_parallel(num_threads=2, kwargs_list=self.kwargs_list) - def run(arr): - algos.kth_smallest(arr, self.k) - run() + @test_parallel(num_threads=2, kwargs_list=kwargs_list) + def parallel_kth_smallest(arr): + algos.kth_smallest(arr, k) + self.parallel_kth_smallest = parallel_kth_smallest + def time_kth_smallest(self): + self.parallel_kth_smallest() + + +class ParallelDatetimeFields(object): -class nogil_datetime_fields(object): goal_time = 0.2 def setup(self): - self.N = 100000000 - self.dti = pd.date_range('1900-01-01', periods=self.N, freq='T') - self.period = self.dti.to_period('D') - if (not have_real_test_parallel): + if not have_real_test_parallel: raise NotImplementedError + N = 10**6 + self.dti = date_range('1900-01-01', periods=N, freq='T') + self.period = self.dti.to_period('D') def time_datetime_field_year(self): @test_parallel(num_threads=2) @@ -274,149 +168,95 @@ def run(period): run(self.period) -class nogil_rolling_algos_slow(object): - goal_time = 0.2 +class ParallelRolling(object): - def setup(self): - self.win = 100 - np.random.seed(1234) - self.arr = np.random.rand(100000) - if (not have_real_test_parallel): - raise NotImplementedError - - def time_nogil_rolling_median(self): - @test_parallel(num_threads=2) - def run(arr, win): - rolling_median(arr, win) - run(self.arr, self.win) - - -class nogil_rolling_algos_fast(object): goal_time = 0.2 + params = ['rolling_median', 'rolling_mean', 'rolling_min', 'rolling_max', + 'rolling_var', 'rolling_skew', 'rolling_kurt', 'rolling_std'] + param_names = ['method'] - def setup(self): - self.win = 100 - np.random.seed(1234) - self.arr = np.random.rand(1000000) - if (not have_real_test_parallel): + def setup(self, method): + if not have_real_test_parallel: raise NotImplementedError + win = 100 + arr = np.random.rand(100000) + rolling = {'rolling_median': rolling_median, + 'rolling_mean': rolling_mean, + 'rolling_min': rolling_min, + 'rolling_max': rolling_max, + 'rolling_var': rolling_var, + 'rolling_skew': rolling_skew, + 'rolling_kurt': rolling_kurt, + 'rolling_std': rolling_std} - def time_nogil_rolling_mean(self): - @test_parallel(num_threads=2) - def run(arr, win): - rolling_mean(arr, win) - run(self.arr, self.win) - - def time_nogil_rolling_min(self): @test_parallel(num_threads=2) - def run(arr, win): - rolling_min(arr, win) - run(self.arr, self.win) + def parallel_rolling(): + rolling[method](arr, win) + self.parallel_rolling = parallel_rolling - def 
time_nogil_rolling_max(self):
-        @test_parallel(num_threads=2)
-        def run(arr, win):
-            rolling_max(arr, win)
-        run(self.arr, self.win)
-
-    def time_nogil_rolling_var(self):
-        @test_parallel(num_threads=2)
-        def run(arr, win):
-            rolling_var(arr, win)
-        run(self.arr, self.win)
-
-    def time_nogil_rolling_skew(self):
-        @test_parallel(num_threads=2)
-        def run(arr, win):
-            rolling_skew(arr, win)
-        run(self.arr, self.win)
+    def time_rolling(self, method):
+        self.parallel_rolling()

-    def time_nogil_rolling_kurt(self):
-        @test_parallel(num_threads=2)
-        def run(arr, win):
-            rolling_kurt(arr, win)
-        run(self.arr, self.win)
-
-    def time_nogil_rolling_std(self):
-        @test_parallel(num_threads=2)
-        def run(arr, win):
-            rolling_std(arr, win)
-        run(self.arr, self.win)

+class ParallelReadCSV(BaseIO):

-class nogil_read_csv(object):
     number = 1
     repeat = 5
+    params = ['float', 'object', 'datetime']
+    param_names = ['dtype']

-    def setup(self):
-        if (not have_real_test_parallel):
+    def setup(self, dtype):
+        if not have_real_test_parallel:
             raise NotImplementedError
-        # Using the values
-        self.df = DataFrame(np.random.randn(10000, 50))
-        self.df.to_csv('__test__.csv')
-
-        self.rng = date_range('1/1/2000', periods=10000)
-        self.df_date_time = DataFrame(np.random.randn(10000, 50), index=self.rng)
-        self.df_date_time.to_csv('__test_datetime__.csv')
+        rows = 10000
+        cols = 50
+        data = {'float': DataFrame(np.random.randn(rows, cols)),
+                'datetime': DataFrame(np.random.randn(rows, cols),
+                                      index=date_range('1/1/2000',
+                                                       periods=rows)),
+                'object': DataFrame('foo',
+                                    index=range(rows),
+                                    columns=['object%03d' % i
+                                             for i in range(5)])}
+
+        self.fname = '__test_{}__.csv'.format(dtype)
+        df = data[dtype]
+        df.to_csv(self.fname)

-        self.df_object = DataFrame('foo', index=self.df.index, columns=self.create_cols('object'))
-        self.df_object.to_csv('__test_object__.csv')
-
-    def create_cols(self, name):
-        return [('%s%03d' % (name, i)) for i in range(5)]
-
-    @test_parallel(num_threads=2)
-    def pg_read_csv(self):
-        read_csv('__test__.csv', sep=',', header=None, float_precision=None)
-
-    def time_read_csv(self):
-        self.pg_read_csv()
-
-    @test_parallel(num_threads=2)
-    def pg_read_csv_object(self):
-        read_csv('__test_object__.csv', sep=',')
-
-    def time_read_csv_object(self):
-        self.pg_read_csv_object()
+        @test_parallel(num_threads=2)
+        def parallel_read_csv():
+            read_csv(self.fname)
+        self.parallel_read_csv = parallel_read_csv

-    @test_parallel(num_threads=2)
-    def pg_read_csv_datetime(self):
-        read_csv('__test_datetime__.csv', sep=',', header=None)
+    def time_read_csv(self, dtype):
+        self.parallel_read_csv()

-    def time_read_csv_datetime(self):
-        self.pg_read_csv_datetime()

+class ParallelFactorize(object):

-class nogil_factorize(object):
     number = 1
     repeat = 5
+    params = [2, 4, 8]
+    param_names = ['threads']

-    def setup(self):
-        if (not have_real_test_parallel):
+    def setup(self, threads):
+        if not have_real_test_parallel:
             raise NotImplementedError
-        np.random.seed(1234)
-        self.strings = tm.makeStringIndex(100000)
-
-    def factorize_strings(self):
-        pd.factorize(self.strings)
-
-    @test_parallel(num_threads=4)
-    def _pg_factorize_strings_4(self):
-        self.factorize_strings()
+        strings = tm.makeStringIndex(100000)

-    def time_factorize_strings_4(self):
-        for i in range(2):
-            self._pg_factorize_strings_4()
+        @test_parallel(num_threads=threads)
+        def parallel():
+            factorize(strings)
+        self.parallel = parallel

-    @test_parallel(num_threads=2)
-    def _pg_factorize_strings_2(self):
-        self.factorize_strings()
+        def loop():
+            factorize(strings)
+ self.loop = loop - def time_factorize_strings_2(self): - for i in range(4): - self._pg_factorize_strings_2() + def time_parallel(self, threads): + self.parallel() - def time_factorize_strings(self): - for i in range(8): - self.factorize_strings() + def time_loop(self, threads): + for i in range(threads): + self.loop() diff --git a/asv_bench/benchmarks/pandas_vb_common.py b/asv_bench/benchmarks/pandas_vb_common.py index 74517f184ae6f..b7040bfdb9397 100644 --- a/asv_bench/benchmarks/pandas_vb_common.py +++ b/asv_bench/benchmarks/pandas_vb_common.py @@ -36,7 +36,7 @@ def remove(self, f): # causes an exception to be raised pass - def teardown(self): + def teardown(self, *args, **kwargs): self.remove(self.fname) # try em until it works! diff --git a/ci/lint.sh b/ci/lint.sh index bec82602fa509..1c157abbff060 100755 --- a/ci/lint.sh +++ b/ci/lint.sh @@ -24,7 +24,7 @@ if [ "$LINT" ]; then echo "Linting setup.py DONE" echo "Linting asv_bench/benchmarks/" - flake8 asv_bench/benchmarks/ --exclude=asv_bench/benchmarks/[ghijoprs]*.py --ignore=F811 + flake8 asv_bench/benchmarks/ --exclude=asv_bench/benchmarks/[ijoprs]*.py --ignore=F811 if [ $? -ne "0" ]; then RET=1 fi From c38b168d2b2bce86c4de2eaafe32d6611b4fced4 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Thu, 7 Dec 2017 21:06:38 -0800 Subject: [PATCH 2/3] Fix lint check --- asv_bench/benchmarks/groupby.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/asv_bench/benchmarks/groupby.py b/asv_bench/benchmarks/groupby.py index 3abf2338e1d94..e6dc75d0f93c2 100644 --- a/asv_bench/benchmarks/groupby.py +++ b/asv_bench/benchmarks/groupby.py @@ -16,10 +16,10 @@ class ApplyDictReturn(object): def setup(self): self.labels = np.arange(1000).repeat(10) self.data = Series(np.random.randn(len(self.labels))) - self.f = lambda x: {'first': x.values[0], 'last': x.values[(-1)]} def time_groupby_apply_dict_return(self): - self.data.groupby(self.labels).apply(self.f) + self.data.groupby(self.labels).apply(lambda x: {'first': x.values[0], + 'last': x.values[-1]}) class Apply(object): From 0c4f3e707be8145dea5261c33831b1105cb5d879 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 10 Dec 2017 18:30:49 -0800 Subject: [PATCH 3/3] Fix lambda lint & some groupby benches --- asv_bench/benchmarks/groupby.py | 76 +++++++++++++++++---------------- 1 file changed, 39 insertions(+), 37 deletions(-) diff --git a/asv_bench/benchmarks/groupby.py b/asv_bench/benchmarks/groupby.py index e6dc75d0f93c2..1978d240abedd 100644 --- a/asv_bench/benchmarks/groupby.py +++ b/asv_bench/benchmarks/groupby.py @@ -26,22 +26,22 @@ class Apply(object): goal_time = 0.2 - def setup(self): + def setup_cache(self): N = 10**4 labels = np.random.randint(0, 2000, size=N) labels2 = np.random.randint(0, 3, size=N) - self.df = DataFrame({'key': labels, - 'key2': labels2, - 'value1': np.random.randn(N), - 'value2': ['foo', 'bar', 'baz', 'qux'] * (N // 4), - }) - self.scalar_function = lambda x: 1 + df = DataFrame({'key': labels, + 'key2': labels2, + 'value1': np.random.randn(N), + 'value2': ['foo', 'bar', 'baz', 'qux'] * (N // 4) + }) + return df - def time_scalar_function_multi_col(self): - self.df.groupby(['key', 'key2']).apply(self.scalar_function) + def time_scalar_function_multi_col(self, df): + df.groupby(['key', 'key2']).apply(lambda x: 1) - def time_scalar_function_single_col(self): - self.df.groupby('key').apply(self.scalar_function) + def time_scalar_function_single_col(self, df): + df.groupby('key').apply(lambda x: 1) @staticmethod def df_copy_function(g): @@ -49,11 
+49,11 @@ def df_copy_function(g):
         g.name
         return g.copy()

-    def time_copy_function_multi_col(self):
-        self.df.groupby(['key', 'key2']).apply(self.df_copy_function)
+    def time_copy_function_multi_col(self, df):
+        df.groupby(['key', 'key2']).apply(self.df_copy_function)

-    def time_copy_overhead_single_col(self):
-        self.df.groupby('key').apply(self.df_copy_function)
+    def time_copy_overhead_single_col(self, df):
+        df.groupby('key').apply(self.df_copy_function)


 class Groups(object):
@@ -274,13 +274,16 @@ class GroupStrings(object):

     goal_time = 0.2

     def setup(self):
-        n = (5 * 7 * 11) * (1 << 9)
+        n = 5 * 7 * 11 * (2 ** 9)  # keep n divisible by 11, 7 and 5
+        # so all four columns below have the same length n
         alpha = list(map(''.join, product((ascii_letters + digits),
                                           repeat=4)))
-        f = lambda k: np.repeat(np.random.choice(alpha, (n // k)), k)
-        self.df = DataFrame({'a': f(11),
-                             'b': f(7),
-                             'c': f(5),
-                             'd': f(1)})
+        self.df = DataFrame({'a': np.repeat(np.random.choice(alpha,
+                                                             (n // 11)), 11),
+                             'b': np.repeat(np.random.choice(alpha,
+                                                             (n // 7)), 7),
+                             'c': np.repeat(np.random.choice(alpha,
+                                                             (n // 5)), 5),
+                             'd': np.repeat(np.random.choice(alpha,
+                                                             (n // 1)), 1)})
         self.df['joe'] = (np.random.randn(len(self.df)) * 10).round(3)
         i = np.random.permutation(len(self.df))
         self.df = self.df.iloc[i].reset_index(drop=True)
@@ -293,29 +296,29 @@ class MultiColumn(object):

     goal_time = 0.2

-    def setup(self):
+    def setup_cache(self):
         N = 10**5
         key1 = np.tile(np.arange(100, dtype=object), 1000)
         key2 = key1.copy()
         np.random.shuffle(key1)
         np.random.shuffle(key2)
-        self.df = DataFrame({'key1': key1,
-                             'key2': key2,
-                             'data1': np.random.randn(N),
-                             'data2': np.random.randn(N)})
-        self.f = lambda x: x.values.sum()
+        df = DataFrame({'key1': key1,
+                        'key2': key2,
+                        'data1': np.random.randn(N),
+                        'data2': np.random.randn(N)})
+        return df

-    def time_lambda_sum(self):
-        self.df.groupby(['key1', 'key2']).agg(self.f)
+    def time_lambda_sum(self, df):
+        df.groupby(['key1', 'key2']).agg(lambda x: x.values.sum())

-    def time_cython_sum(self):
-        self.df.groupby(['key1', 'key2']).sum()
+    def time_cython_sum(self, df):
+        df.groupby(['key1', 'key2']).sum()

-    def time_col_select_lambda_sum(self):
-        self.df.groupby(['key1', 'key2'])['data1'].agg(self.f)
+    def time_col_select_lambda_sum(self, df):
+        df.groupby(['key1', 'key2'])['data1'].agg(lambda x: x.values.sum())

-    def time_col_select_numpy_sum(self):
-        self.df.groupby(['key1', 'key2'])['data1'].agg(np.sum)
+    def time_col_select_numpy_sum(self, df):
+        df.groupby(['key1', 'key2'])['data1'].agg(np.sum)


 class Size(object):
@@ -582,7 +585,6 @@ def setup(self):
         arr[2::10000, 2] = np.nan
         data = DataFrame(arr, index=index, columns=['col1', 'col20', 'col3'])
         self.df = data
-        self.f_max = lambda x: max(x)

         n = 20000
         self.df1 = DataFrame(np.random.randint(1, n, (n, 3)),
@@ -596,7 +598,7 @@ def setup(self):
         self.df4['jim'] = self.df4['joe']

     def time_transform_lambda_max(self):
-        self.df.groupby(level='lev1').transform(self.f_max)
+        self.df.groupby(level='lev1').transform(lambda x: max(x))

     def time_transform_ufunc_max(self):
         self.df.groupby(level='lev1').transform(np.max)
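
A note on the idiom used throughout the gil.py rewrite above, for readers new to ASV: the cross-product of `params` is expanded into separate benchmark results (one per `(threads, method)` pair), and `test_parallel` expects a zero-argument callable, so each `setup` binds the current parameters into a closure and stashes it on `self` for the `time_*` method to call. The sketch below is illustrative only and not part of the patch: the class name and data sizes are invented, and it assumes `test_parallel` is importable from `pandas.util.testing` (as the patch does), falling back to a no-op decorator otherwise.

import numpy as np
from pandas import DataFrame

try:
    from pandas.util.testing import test_parallel
    have_real_test_parallel = True
except ImportError:
    have_real_test_parallel = False

    def test_parallel(num_threads=1):
        # Fallback: leave the function unchanged so the module still
        # imports; setup() below raises NotImplementedError instead.
        def wrapper(fname):
            return fname
        return wrapper


class ParallelExample(object):
    # ASV benchmarks every combination: (2, 'sum'), (2, 'mean'),
    # (4, 'sum'), (4, 'mean').
    params = ([2, 4], ['sum', 'mean'])
    param_names = ['threads', 'method']

    def setup(self, threads, method):
        if not have_real_test_parallel:
            raise NotImplementedError  # tells ASV to skip this benchmark
        df = DataFrame({'key': np.random.randint(0, 100, size=10**5),
                        'data': np.random.randn(10**5)})

        # Bind threads/method into a zero-argument closure, as
        # test_parallel requires when no kwargs_list is given.
        @test_parallel(num_threads=threads)
        def parallel():
            getattr(df.groupby('key')['data'], method)()
        self.parallel = parallel

    def time_parallel(self, threads, method):
        self.parallel()

ASV reports each case separately, e.g. `ParallelExample.time_parallel(2, 'sum')`, which is what lets a single parameterized class replace the hand-written `time_sum_2` / `time_sum_4` / `time_sum_8` methods deleted above.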