From 95ac6bdb37841abf372e0ede692c29b74a9de3cb Mon Sep 17 00:00:00 2001 From: Hasan Mahmood Date: Mon, 7 Apr 2014 14:33:32 -0500 Subject: [PATCH 01/11] Fixing copy() methods to work with gevent library --- kafka/client.py | 13 +++++++++---- kafka/conn.py | 8 ++++---- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/kafka/client.py b/kafka/client.py index 39c89ba43..ab922eab5 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -26,7 +26,8 @@ class KafkaClient(object): # one passed to SimpleConsumer.get_message(), otherwise you can get a # socket timeout. def __init__(self, hosts, client_id=CLIENT_ID, - timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS): + timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS, + activate=True): # We need one connection to bootstrap self.client_id = client_id self.timeout = timeout @@ -37,7 +38,8 @@ def __init__(self, hosts, client_id=CLIENT_ID, self.brokers = {} # broker_id -> BrokerMetadata self.topics_to_brokers = {} # topic_id -> broker_id self.topic_partitions = {} # topic_id -> [0, 1, 2, ...] - self.load_metadata_for_topics() # bootstrap with all metadata + if activate is True: + self.load_metadata_for_topics() # bootstrap with all metadata ################## # Private API # @@ -233,8 +235,11 @@ def copy(self): Create an inactive copy of the client object A reinit() has to be done on the copy before it can be used again """ - c = copy.deepcopy(self) - for k, v in c.conns.items(): + c = KafkaClient(hosts=['{}:{}'.format(entry[0], entry[1]) for entry in self.hosts], + client_id=self.client_id, + timeout=self.timeout, + activate=False) + for k, v in self.conns.iteritems(): c.conns[k] = v.copy() return c diff --git a/kafka/conn.py b/kafka/conn.py index 4fdeb17c7..86c3fd81c 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -50,14 +50,15 @@ class KafkaConnection(local): timeout: default 120. The socket timeout for sending and receiving data in seconds. None means no timeout, so a request can block forever. """ - def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS): + def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS, activate=True): super(KafkaConnection, self).__init__() self.host = host self.port = port self.timeout = timeout self._sock = None - self.reinit() + if activate is True: + self.reinit() def __repr__(self): return "" % (self.host, self.port) @@ -133,8 +134,7 @@ def copy(self): Create an inactive copy of the connection object A reinit() has to be done on the copy before it can be used again """ - c = copy.deepcopy(self) - c._sock = None + c = KafkaConnection(host=self.host, port=self.port, timeout=self.timeout, activate=False) return c def close(self): From c09196a1d0a64242c55251916206cdd05f7fcf63 Mon Sep 17 00:00:00 2001 From: Hasan Mahmood Date: Thu, 8 May 2014 18:08:40 -0500 Subject: [PATCH 02/11] Fixing tests for gevent support --- setup.py | 2 +- test/test_gevent.py | 9 +++++++++ tox.ini | 14 ++++++++++++++ 3 files changed, 24 insertions(+), 1 deletion(-) create mode 100644 test/test_gevent.py diff --git a/setup.py b/setup.py index 86d1d9f23..67352609b 100644 --- a/setup.py +++ b/setup.py @@ -23,7 +23,7 @@ def run(self): version="0.9.1", install_requires=["distribute"], - tests_require=["tox", "mock"], + tests_require=["tox", "mock", "gevent"], cmdclass={"test": Tox}, packages=["kafka"], diff --git a/test/test_gevent.py b/test/test_gevent.py new file mode 100644 index 000000000..0417d94aa --- /dev/null +++ b/test/test_gevent.py @@ -0,0 +1,9 @@ +import gevent.monkey; gevent.monkey.patch_all(subprocess=True, Event=True) + +from .testutil import * +from .service import * +from .test_client import * +from .test_consumer import * +from .test_conn import * +from .test_protocol import * + diff --git a/tox.ini b/tox.ini index 3c5fd17dd..3381edf91 100644 --- a/tox.ini +++ b/tox.ini @@ -1,6 +1,20 @@ [tox] envlist = py26, py27, pypy + [testenv] +deps = + unittest2 + nose + coverage + mock + python-snappy + gevent +commands = + nosetests --with-coverage --cover-erase --cover-package kafka [] +setenv = + PROJECT_ROOT = {toxinidir} + +[testenv:pypy] deps = unittest2 nose From b6aa93947bccb6397232fc9f4e227f8da25258c6 Mon Sep 17 00:00:00 2001 From: Hasan Mahmood Date: Thu, 8 May 2014 18:26:04 -0500 Subject: [PATCH 03/11] Skipping gevent tests for pypy --- tox.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index 3381edf91..aa649e9f1 100644 --- a/tox.ini +++ b/tox.ini @@ -22,6 +22,6 @@ deps = mock python-snappy commands = - nosetests --with-coverage --cover-erase --cover-package kafka [] + nosetests --exclude=test_gevent.py --with-coverage --cover-erase --cover-package kafka [] setenv = PROJECT_ROOT = {toxinidir} From 1e36d870fcf27b88cdedda255887b1b0281bcc8d Mon Sep 17 00:00:00 2001 From: Hasan Mahmood Date: Thu, 8 May 2014 20:15:38 -0500 Subject: [PATCH 04/11] Fixing unit tests for python 2.6 --- kafka/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/client.py b/kafka/client.py index e308f4cab..281c54e3d 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -224,7 +224,7 @@ def copy(self): Create an inactive copy of the client object A reinit() has to be done on the copy before it can be used again """ - c = KafkaClient(hosts=['{}:{}'.format(entry[0], entry[1]) for entry in self.hosts], + c = KafkaClient(hosts=['{0}:{1}'.format(entry[0], entry[1]) for entry in self.hosts], client_id=self.client_id, timeout=self.timeout, activate=False) From ae9a37f1e56b75e90c338b400b256153636f2072 Mon Sep 17 00:00:00 2001 From: Hasan Mahmood Date: Fri, 9 May 2014 10:15:48 -0500 Subject: [PATCH 05/11] Removing subprocess from gevent monkey patch --- test/test_gevent.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/test/test_gevent.py b/test/test_gevent.py index 0417d94aa..8443c9d09 100644 --- a/test/test_gevent.py +++ b/test/test_gevent.py @@ -1,4 +1,4 @@ -import gevent.monkey; gevent.monkey.patch_all(subprocess=True, Event=True) +import gevent.monkey; gevent.monkey.patch_all(Event=True) from .testutil import * from .service import * @@ -6,4 +6,3 @@ from .test_consumer import * from .test_conn import * from .test_protocol import * - From 25495e198e5cbfb746dd6f5eb37b984861cf7350 Mon Sep 17 00:00:00 2001 From: Hasan Mahmood Date: Fri, 9 May 2014 14:47:22 -0500 Subject: [PATCH 06/11] Skipping some tests when testing with gevent --- test/test_consumer_integration.py | 3 +++ test/test_failover_integration.py | 1 + test/test_gevent.py | 7 ++++++- test/test_producer_integration.py | 4 ++++ test/testutil.py | 14 ++++++++++++++ tox.ini | 25 +++++++++++++++++++++---- 6 files changed, 49 insertions(+), 5 deletions(-) diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index a6589b360..c6ba8f2c8 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -112,6 +112,7 @@ def test_simple_consumer_pending(self): consumer.stop() + @skip_gevent() @kafka_versions("all") def test_multi_process_consumer(self): # Produce 100 messages to partitions 0 and 1 @@ -124,6 +125,7 @@ def test_multi_process_consumer(self): consumer.stop() + @skip_gevent() @kafka_versions("all") def test_multi_process_consumer_blocking(self): consumer = self.consumer(consumer = MultiProcessConsumer) @@ -152,6 +154,7 @@ def test_multi_process_consumer_blocking(self): consumer.stop() + @skip_gevent() @kafka_versions("all") def test_multi_proc_pending(self): self.send_messages(0, range(0, 10)) diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py index 6298f62f6..31bc98081 100644 --- a/test/test_failover_integration.py +++ b/test/test_failover_integration.py @@ -68,6 +68,7 @@ def test_switch_leader(self): producer.stop() + @skip_gevent() @kafka_versions("all") def test_switch_leader_async(self): key, topic, partition = random_string(5), self.topic, 0 diff --git a/test/test_gevent.py b/test/test_gevent.py index 8443c9d09..c948a659f 100644 --- a/test/test_gevent.py +++ b/test/test_gevent.py @@ -5,4 +5,9 @@ from .test_client import * from .test_consumer import * from .test_conn import * -from .test_protocol import * +from .test_protocol import * +from .test_util import * +from .test_client_integration import * +from .test_failover_integration import * +from .test_consumer_integration import * +from .test_producer_integration import * diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py index c69e1178b..94283bafb 100644 --- a/test/test_producer_integration.py +++ b/test/test_producer_integration.py @@ -247,6 +247,7 @@ def test_acks_cluster_commit(self): producer.stop() + @skip_gevent() @kafka_versions("all") def test_batched_simple_producer__triggers_by_message(self): start_offset0 = self.current_offset(self.topic, 0) @@ -296,6 +297,7 @@ def test_batched_simple_producer__triggers_by_message(self): producer.stop() + @skip_gevent() @kafka_versions("all") def test_batched_simple_producer__triggers_by_time(self): start_offset0 = self.current_offset(self.topic, 0) @@ -348,6 +350,7 @@ def test_batched_simple_producer__triggers_by_time(self): producer.stop() + @skip_gevent() @kafka_versions("all") def test_async_simple_producer(self): start_offset0 = self.current_offset(self.topic, 0) @@ -361,6 +364,7 @@ def test_async_simple_producer(self): producer.stop() + @skip_gevent() @kafka_versions("all") def test_async_keyed_producer(self): start_offset0 = self.current_offset(self.topic, 0) diff --git a/test/testutil.py b/test/testutil.py index 78e6f7d93..2a8ae5e8f 100644 --- a/test/testutil.py +++ b/test/testutil.py @@ -15,6 +15,7 @@ 'random_string', 'ensure_topic_creation', 'get_open_port', + 'skip_gevent', 'kafka_versions', 'KafkaIntegrationTestCase', 'Timer', @@ -24,6 +25,19 @@ def random_string(l): s = "".join(random.choice(string.letters) for i in xrange(l)) return s +def skip_gevent(): + def skip_gevent(func): + @functools.wraps(func) + def wrapper(self): + use_gevent = os.environ.get('USE_GEVENT') + if use_gevent is not None and \ + use_gevent == '1': + self.skipTest('test not support for gevent') + + return func(self) + return wrapper + return skip_gevent + def kafka_versions(*versions): def kafka_versions(func): @functools.wraps(func) diff --git a/tox.ini b/tox.ini index aa649e9f1..735d7ac77 100644 --- a/tox.ini +++ b/tox.ini @@ -1,7 +1,20 @@ [tox] -envlist = py26, py27, pypy +envlist = py26, py27, pypy, py26-gevent, py27-gevent [testenv] +deps = + unittest2 + nose + coverage + mock + python-snappy +commands = + nosetests --exclude=test_gevent.py --with-coverage --cover-erase --cover-package kafka [] +setenv = + PROJECT_ROOT = {toxinidir} + +[testenv:py26-gevent] +basepython = python2.6 deps = unittest2 nose @@ -10,18 +23,22 @@ deps = python-snappy gevent commands = - nosetests --with-coverage --cover-erase --cover-package kafka [] + nosetests --with-coverage --cover-erase --cover-package kafka test/test_gevent.py setenv = PROJECT_ROOT = {toxinidir} + USE_GEVENT = 1 -[testenv:pypy] +[testenv:py27-gevent] +basepython = python2.7 deps = unittest2 nose coverage mock python-snappy + gevent commands = - nosetests --exclude=test_gevent.py --with-coverage --cover-erase --cover-package kafka [] + nosetests --with-coverage --cover-erase --cover-package kafka test/test_gevent.py setenv = PROJECT_ROOT = {toxinidir} + USE_GEVENT = 1 From d21d602a1f6a184612ce493e3130842e9bef9a9f Mon Sep 17 00:00:00 2001 From: Hasan Mahmood Date: Fri, 9 May 2014 14:58:31 -0500 Subject: [PATCH 07/11] Adding gevent tests to travis build --- .travis.yml | 3 +++ tox.ini | 5 ++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index bd5f63aef..918931f34 100644 --- a/.travis.yml +++ b/.travis.yml @@ -21,3 +21,6 @@ script: - tox -e `./travis_selector.sh $TRAVIS_PYTHON_VERSION` - KAFKA_VERSION=0.8.0 tox -e `./travis_selector.sh $TRAVIS_PYTHON_VERSION` - KAFKA_VERSION=0.8.1 tox -e `./travis_selector.sh $TRAVIS_PYTHON_VERSION` + - tox -e `./travis_selector.sh $TRAVIS_PYTHON_VERSION`-gevent + - KAFKA_VERSION=0.8.0 tox -e `./travis_selector.sh $TRAVIS_PYTHON_VERSION`-gevent + - KAFKA_VERSION=0.8.1 tox -e `./travis_selector.sh $TRAVIS_PYTHON_VERSION`-gevent diff --git a/tox.ini b/tox.ini index 735d7ac77..b4946cc9c 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = py26, py27, pypy, py26-gevent, py27-gevent +envlist = py26, py27, pypy, py26-gevent, py27-gevent, pypy-gevent [testenv] deps = @@ -42,3 +42,6 @@ commands = setenv = PROJECT_ROOT = {toxinidir} USE_GEVENT = 1 + +[testenv:pypy-gevent] +commands = \ No newline at end of file From 7364d2e9b7e5f8c6065246c2754c29b188856cab Mon Sep 17 00:00:00 2001 From: Hasan Mahmood Date: Fri, 9 May 2014 15:41:59 -0500 Subject: [PATCH 08/11] Use env var to run gevent tests --- test/__init__.py | 5 +++++ tox.ini | 36 +++--------------------------------- 2 files changed, 8 insertions(+), 33 deletions(-) diff --git a/test/__init__.py b/test/__init__.py index e69de29bb..b1616cb86 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -0,0 +1,5 @@ +import os + +if os.environ.get('USE_GEVENT') == '1': + import gevent.monkey + gevent.monkey.patch_all(Event=True) diff --git a/tox.ini b/tox.ini index b4946cc9c..82d1a5f33 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = py26, py27, pypy, py26-gevent, py27-gevent, pypy-gevent +envlist = py26, py27, pypy [testenv] deps = @@ -8,40 +8,10 @@ deps = coverage mock python-snappy -commands = - nosetests --exclude=test_gevent.py --with-coverage --cover-erase --cover-package kafka [] -setenv = - PROJECT_ROOT = {toxinidir} - -[testenv:py26-gevent] -basepython = python2.6 -deps = - unittest2 - nose - coverage - mock - python-snappy - gevent -commands = - nosetests --with-coverage --cover-erase --cover-package kafka test/test_gevent.py -setenv = - PROJECT_ROOT = {toxinidir} - USE_GEVENT = 1 - -[testenv:py27-gevent] -basepython = python2.7 -deps = - unittest2 - nose - coverage - mock - python-snappy + cython gevent commands = - nosetests --with-coverage --cover-erase --cover-package kafka test/test_gevent.py + nosetests --with-coverage --cover-erase --cover-package kafka [] setenv = PROJECT_ROOT = {toxinidir} - USE_GEVENT = 1 -[testenv:pypy-gevent] -commands = \ No newline at end of file From 667cb2ed7b9837f98d23243fca88e99e24843e8b Mon Sep 17 00:00:00 2001 From: Hasan Mahmood Date: Sun, 11 May 2014 13:02:42 -0500 Subject: [PATCH 09/11] Removing test_gevent.py in favor of setting environment variable uUSE_GEVENT --- .travis.yml | 8 ++------ test/test_gevent.py | 13 ------------- travis_run_tests.sh | 16 ++++++++++++++++ travis_selector.sh | 12 ------------ 4 files changed, 18 insertions(+), 31 deletions(-) delete mode 100644 test/test_gevent.py create mode 100755 travis_run_tests.sh delete mode 100755 travis_selector.sh diff --git a/.travis.yml b/.travis.yml index 918931f34..546cccd03 100644 --- a/.travis.yml +++ b/.travis.yml @@ -18,9 +18,5 @@ install: - sudo rm -rf /dev/shm && sudo ln -s /run/shm /dev/shm script: - - tox -e `./travis_selector.sh $TRAVIS_PYTHON_VERSION` - - KAFKA_VERSION=0.8.0 tox -e `./travis_selector.sh $TRAVIS_PYTHON_VERSION` - - KAFKA_VERSION=0.8.1 tox -e `./travis_selector.sh $TRAVIS_PYTHON_VERSION` - - tox -e `./travis_selector.sh $TRAVIS_PYTHON_VERSION`-gevent - - KAFKA_VERSION=0.8.0 tox -e `./travis_selector.sh $TRAVIS_PYTHON_VERSION`-gevent - - KAFKA_VERSION=0.8.1 tox -e `./travis_selector.sh $TRAVIS_PYTHON_VERSION`-gevent + - ./travis_run_tests.sh + diff --git a/test/test_gevent.py b/test/test_gevent.py deleted file mode 100644 index c948a659f..000000000 --- a/test/test_gevent.py +++ /dev/null @@ -1,13 +0,0 @@ -import gevent.monkey; gevent.monkey.patch_all(Event=True) - -from .testutil import * -from .service import * -from .test_client import * -from .test_consumer import * -from .test_conn import * -from .test_protocol import * -from .test_util import * -from .test_client_integration import * -from .test_failover_integration import * -from .test_consumer_integration import * -from .test_producer_integration import * diff --git a/travis_run_tests.sh b/travis_run_tests.sh new file mode 100755 index 000000000..a72208c48 --- /dev/null +++ b/travis_run_tests.sh @@ -0,0 +1,16 @@ +#!/bin/bash + +TOX_ENV=$1 +if [ $1 == "2.7" ]; then + $TOX_ENV = "py27" +elif [ $1 == "2.6" ]; then + $TOX_ENV = "py26" +fi; + +tox -e $TOX_ENV +KAFKA_VERSION=0.8.0 tox -e $TOX_ENV +KAFKA_VERSION=0.8.1 tox -e $TOX_ENV +if [ $TOX_ENV != "pypy" ]; then + USE_GEVENT=1 KAFKA_VERSION=0.8.0 tox -e $TOX_ENV + USE_GEVENT=1 KAFKA_VERSION=0.8.1 tox -e $TOX_ENV +fi; \ No newline at end of file diff --git a/travis_selector.sh b/travis_selector.sh deleted file mode 100755 index 21fba7e45..000000000 --- a/travis_selector.sh +++ /dev/null @@ -1,12 +0,0 @@ -#!/bin/bash -# This works with the .travis.yml file to select a python version for testing - -if [ $1 == "pypy" ]; then - echo "pypy" -elif [ $1 == "2.7" ]; then - echo "py27" -elif [ $1 == "2.6" ]; then - echo "py26" -else - echo $1 -fi; From 2f8b92c8baf09b9aaa59c804eea3ad59dbda3b32 Mon Sep 17 00:00:00 2001 From: Hasan Mahmood Date: Sun, 11 May 2014 13:08:58 -0500 Subject: [PATCH 10/11] Passing python version to travis_run_tests.sh --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 546cccd03..6119f601d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -18,5 +18,5 @@ install: - sudo rm -rf /dev/shm && sudo ln -s /run/shm /dev/shm script: - - ./travis_run_tests.sh + - ./travis_run_tests.sh $TRAVIS_PYTHON_VERSION From 87ea4530ae5d84ca90b9f46762e49424f1fd11fe Mon Sep 17 00:00:00 2001 From: Hasan Mahmood Date: Sun, 11 May 2014 13:09:46 -0500 Subject: [PATCH 11/11] Adding new line at end of file --- travis_run_tests.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/travis_run_tests.sh b/travis_run_tests.sh index a72208c48..736edf7d5 100755 --- a/travis_run_tests.sh +++ b/travis_run_tests.sh @@ -13,4 +13,4 @@ KAFKA_VERSION=0.8.1 tox -e $TOX_ENV if [ $TOX_ENV != "pypy" ]; then USE_GEVENT=1 KAFKA_VERSION=0.8.0 tox -e $TOX_ENV USE_GEVENT=1 KAFKA_VERSION=0.8.1 tox -e $TOX_ENV -fi; \ No newline at end of file +fi;