Skip to content

Commit a7cbfd3

Browse files
committed
Fix bug in socket timeout per PR dpkp#161 by maciejkula, add test
1 parent 1984dab commit a7cbfd3

File tree

2 files changed

+14
-4
lines changed

2 files changed

+14
-4
lines changed

kafka/conn.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,6 @@ def reinit(self):
150150
"""
151151
self.close()
152152
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
153-
self._sock.connect((self.host, self.port))
154153
self._sock.settimeout(self.timeout)
154+
self._sock.connect((self.host, self.port))
155155
self._dirty = False

test/test_client_integration.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
import unittest
22
import time
3+
import socket
4+
import random
35

4-
from kafka import * # noqa
5-
from kafka.common import * # noqa
6-
from kafka.codec import has_gzip, has_snappy
6+
import kafka
7+
from kafka.common import *
78
from .fixtures import ZookeeperFixture, KafkaFixture
89
from .testutil import *
910

@@ -19,6 +20,15 @@ def tearDownClass(cls): # noqa
1920
cls.server.close()
2021
cls.zk.close()
2122

23+
def test_timeout(self):
24+
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
25+
server_socket.bind(('localhost', 14567))
26+
27+
with Timer() as t:
28+
with self.assertRaises((socket.timeout, socket.error)):
29+
conn = kafka.conn.KafkaConnection("localhost", 14567, 1.0)
30+
self.assertGreaterEqual(t.interval, 1.0)
31+
2232
def test_consume_none(self):
2333
fetch = FetchRequest(self.topic, 0, 0, 1024)
2434

0 commit comments

Comments
 (0)