
Commit fcca556

Run pyupgrade on everything. (dpkp#171)
1 parent deeccfa commit fcca556


80 files changed: +290 −456 lines
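
For context: pyupgrade mechanically rewrites Python-2-compatible idioms into their modern Python 3 equivalents, and the same few patterns recur across all 80 files below. A minimal before/after sketch of those patterns (the names here are illustrative, not from the codebase):

# Patterns pyupgrade rewrites, as seen throughout this commit:
class Widget:                          # was: class Widget(object):
    def __init__(self, widget_id):
        self.widget_id = widget_id

    def label(self):
        # was: return "id={}".format(self.widget_id)
        return f"id={self.widget_id}"

# was: for k, v in six.iteritems(d):
d = {"a": 1, "b": 2}
for k, v in d.items():
    print(k, v)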

kafka/__init__.py

Lines changed: 0 additions & 2 deletions
@@ -1,5 +1,3 @@
-from __future__ import absolute_import
-
 __title__ = 'kafka'
 from kafka.version import __version__
 __author__ = 'Dana Powers'

kafka/admin/__init__.py

Lines changed: 0 additions & 2 deletions
@@ -1,5 +1,3 @@
-from __future__ import absolute_import
-
 from kafka.admin.config_resource import ConfigResource, ConfigResourceType
 from kafka.admin.client import KafkaAdminClient
 from kafka.admin.acl_resource import (ACL, ACLFilter, ResourcePattern, ResourcePatternFilter, ACLOperation,

kafka/admin/acl_resource.py

Lines changed: 5 additions & 6 deletions
@@ -1,4 +1,3 @@
-from __future__ import absolute_import
 from kafka.errors import IllegalArgumentError
 
 # enum in stdlib as of py3.4
@@ -69,7 +68,7 @@ class ACLResourcePatternType(IntEnum):
     PREFIXED = 4
 
 
-class ACLFilter(object):
+class ACLFilter:
     """Represents a filter to use with describing and deleting ACLs
 
     The difference between this class and the ACL class is mainly that
@@ -161,7 +160,7 @@ def __init__(
         permission_type,
         resource_pattern
     ):
-        super(ACL, self).__init__(principal, host, operation, permission_type, resource_pattern)
+        super().__init__(principal, host, operation, permission_type, resource_pattern)
        self.validate()
 
    def validate(self):
@@ -173,7 +172,7 @@ def validate(self):
            raise IllegalArgumentError("resource_pattern must be a ResourcePattern object")
 
 
-class ResourcePatternFilter(object):
+class ResourcePatternFilter:
    def __init__(
        self,
        resource_type,
@@ -232,13 +231,13 @@ def __init__(
        resource_name,
        pattern_type=ACLResourcePatternType.LITERAL
    ):
-        super(ResourcePattern, self).__init__(resource_type, resource_name, pattern_type)
+        super().__init__(resource_type, resource_name, pattern_type)
        self.validate()
 
    def validate(self):
        if self.resource_type == ResourceType.ANY:
            raise IllegalArgumentError("resource_type cannot be ANY")
        if self.pattern_type in [ACLResourcePatternType.ANY, ACLResourcePatternType.MATCH]:
            raise IllegalArgumentError(
-                "pattern_type cannot be {} on a concrete ResourcePattern".format(self.pattern_type.name)
+                f"pattern_type cannot be {self.pattern_type.name} on a concrete ResourcePattern"
            )
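
The super(ACL, self).__init__(...) → super().__init__(...) rewrites above rely on Python 3's zero-argument super(), which resolves the current class and instance from the enclosing method. A minimal sketch with hypothetical classes, not the real ACL hierarchy:

class Base:
    def __init__(self, name):
        self.name = name

class Child(Base):
    def __init__(self, name):
        # Equivalent to super(Child, self).__init__(name) on Python 3
        super().__init__(name)
        self.validated = True

c = Child("example")
assert c.name == "example" and c.validated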

kafka/admin/client.py

Lines changed: 4 additions & 6 deletions
@@ -1,5 +1,3 @@
-from __future__ import absolute_import
-
 from collections import defaultdict
 import copy
 import logging
@@ -32,7 +30,7 @@
 log = logging.getLogger(__name__)
 
 
-class KafkaAdminClient(object):
+class KafkaAdminClient:
     """A class for administering the Kafka cluster.
 
     Warning:
@@ -194,7 +192,7 @@ def __init__(self, **configs):
        log.debug("Starting KafkaAdminClient with configuration: %s", configs)
        extra_configs = set(configs).difference(self.DEFAULT_CONFIG)
        if extra_configs:
-            raise KafkaConfigurationError("Unrecognized configs: {}".format(extra_configs))
+            raise KafkaConfigurationError(f"Unrecognized configs: {extra_configs}")
 
        self.config = copy.copy(self.DEFAULT_CONFIG)
        self.config.update(configs)
@@ -874,7 +872,7 @@ def describe_configs(self, config_resources, include_synonyms=False):
            ))
        else:
            raise NotImplementedError(
-                "Support for DescribeConfigs v{} has not yet been added to KafkaAdminClient.".format(version))
+                f"Support for DescribeConfigs v{version} has not yet been added to KafkaAdminClient.")
 
        self._wait_for_futures(futures)
        return [f.value for f in futures]
@@ -1197,7 +1195,7 @@ def _list_consumer_group_offsets_send_request(self, group_id,
            topics_partitions_dict = defaultdict(set)
            for topic, partition in partitions:
                topics_partitions_dict[topic].add(partition)
-            topics_partitions = list(six.iteritems(topics_partitions_dict))
+            topics_partitions = list(topics_partitions_dict.items())
            request = OffsetFetchRequest[version](group_id, topics_partitions)
        else:
            raise NotImplementedError(
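
The str.format() → f-string rewrites in this file (and below) require Python 3.6+, which this commit implicitly makes the floor. A quick sketch showing the two spellings produce identical strings (the version value is hypothetical):

version = 3
old = "Support for DescribeConfigs v{} has not yet been added to KafkaAdminClient.".format(version)
new = f"Support for DescribeConfigs v{version} has not yet been added to KafkaAdminClient."
assert old == new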

kafka/admin/config_resource.py

Lines changed: 1 addition & 3 deletions
@@ -1,5 +1,3 @@
-from __future__ import absolute_import
-
 # enum in stdlib as of py3.4
 try:
     from enum import IntEnum  # pylint: disable=import-error
@@ -15,7 +13,7 @@ class ConfigResourceType(IntEnum):
     TOPIC = 2
 
 
-class ConfigResource(object):
+class ConfigResource:
     """A class for specifying config resources.
     Arguments:
         resource_type (ConfigResourceType): the type of kafka resource

kafka/admin/new_partitions.py

Lines changed: 1 addition & 4 deletions
@@ -1,7 +1,4 @@
-from __future__ import absolute_import
-
-
-class NewPartitions(object):
+class NewPartitions:
     """A class for new partition creation on existing topics. Note that the length of new_assignments, if specified,
     must be the difference between the new total number of partitions and the existing number of partitions.
     Arguments:

kafka/admin/new_topic.py

Lines changed: 1 addition & 3 deletions
@@ -1,9 +1,7 @@
-from __future__ import absolute_import
-
 from kafka.errors import IllegalArgumentError
 
 
-class NewTopic(object):
+class NewTopic:
     """ A class for new topic creation
     Arguments:
         name (string): name of the topic

kafka/client_async.py

Lines changed: 8 additions & 14 deletions
@@ -1,5 +1,3 @@
-from __future__ import absolute_import, division
-
 import collections
 import copy
 import logging
@@ -32,14 +30,10 @@
 from kafka.vendor import socketpair
 from kafka.version import __version__
 
-if six.PY2:
-    ConnectionError = None
-
-
 log = logging.getLogger('kafka.client')
 
 
-class KafkaClient(object):
+class KafkaClient:
     """
     A network client for asynchronous request/response network I/O.
 
@@ -374,7 +368,7 @@ def _maybe_connect(self, node_id):
 
        if conn is None:
            broker = self.cluster.broker_metadata(node_id)
-            assert broker, 'Broker id %s not in current metadata' % (node_id,)
+            assert broker, 'Broker id {} not in current metadata'.format(node_id)
 
            log.debug("Initiating connection to node %s at %s:%s",
                      node_id, broker.host, broker.port)
@@ -686,7 +680,7 @@ def _poll(self, timeout):
                    unexpected_data = key.fileobj.recv(1)
                    if unexpected_data:  # anything other than a 0-byte read means protocol issues
                        log.warning('Protocol out of sync on %r, closing', conn)
-                except socket.error:
+                except OSError:
                    pass
                conn.close(Errors.KafkaConnectionError('Socket EVENT_READ without in-flight-requests'))
                continue
@@ -701,7 +695,7 @@ def _poll(self, timeout):
            if conn not in processed and conn.connected() and conn._sock.pending():
                self._pending_completion.extend(conn.recv())
 
-        for conn in six.itervalues(self._conns):
+        for conn in self._conns.values():
            if conn.requests_timed_out():
                log.warning('%s timed out after %s ms. Closing connection.',
                            conn, conn.config['request_timeout_ms'])
@@ -941,7 +935,7 @@ def wakeup(self):
        except socket.timeout:
            log.warning('Timeout to send to wakeup socket!')
            raise Errors.KafkaTimeoutError()
-        except socket.error as e:
+        except OSError as e:
            log.warning('Unable to send to wakeup socket!')
            if self._raise_upon_socket_err_during_wakeup:
                raise e
@@ -951,7 +945,7 @@ def _clear_wake_fd(self):
        while True:
            try:
                self._wake_r.recv(1024)
-            except socket.error:
+            except OSError:
                break
 
    def _maybe_close_oldest_connection(self):
@@ -981,7 +975,7 @@ def bootstrap_connected(self):
    OrderedDict = dict
 
 
-class IdleConnectionManager(object):
+class IdleConnectionManager:
    def __init__(self, connections_max_idle_ms):
        if connections_max_idle_ms > 0:
            self.connections_max_idle = connections_max_idle_ms / 1000
@@ -1043,7 +1037,7 @@ def poll_expired_connection(self):
        return None
 
 
-class KafkaClientMetrics(object):
+class KafkaClientMetrics:
    def __init__(self, metrics, metric_group_prefix, conns):
        self.metrics = metrics
        self.metric_group_name = metric_group_prefix + '-metrics'
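
The socket.error → OSError substitutions here are behavior-preserving: since Python 3.3 (PEP 3151), socket.error is an alias of OSError, so the new except clauses catch exactly what the old ones did. A quick check:

import socket

assert socket.error is OSError  # alias since Python 3.3 (PEP 3151)

sock_r, sock_w = socket.socketpair()
sock_w.close()
try:
    sock_w.send(b"x")  # writing on a closed socket raises OSError
except OSError as e:
    print("caught:", e)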

kafka/cluster.py

Lines changed: 5 additions & 7 deletions
@@ -1,5 +1,3 @@
-from __future__ import absolute_import
-
 import collections
 import copy
 import logging
@@ -16,7 +14,7 @@
 log = logging.getLogger(__name__)
 
 
-class ClusterMetadata(object):
+class ClusterMetadata:
     """
     A class to manage kafka cluster metadata.
 
@@ -128,9 +126,9 @@ def available_partitions_for_topic(self, topic):
        """
        if topic not in self._partitions:
            return None
-        return set([partition for partition, metadata
-                    in six.iteritems(self._partitions[topic])
-                    if metadata.leader != -1])
+        return {partition for partition, metadata
+                in self._partitions[topic].items()
+                if metadata.leader != -1}
 
    def leader_for_partition(self, partition):
        """Return node_id of leader, -1 unavailable, None if unknown."""
@@ -361,7 +359,7 @@ def add_group_coordinator(self, group, response):
 
        # Use a coordinator-specific node id so that group requests
        # get a dedicated connection
-        node_id = 'coordinator-{}'.format(response.coordinator_id)
+        node_id = f'coordinator-{response.coordinator_id}'
        coordinator = BrokerMetadata(
            node_id,
            response.host,
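
The available_partitions_for_topic change swaps set([...]) over six.iteritems() for a direct set comprehension over dict.items(), skipping the intermediate list. A minimal sketch of the same shape with a toy partition-to-leader map (data hypothetical):

partitions = {0: -1, 1: 5, 2: 7}  # partition -> leader node_id; -1 means leaderless

# Old shape: set() wrapped around a list comprehension
available_old = set([p for p, leader in partitions.items() if leader != -1])

# New shape: set comprehension, no intermediate list
available_new = {p for p, leader in partitions.items() if leader != -1}

assert available_old == available_new == {1, 2}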

kafka/codec.py

Lines changed: 0 additions & 6 deletions
@@ -1,5 +1,3 @@
-from __future__ import absolute_import
-
 import gzip
 import io
 import platform
@@ -149,10 +147,6 @@ def snappy_encode(payload, xerial_compatible=True, xerial_blocksize=32*1024):
         # buffer... likely a python-snappy bug, so just use a slice copy
         chunker = lambda payload, i, size: payload[i:size+i]
 
-    elif six.PY2:
-        # Sliced buffer avoids additional copies
-        # pylint: disable-msg=undefined-variable
-        chunker = lambda payload, i, size: buffer(payload, i, size)
     else:
         # snappy.compress does not like raw memoryviews, so we have to convert
         # tobytes, which is a copy... oh well. it's the thought that counts.
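
With the six.PY2 buffer() branch deleted, the remaining branches are the PyPy slice-copy chunker and the memoryview-based default. A minimal sketch of the slice-copy chunking pattern from the context lines above, with a toy block size:

payload = b"abcdefghij"
blocksize = 4  # the real xerial blocksize is 32*1024

chunker = lambda payload, i, size: payload[i:size + i]  # slice copy, as in the diff

chunks = [chunker(payload, i, blocksize) for i in range(0, len(payload), blocksize)]
assert chunks == [b"abcd", b"efgh", b"ij"]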
