Skip to content

Commit ee6b9cb

Browse files
author
Dana Powers
committed
Fix python3 / python2 comments re queue/Queue
1 parent cfbdc05 commit ee6b9cb

File tree

2 files changed

+10
-10
lines changed

2 files changed

+10
-10
lines changed

kafka/consumer/multiprocess.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@
44
import logging
55
from multiprocessing import Process, Manager as MPManager
66
try:
7-
from Queue import Empty, Full # python 3
7+
import queue # python 3
88
except ImportError:
9-
from queue import Empty, Full # python 2
9+
import Queue as queue # python 2
1010
import time
1111

1212
from ..common import KafkaError
@@ -71,7 +71,7 @@ def _mp_consume(client, group, topic, queue, size, events, **consumer_options):
7171
try:
7272
queue.put(message, timeout=FULL_QUEUE_WAIT_TIME_SECONDS)
7373
break
74-
except Full:
74+
except queue.Full:
7575
if events.exit.is_set(): break
7676

7777
count += 1
@@ -220,7 +220,7 @@ def __iter__(self):
220220
# TODO: This is a hack and will make the consumer block for
221221
# at least one second. Need to find a better way of doing this
222222
partition, message = self.queue.get(block=True, timeout=1)
223-
except Empty:
223+
except queue.Empty:
224224
break
225225

226226
# Count, check and commit messages if necessary
@@ -270,7 +270,7 @@ def get_messages(self, count=1, block=True, timeout=10):
270270
try:
271271
partition, message = self.queue.get(block_next_call,
272272
timeout)
273-
except Empty:
273+
except queue.Empty:
274274
break
275275

276276
_msg = (partition, message) if self.partition_info else message

kafka/consumer/simple.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66
from itertools import izip_longest as izip_longest, repeat # python 2
77
import logging
88
try:
9-
from Queue import Empty, Queue # python 3
9+
import queue # python 3
1010
except ImportError:
11-
from queue import Empty, Queue # python 2
11+
import Queue as queue # python 2
1212
import sys
1313
import time
1414

@@ -136,7 +136,7 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None,
136136
self.fetch_offsets = self.offsets.copy()
137137
self.iter_timeout = iter_timeout
138138
self.auto_offset_reset = auto_offset_reset
139-
self.queue = Queue()
139+
self.queue = queue.Queue()
140140

141141
def __repr__(self):
142142
return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \
@@ -257,7 +257,7 @@ def seek(self, offset, whence=None, partition=None):
257257
if self.auto_commit:
258258
self.commit()
259259

260-
self.queue = Queue()
260+
self.queue = queue.Queue()
261261

262262
def get_messages(self, count=1, block=True, timeout=0.1):
263263
"""
@@ -341,7 +341,7 @@ def _get_message(self, block=True, timeout=0.1, get_partition_info=None,
341341
return partition, message
342342
else:
343343
return message
344-
except Empty:
344+
except queue.Empty:
345345
log.debug('internal queue empty after fetch - returning None')
346346
return None
347347

0 commit comments

Comments
 (0)