@@ -71,37 +71,46 @@ export function createChannelConnection (
71
71
const channel = new Channel ( channelConfig )
72
72
73
73
return Bolt . handshake ( channel )
74
- . then ( ( { protocolVersion, consumeRemainingBuffer } ) => {
74
+ . then ( ( { protocolVersion : version , consumeRemainingBuffer } ) => {
75
+ const chunker = new Chunker ( channel )
76
+ const dechunker = new Dechunker ( )
77
+
75
78
const connection = new ChannelConnection (
76
79
channel ,
77
80
errorHandler ,
78
81
address ,
79
82
log ,
80
83
config . disableLosslessIntegers ,
81
- serversideRouting
84
+ serversideRouting ,
85
+ {
86
+ newChunker : ( ) => chunker ,
87
+ newDechunker : ( ) => dechunker
88
+ }
82
89
)
83
90
84
- connection . _protocol = Bolt . create ( {
85
- version : protocolVersion ,
91
+ const protocol = Bolt . create ( {
92
+ version,
86
93
connection,
87
- chunker : connection . _chunker ,
94
+ chunker,
88
95
disableLosslessIntegers : config . disableLosslessIntegers ,
89
96
serversideRouting
90
97
} )
91
98
99
+ connection . _protocol = protocol
100
+
92
101
// reset the error handler to just handle errors and forget about the handshake promise
93
102
channel . onerror = connection . _handleFatalError . bind ( connection )
94
103
95
104
// Ok, protocol running. Simply forward all messages to the dechunker
96
- channel . onmessage = buf => connection . _dechunker . write ( buf )
105
+ channel . onmessage = buf => dechunker . write ( buf )
97
106
98
107
// setup dechunker to dechunk messages and forward them to the message handler
99
- connection . _dechunker . onmessage = buf => {
100
- connection . _handleMessage ( connection . _protocol . unpacker ( ) . unpack ( buf ) )
108
+ dechunker . onmessage = buf => {
109
+ connection . _handleMessage ( protocol . unpacker ( ) . unpack ( buf ) )
101
110
}
102
111
103
112
// forward all pending bytes to the dechunker
104
- consumeRemainingBuffer ( buffer => connection . _dechunker . write ( buffer ) )
113
+ consumeRemainingBuffer ( buffer => dechunker . write ( buffer ) )
105
114
106
115
return connection
107
116
} )
@@ -127,7 +136,11 @@ export default class ChannelConnection extends Connection {
127
136
address ,
128
137
log ,
129
138
disableLosslessIntegers = false ,
130
- serversideRouting = null
139
+ serversideRouting = null ,
140
+ {
141
+ newChunker = channel => new Chunker ( channel ) ,
142
+ newDechunker = ( ) => new Dechunker ( )
143
+ } = { }
131
144
) {
132
145
super ( errorHandler )
133
146
@@ -139,8 +152,8 @@ export default class ChannelConnection extends Connection {
139
152
this . _pendingObservers = [ ]
140
153
this . _currentObserver = undefined
141
154
this . _ch = channel
142
- this . _dechunker = new Dechunker ( )
143
- this . _chunker = new Chunker ( channel )
155
+ this . _dechunker = newDechunker ( )
156
+ this . _chunker = newChunker ( channel )
144
157
this . _log = log
145
158
this . _serversideRouting = serversideRouting
146
159
0 commit comments