@@ -41,7 +41,7 @@ Connection.prototype.connect = function(port, host) {
41
41
} ) ;
42
42
43
43
this . stream . on ( 'error' , function ( error ) {
44
- //don't raise ECONNRESET errors - they can & should be ignored
44
+ //don't raise ECONNRESET errors - they can & should be ignored
45
45
//during disconnect
46
46
if ( self . _ending && error . code == 'ECONNRESET' ) {
47
47
return ;
@@ -81,8 +81,10 @@ Connection.prototype.connect = function(port, host) {
81
81
82
82
Connection . prototype . attachListeners = function ( stream ) {
83
83
var self = this ;
84
- stream . on ( 'data' , function ( buffer ) {
85
- self . setBuffer ( buffer ) ;
84
+ stream . on ( 'readable' , function ( ) {
85
+ var buff = stream . read ( ) ;
86
+ if ( ! buff ) return ;
87
+ self . setBuffer ( buff ) ;
86
88
var msg = self . parseMessage ( ) ;
87
89
while ( msg ) {
88
90
self . emit ( 'message' , msg ) ;
@@ -322,8 +324,9 @@ Connection.prototype.parseMessage = function() {
322
324
323
325
//read message id code
324
326
var id = this . buffer [ this . offset ++ ] ;
327
+ var buffer = this . buffer ;
325
328
//read message length
326
- var length = this . parseInt32 ( ) ;
329
+ var length = this . parseInt32 ( buffer ) ;
327
330
328
331
if ( remaining <= length ) {
329
332
this . lastBuffer = this . buffer ;
@@ -340,95 +343,106 @@ Connection.prototype.parseMessage = function() {
340
343
341
344
case 0x52 : //R
342
345
msg . name = 'authenticationOk' ;
343
- return this . parseR ( msg ) ;
346
+ msg = this . parseR ( msg ) ;
347
+ break ;
344
348
345
349
case 0x53 : //S
346
350
msg . name = 'parameterStatus' ;
347
- return this . parseS ( msg ) ;
351
+ msg = this . parseS ( msg ) ;
352
+ break ;
348
353
349
354
case 0x4b : //K
350
355
msg . name = 'backendKeyData' ;
351
- return this . parseK ( msg ) ;
356
+ msg = this . parseK ( msg ) ;
357
+ break ;
352
358
353
359
case 0x43 : //C
354
360
msg . name = 'commandComplete' ;
355
- return this . parseC ( msg ) ;
361
+ msg = this . parseC ( msg ) ;
362
+ break ;
356
363
357
364
case 0x5a : //Z
358
365
msg . name = 'readyForQuery' ;
359
- return this . parseZ ( msg ) ;
366
+ msg = this . parseZ ( msg ) ;
367
+ break ;
360
368
361
369
case 0x54 : //T
362
370
msg . name = 'rowDescription' ;
363
- return this . parseT ( msg ) ;
371
+ msg = this . parseT ( msg ) ;
372
+ break ;
364
373
365
374
case 0x44 : //D
366
- msg . name = 'dataRow' ;
367
- return this . parseD ( msg ) ;
375
+ msg = this . parseD ( buffer , length ) ;
376
+ break ;
368
377
369
378
case 0x45 : //E
370
379
msg . name = 'error' ;
371
- return this . parseE ( msg ) ;
380
+ msg = this . parseE ( msg ) ;
381
+ break ;
372
382
373
383
case 0x4e : //N
374
384
msg . name = 'notice' ;
375
- return this . parseN ( msg ) ;
385
+ msg = this . parseN ( msg ) ;
386
+ break ;
376
387
377
388
case 0x31 : //1
378
389
msg . name = 'parseComplete' ;
379
- return msg ;
390
+ break ;
380
391
381
392
case 0x32 : //2
382
393
msg . name = 'bindComplete' ;
383
- return msg ;
394
+ break ;
384
395
385
396
case 0x41 : //A
386
397
msg . name = 'notification' ;
387
- return this . parseA ( msg ) ;
398
+ msg = this . parseA ( msg ) ;
399
+ break ;
388
400
389
401
case 0x6e : //n
390
402
msg . name = 'noData' ;
391
- return msg ;
403
+ break ;
392
404
393
405
case 0x49 : //I
394
406
msg . name = 'emptyQuery' ;
395
- return msg ;
407
+ break ;
396
408
397
409
case 0x73 : //s
398
410
msg . name = 'portalSuspended' ;
399
- return msg ;
411
+ break ;
400
412
401
413
case 0x47 : //G
402
414
msg . name = 'copyInResponse' ;
403
- return this . parseGH ( msg ) ;
415
+ msg = this . parseGH ( msg ) ;
416
+ break ;
404
417
405
418
case 0x48 : //H
406
419
msg . name = 'copyOutResponse' ;
407
- return this . parseGH ( msg ) ;
420
+ msg = this . parseGH ( msg ) ;
421
+ break ;
408
422
case 0x63 : //c
409
423
msg . name = 'copyDone' ;
410
- return msg ;
424
+ break ;
411
425
412
426
case 0x64 : //d
413
427
msg . name = 'copyData' ;
414
- return this . parsed ( msg ) ;
415
-
416
- default :
417
- throw new Error ( "Unrecognized message code " + id ) ;
428
+ msg = this . parsed ( msg ) ;
429
+ break ;
418
430
}
431
+ return msg ;
419
432
} ;
420
433
421
434
Connection . prototype . parseR = function ( msg ) {
422
435
var code = 0 ;
436
+ var buffer = this . buffer ;
423
437
if ( msg . length === 8 ) {
424
- code = this . parseInt32 ( ) ;
438
+ code = this . parseInt32 ( buffer ) ;
425
439
if ( code === 3 ) {
426
440
msg . name = 'authenticationCleartextPassword' ;
427
441
}
428
442
return msg ;
429
443
}
430
444
if ( msg . length === 12 ) {
431
- code = this . parseInt32 ( ) ;
445
+ code = this . parseInt32 ( buffer ) ;
432
446
if ( code === 5 ) { //md5 required
433
447
msg . name = 'authenticationMD5Password' ;
434
448
msg . salt = new Buffer ( 4 ) ;
@@ -441,85 +455,103 @@ Connection.prototype.parseR = function(msg) {
441
455
} ;
442
456
443
457
Connection . prototype . parseS = function ( msg ) {
444
- msg . parameterName = this . parseCString ( ) ;
445
- msg . parameterValue = this . parseCString ( ) ;
458
+ var buffer = this . buffer ;
459
+ msg . parameterName = this . parseCString ( buffer ) ;
460
+ msg . parameterValue = this . parseCString ( buffer ) ;
446
461
return msg ;
447
462
} ;
448
463
449
464
Connection . prototype . parseK = function ( msg ) {
450
- msg . processID = this . parseInt32 ( ) ;
451
- msg . secretKey = this . parseInt32 ( ) ;
465
+ var buffer = this . buffer ;
466
+ msg . processID = this . parseInt32 ( buffer ) ;
467
+ msg . secretKey = this . parseInt32 ( buffer ) ;
452
468
return msg ;
453
469
} ;
454
470
455
471
Connection . prototype . parseC = function ( msg ) {
456
- msg . text = this . parseCString ( ) ;
472
+ var buffer = this . buffer ;
473
+ msg . text = this . parseCString ( buffer ) ;
457
474
return msg ;
458
475
} ;
459
476
460
477
Connection . prototype . parseZ = function ( msg ) {
461
- msg . status = this . readChar ( ) ;
478
+ var buffer = this . buffer ;
479
+ msg . status = this . readString ( buffer , 1 ) ;
462
480
return msg ;
463
481
} ;
464
482
465
483
Connection . prototype . parseT = function ( msg ) {
466
- msg . fieldCount = this . parseInt16 ( ) ;
484
+ var buffer = this . buffer ;
485
+ msg . fieldCount = this . parseInt16 ( buffer ) ;
467
486
var fields = [ ] ;
468
487
for ( var i = 0 ; i < msg . fieldCount ; i ++ ) {
469
- fields . push ( this . parseField ( ) ) ;
488
+ fields . push ( this . parseField ( buffer ) ) ;
470
489
}
471
490
msg . fields = fields ;
472
491
return msg ;
473
492
} ;
474
493
475
- Connection . prototype . parseField = function ( ) {
494
+ Connection . prototype . parseField = function ( buffer ) {
476
495
var field = {
477
- name : this . parseCString ( ) ,
478
- tableID : this . parseInt32 ( ) ,
479
- columnID : this . parseInt16 ( ) ,
480
- dataTypeID : this . parseInt32 ( ) ,
481
- dataTypeSize : this . parseInt16 ( ) ,
482
- dataTypeModifier : this . parseInt32 ( ) ,
496
+ name : this . parseCString ( buffer ) ,
497
+ tableID : this . parseInt32 ( buffer ) ,
498
+ columnID : this . parseInt16 ( buffer ) ,
499
+ dataTypeID : this . parseInt32 ( buffer ) ,
500
+ dataTypeSize : this . parseInt16 ( buffer ) ,
501
+ dataTypeModifier : this . parseInt32 ( buffer ) ,
483
502
format : undefined
484
503
} ;
485
- if ( this . parseInt16 ( ) === TEXT_MODE ) {
504
+ if ( this . parseInt16 ( buffer ) === TEXT_MODE ) {
486
505
this . _mode = TEXT_MODE ;
487
506
field . format = 'text' ;
488
507
} else {
489
508
this . _mode = BINARY_MODE ;
509
+ this . readField = this . readBytes ;
490
510
field . format = 'binary' ;
491
511
}
492
512
return field ;
493
513
} ;
494
514
495
- Connection . prototype . parseD = function ( msg ) {
496
- var fieldCount = this . parseInt16 ( ) ;
497
- var fields = [ ] ;
515
+ var Message = function ( name , length ) {
516
+ this . name = name ;
517
+ this . length = length ;
518
+ } ;
519
+
520
+ var DataRowMessage = function ( name , length , fieldCount ) {
521
+ this . name = name ;
522
+ this . length = length ;
523
+ this . fieldCount = fieldCount ;
524
+ this . fields = [ ] ;
525
+ }
526
+
527
+ Connection . prototype . parseD = function ( buffer , length ) {
528
+ var fieldCount = this . parseInt16 ( buffer ) ;
529
+ var msg = new DataRowMessage ( 'dataRow' , length , fieldCount ) ;
498
530
for ( var i = 0 ; i < fieldCount ; i ++ ) {
499
- var length = this . parseInt32 ( ) ;
500
- var value = null ;
501
- if ( length !== - 1 ) {
502
- if ( this . _mode === TEXT_MODE ) {
503
- value = this . readString ( length ) ;
504
- } else {
505
- value = this . readBytes ( length ) ;
506
- }
507
- }
508
- fields . push ( value ) ;
531
+ var value = this . _readValue ( buffer ) ;
532
+ msg . fields . push ( value ) ;
509
533
}
510
- msg . fieldCount = fieldCount ;
511
- msg . fields = fields ;
512
534
return msg ;
513
535
} ;
514
536
537
+ Connection . prototype . _readValue = function ( buffer ) {
538
+ var length = this . parseInt32 ( buffer ) ;
539
+ if ( length === - 1 ) return null ;
540
+ if ( this . _mode === TEXT_MODE ) {
541
+ return this . readString ( buffer , length ) ;
542
+ }
543
+ return this . readBytes ( buffer , length ) ;
544
+ } ;
545
+
515
546
//parses error
516
547
Connection . prototype . parseE = function ( input ) {
548
+ var buffer = this . buffer ;
517
549
var fields = { } ;
518
550
var msg , item ;
519
- var fieldType = this . readString ( 1 ) ;
551
+ var fieldType = this . readString ( buffer , 1 ) ;
520
552
while ( fieldType != '\0' ) {
521
- fields [ fieldType ] = this . parseCString ( ) ;
522
- fieldType = this . readString ( 1 ) ;
553
+ fields [ fieldType ] = this . parseCString ( buffer ) ;
554
+ fieldType = this . readString ( buffer , 1 ) ;
523
555
}
524
556
if ( input . name === 'error' ) {
525
557
// the msg is an Error instance
@@ -553,57 +585,56 @@ Connection.prototype.parseE = function(input) {
553
585
Connection . prototype . parseN = Connection . prototype . parseE ;
554
586
555
587
Connection . prototype . parseA = function ( msg ) {
556
- msg . processId = this . parseInt32 ( ) ;
557
- msg . channel = this . parseCString ( ) ;
558
- msg . payload = this . parseCString ( ) ;
588
+ var buffer = this . buffer ;
589
+ msg . processId = this . parseInt32 ( buffer ) ;
590
+ msg . channel = this . parseCString ( buffer ) ;
591
+ msg . payload = this . parseCString ( buffer ) ;
559
592
return msg ;
560
593
} ;
561
594
562
595
Connection . prototype . parseGH = function ( msg ) {
596
+ var buffer = this . buffer ;
563
597
var isBinary = this . buffer [ this . offset ] !== 0 ;
564
598
this . offset ++ ;
565
599
msg . binary = isBinary ;
566
- var columnCount = this . parseInt16 ( ) ;
600
+ var columnCount = this . parseInt16 ( buffer ) ;
567
601
msg . columnTypes = [ ] ;
568
602
for ( var i = 0 ; i < columnCount ; i ++ ) {
569
- msg . columnTypes . push ( this . parseInt16 ( ) ) ;
603
+ msg . columnTypes . push ( this . parseInt16 ( buffer ) ) ;
570
604
}
571
605
return msg ;
572
606
} ;
573
607
574
- Connection . prototype . readChar = function ( ) {
575
- return this . readString ( 1 ) ;
576
- } ;
577
-
578
- Connection . prototype . parseInt32 = function ( ) {
579
- var value = this . buffer . readInt32BE ( this . offset , true ) ;
608
+ Connection . prototype . parseInt32 = function ( buffer ) {
609
+ var value = buffer . readInt32BE ( this . offset , true ) ;
580
610
this . offset += 4 ;
581
611
return value ;
582
612
} ;
583
613
584
- Connection . prototype . parseInt16 = function ( ) {
585
- var value = this . buffer . readInt16BE ( this . offset , true ) ;
614
+ Connection . prototype . parseInt16 = function ( buffer ) {
615
+ var value = buffer . readInt16BE ( this . offset , true ) ;
586
616
this . offset += 2 ;
587
617
return value ;
588
618
} ;
589
619
590
- Connection . prototype . readString = function ( length ) {
591
- return this . buffer . toString ( this . encoding , this . offset , ( this . offset += length ) ) ;
620
+ Connection . prototype . readString = function ( buffer , length ) {
621
+ return buffer . toString ( this . encoding , this . offset , ( this . offset += length ) ) ;
592
622
} ;
593
623
594
- Connection . prototype . readBytes = function ( length ) {
595
- return this . buffer . slice ( this . offset , this . offset += length ) ;
624
+ Connection . prototype . readBytes = function ( buffer , length ) {
625
+ return buffer . slice ( this . offset , this . offset += length ) ;
596
626
} ;
597
627
598
- Connection . prototype . parseCString = function ( ) {
628
+ Connection . prototype . parseCString = function ( buffer ) {
599
629
var start = this . offset ;
600
- while ( this . buffer [ this . offset ++ ] !== 0 ) { }
601
- return this . buffer . toString ( this . encoding , start , this . offset - 1 ) ;
630
+ while ( buffer [ this . offset ++ ] !== 0 ) { }
631
+ return buffer . toString ( this . encoding , start , this . offset - 1 ) ;
602
632
} ;
603
633
604
634
Connection . prototype . parsed = function ( msg ) {
635
+ this . buffer = buffer ;
605
636
//exclude length field
606
- msg . chunk = this . readBytes ( msg . length - 4 ) ;
637
+ msg . chunk = this . readBytes ( buffer , msg . length - 4 ) ;
607
638
return msg ;
608
639
} ;
609
640
//end parsing methods
0 commit comments