Skip to content

Commit af71cc7

Browse files
committed
Adding yield where necessary; unblocking file read
1 parent 9781463 commit af71cc7

File tree

1 file changed

+28
-28
lines changed

1 file changed

+28
-28
lines changed

tornado_mysql/connections.py

+28-28
Original file line numberDiff line numberDiff line change
@@ -847,8 +847,9 @@ def _read_packet(self, packet_type=MysqlPacket):
847847
packet.check_error()
848848
raise gen.Return(packet)
849849

850+
@gen.coroutine
850851
def _write_bytes(self, data):
851-
return self._stream.write(data)
852+
yield self._stream.write(data)
852853

853854
@gen.coroutine
854855
def _read_query_result(self, unbuffered=False):
@@ -891,7 +892,7 @@ def _execute_command(self, command, sql):
891892
chunk_size = min(MAX_PACKET_LEN, len(sql) + 1) # +1 is for command
892893

893894
prelude = struct.pack('<iB', chunk_size, command)
894-
self._write_bytes(prelude + sql[:chunk_size-1])
895+
yield self._write_bytes(prelude + sql[:chunk_size-1])
895896
if DEBUG: dump_packet(prelude + sql)
896897

897898
if chunk_size < MAX_PACKET_LEN:
@@ -903,7 +904,7 @@ def _execute_command(self, command, sql):
903904
chunk_size = min(MAX_PACKET_LEN, len(sql))
904905
prelude = struct.pack('<i', chunk_size)[:3]
905906
data = prelude + int2byte(seq_id%256) + sql[:chunk_size]
906-
self._write_bytes(data)
907+
yield self._write_bytes(data)
907908
if DEBUG: dump_packet(data)
908909
sql = sql[chunk_size:]
909910
if not sql and chunk_size < MAX_PACKET_LEN:
@@ -955,7 +956,7 @@ def _request_authentication(self):
955956

956957
if DEBUG: dump_packet(data)
957958

958-
self._write_bytes(data)
959+
yield self._write_bytes(data)
959960

960961
auth_packet = yield self._read_packet()
961962

@@ -966,7 +967,7 @@ def _request_authentication(self):
966967
# send legacy handshake
967968
data = _scramble_323(self.password.encode('latin1'), self.salt) + b'\0'
968969
data = pack_int24(len(data)) + int2byte(next_packet) + data
969-
self._write_bytes(data)
970+
yield self._write_bytes(data)
970971
auth_packet = self._read_packet()
971972

972973
# _mysql support
@@ -1105,7 +1106,7 @@ def _read_ok_packet(self, first_packet):
11051106
def _read_load_local_packet(self, first_packet):
11061107
load_packet = LoadLocalPacketWrapper(first_packet)
11071108
sender = LoadLocalFile(load_packet.filename, self.connection)
1108-
sender.send_data()
1109+
yield sender.send_data()
11091110

11101111
ok_packet = yield self.connection._read_packet()
11111112
if not ok_packet.is_ok_packet():
@@ -1221,35 +1222,34 @@ class LoadLocalFile(object):
12211222
def __init__(self, filename, connection):
12221223
self.filename = filename
12231224
self.connection = connection
1225+
self.seq_id = 1
1226+
1227+
@gen.coroutine
1228+
def handle_chunk(self, chunk):
1229+
self.seq_id = (self.seq_id + 1) % 256
1230+
if not chunk:
1231+
# send the empty packet to signify we are done sending data
1232+
packet = struct.pack('<i', 0)[:3] + int2byte(self.seq_id)
1233+
yield self.connection._write_bytes(packet)
1234+
else:
1235+
packet = struct.pack('<i', len(chunk))[:3] + int2byte(self.seq_id)
1236+
format_str = '!{0}s'.format(len(chunk))
1237+
packet += struct.pack(format_str, chunk)
1238+
yield self.connection._write_bytes(packet)
12241239

1240+
@gen.coroutine
12251241
def send_data(self):
12261242
"""Send data packets from the local file to the server"""
12271243
if not self.connection._stream:
12281244
raise InterfaceError("(0, '')")
12291245

1230-
# sequence id is 2 as we already sent a query packet
1231-
seq_id = 2
12321246
try:
1233-
with open(self.filename, 'rb') as open_file:
1234-
chunk_size = MAX_PACKET_LEN
1235-
prelude = b""
1236-
packet = b""
1237-
packet_size = 0
1238-
1239-
while True:
1240-
chunk = open_file.read(chunk_size)
1241-
if not chunk:
1242-
break
1243-
packet = struct.pack('<i', len(chunk))[:3] + int2byte(seq_id)
1244-
format_str = '!{0}s'.format(len(chunk))
1245-
packet += struct.pack(format_str, chunk)
1246-
self.connection._write_bytes(packet)
1247-
seq_id += 1
1248-
except IOError:
1247+
fd = os.open(self.filename, os.O_RDONLY) # O_NONBLOCK applied automatically
1248+
stream = iostream.PipeIOStream(fd, max_buffer_size=MAX_PACKET_LEN)
1249+
stream.read_until_close(streaming_callback=self.handle_chunk, callback=self.handle_chunk)
1250+
stream.close()
1251+
except FileNotFoundError:
1252+
yield self.handle_chunk(None)
12491253
raise OperationalError(1017, "Can't find file '{0}'".format(self.filename))
1250-
finally:
1251-
# send the empty packet to signify we are done sending data
1252-
packet = struct.pack('<i', 0)[:3] + int2byte(seq_id)
1253-
self.connection._write_bytes(packet)
12541254

12551255
# g:khuno_ignore='E226,E301,E701'

0 commit comments

Comments
 (0)