@@ -112,7 +112,11 @@ def _read_bytes(self, num_bytes):
112
112
# TODO multiplex socket communication to allow for multi-threaded clients
113
113
114
114
def send (self , request_id , payload ):
115
- "Send a request to Kafka"
115
+ """
116
+ Send a request to Kafka
117
+ param: request_id -- can be any int (used only for debug logging...)
118
+ param: payload -- an encoded kafka packet (see KafkaProtocol)
119
+ """
116
120
117
121
log .debug ("About to send %d bytes to Kafka, request %d" % (len (payload ), request_id ))
118
122
@@ -128,12 +132,14 @@ def send(self, request_id, payload):
128
132
129
133
def recv (self , request_id ):
130
134
"""
131
- Get a response from Kafka
135
+ Get a response packet from Kafka
136
+ param: request_id -- can be any int (only used for debug logging...)
137
+ returns encoded kafka packet response from server as type str
132
138
"""
133
139
log .debug ("Reading response %d from Kafka" % request_id )
140
+
134
141
# Read the size off of the header
135
142
resp = self ._read_bytes (4 )
136
-
137
143
(size ,) = struct .unpack ('>i' , resp )
138
144
139
145
# Read the remainder of the response
@@ -144,14 +150,15 @@ def copy(self):
144
150
"""
145
151
Create an inactive copy of the connection object
146
152
A reinit() has to be done on the copy before it can be used again
153
+ return a new KafkaConnection object
147
154
"""
148
155
c = copy .deepcopy (self )
149
156
c ._sock = None
150
157
return c
151
158
152
159
def close (self ):
153
160
"""
154
- Close this connection
161
+ Shutdown and close the connection socket
155
162
"""
156
163
log .debug ("Closing socket connection for %s:%d" % (self .host , self .port ))
157
164
if self ._sock :
@@ -172,6 +179,9 @@ def close(self):
172
179
def reinit (self ):
173
180
"""
174
181
Re-initialize the socket connection
182
+ close current socket (if open)
183
+ and start a fresh connection
184
+ raise ConnectionError on error
175
185
"""
176
186
log .debug ("Reinitializing socket connection for %s:%d" % (self .host , self .port ))
177
187
0 commit comments