@@ -847,8 +847,9 @@ def _read_packet(self, packet_type=MysqlPacket):
847
847
packet .check_error ()
848
848
raise gen .Return (packet )
849
849
850
+ @gen .coroutine
850
851
def _write_bytes (self , data ):
851
- return self ._stream .write (data )
852
+ yield self ._stream .write (data )
852
853
853
854
@gen .coroutine
854
855
def _read_query_result (self , unbuffered = False ):
@@ -891,7 +892,7 @@ def _execute_command(self, command, sql):
891
892
chunk_size = min (MAX_PACKET_LEN , len (sql ) + 1 ) # +1 is for command
892
893
893
894
prelude = struct .pack ('<iB' , chunk_size , command )
894
- self ._write_bytes (prelude + sql [:chunk_size - 1 ])
895
+ yield self ._write_bytes (prelude + sql [:chunk_size - 1 ])
895
896
if DEBUG : dump_packet (prelude + sql )
896
897
897
898
if chunk_size < MAX_PACKET_LEN :
@@ -903,7 +904,7 @@ def _execute_command(self, command, sql):
903
904
chunk_size = min (MAX_PACKET_LEN , len (sql ))
904
905
prelude = struct .pack ('<i' , chunk_size )[:3 ]
905
906
data = prelude + int2byte (seq_id % 256 ) + sql [:chunk_size ]
906
- self ._write_bytes (data )
907
+ yield self ._write_bytes (data )
907
908
if DEBUG : dump_packet (data )
908
909
sql = sql [chunk_size :]
909
910
if not sql and chunk_size < MAX_PACKET_LEN :
@@ -955,7 +956,7 @@ def _request_authentication(self):
955
956
956
957
if DEBUG : dump_packet (data )
957
958
958
- self ._write_bytes (data )
959
+ yield self ._write_bytes (data )
959
960
960
961
auth_packet = yield self ._read_packet ()
961
962
@@ -966,7 +967,7 @@ def _request_authentication(self):
966
967
# send legacy handshake
967
968
data = _scramble_323 (self .password .encode ('latin1' ), self .salt ) + b'\0 '
968
969
data = pack_int24 (len (data )) + int2byte (next_packet ) + data
969
- self ._write_bytes (data )
970
+ yield self ._write_bytes (data )
970
971
auth_packet = self ._read_packet ()
971
972
972
973
# _mysql support
@@ -1105,7 +1106,7 @@ def _read_ok_packet(self, first_packet):
1105
1106
def _read_load_local_packet (self , first_packet ):
1106
1107
load_packet = LoadLocalPacketWrapper (first_packet )
1107
1108
sender = LoadLocalFile (load_packet .filename , self .connection )
1108
- sender .send_data ()
1109
+ yield sender .send_data ()
1109
1110
1110
1111
ok_packet = yield self .connection ._read_packet ()
1111
1112
if not ok_packet .is_ok_packet ():
@@ -1221,35 +1222,35 @@ class LoadLocalFile(object):
1221
1222
def __init__ (self , filename , connection ):
1222
1223
self .filename = filename
1223
1224
self .connection = connection
1225
+ self .seq_id = 1
1224
1226
1227
+ @gen .coroutine
1228
+ def handle_chunk (self , chunk ):
1229
+ self .seq_id = (self .seq_id + 1 ) % 256
1230
+ if not chunk :
1231
+ # send the empty packet to signify we are done sending data
1232
+ packet = struct .pack ('<i' , 0 )[:3 ] + int2byte (self .seq_id )
1233
+ yield self .connection ._write_bytes (packet )
1234
+ else :
1235
+ packet = struct .pack ('<i' , len (chunk ))[:3 ] + int2byte (self .seq_id )
1236
+ format_str = '!{0}s' .format (len (chunk ))
1237
+ packet += struct .pack (format_str , chunk )
1238
+ yield self .connection ._write_bytes (packet )
1239
+
1240
+ @gen .coroutine
1225
1241
def send_data (self ):
1226
1242
"""Send data packets from the local file to the server"""
1227
1243
if not self .connection ._stream :
1228
1244
raise InterfaceError ("(0, '')" )
1229
1245
1230
- # sequence id is 2 as we already sent a query packet
1231
- seq_id = 2
1232
1246
try :
1233
- with open (self .filename , 'rb' ) as open_file :
1234
- chunk_size = MAX_PACKET_LEN
1235
- prelude = b""
1236
- packet = b""
1237
- packet_size = 0
1238
-
1239
- while True :
1240
- chunk = open_file .read (chunk_size )
1241
- if not chunk :
1242
- break
1243
- packet = struct .pack ('<i' , len (chunk ))[:3 ] + int2byte (seq_id )
1244
- format_str = '!{0}s' .format (len (chunk ))
1245
- packet += struct .pack (format_str , chunk )
1246
- self .connection ._write_bytes (packet )
1247
- seq_id += 1
1248
- except IOError :
1247
+ fd = os .open (self .filename , os .O_RDONLY ) # O_NONBLOCK applied automatically
1248
+ except OSError :
1249
+ yield self .handle_chunk (None )
1249
1250
raise OperationalError (1017 , "Can't find file '{0}'" .format (self .filename ))
1250
- finally :
1251
- # send the empty packet to signify we are done sending data
1252
- packet = struct . pack ( '<i' , 0 )[: 3 ] + int2byte ( seq_id )
1253
- self . connection . _write_bytes ( packet )
1251
+ else :
1252
+ stream = iostream . PipeIOStream ( fd , max_buffer_size = MAX_PACKET_LEN )
1253
+ stream . read_until_close ( streaming_callback = self . handle_chunk , callback = self . handle_chunk )
1254
+ stream . close ( )
1254
1255
1255
1256
# g:khuno_ignore='E226,E301,E701'
0 commit comments