19
19
using System . Collections . Generic ;
20
20
using System . Diagnostics ;
21
21
using System . IO ;
22
- using System . Linq ;
23
22
using System . Threading ;
24
23
using System . Threading . Tasks ;
25
24
using Apache . Arrow . Types ;
@@ -115,38 +114,43 @@ private List<IArrowArray> BuildArrays(
115
114
ByteBuffer messageBuffer ,
116
115
Flatbuf . RecordBatch recordBatchMessage )
117
116
{
118
- return CreateInner ( ) . ToList ( ) ;
117
+ var arrays = new List < IArrowArray > ( recordBatchMessage . NodesLength ) ;
119
118
120
- IEnumerable < IArrowArray > CreateInner ( )
119
+ if ( recordBatchMessage . NodesLength == 0 )
121
120
{
122
- var recordBatchManipulator = new RecordBatchManipulator ( in recordBatchMessage ) ;
121
+ return arrays ;
122
+ }
123
+
124
+ var recordBatchEnumerator = new RecordBatchEnumerator ( in recordBatchMessage ) ;
123
125
124
- while ( ! recordBatchManipulator . IsAllNodeRead )
125
- {
126
- var field = schema . GetFieldByIndex ( recordBatchManipulator . CurrentNodeIndex ) ;
127
- Flatbuf . FieldNode fieldNode = recordBatchManipulator . ShiftNode ( ) ;
126
+ do
127
+ {
128
+ var field = schema . GetFieldByIndex ( recordBatchEnumerator . CurrentNodeIndex ) ;
129
+ var fieldNode = recordBatchEnumerator . CurrentNode ;
128
130
129
- var arrayData = field . DataType . IsFixedPrimitive ( ) ?
130
- LoadPrimitiveField ( recordBatchManipulator , field , in fieldNode , messageBuffer ) :
131
- LoadVariableField ( recordBatchManipulator , field , in fieldNode , messageBuffer ) ;
131
+ var arrayData = field . DataType . IsFixedPrimitive ( )
132
+ ? LoadPrimitiveField ( recordBatchEnumerator , field , in fieldNode , messageBuffer )
133
+ : LoadVariableField ( recordBatchEnumerator , field , in fieldNode , messageBuffer ) ;
132
134
133
- yield return ArrowArrayFactory . BuildArray ( arrayData ) ;
134
- }
135
- }
135
+ arrays . Add ( ArrowArrayFactory . BuildArray ( arrayData ) ) ;
136
+ } while ( recordBatchEnumerator . MoveNextNode ( ) ) ;
137
+
138
+ return arrays ;
136
139
}
137
140
138
141
139
142
private ArrayData LoadPrimitiveField (
140
- RecordBatchManipulator recordBatchManipulator ,
143
+ RecordBatchEnumerator recordBatchEnumerator ,
141
144
Field field ,
142
145
in Flatbuf . FieldNode fieldNode ,
143
146
ByteBuffer bodyData )
144
147
{
145
- var nullBitmapBuffer = recordBatchManipulator . ShiftBuffer ( ) ;
146
- var valueBuffer = recordBatchManipulator . ShiftBuffer ( ) ;
147
148
148
- ArrowBuffer nullArrowBuffer = BuildArrowBuffer ( bodyData , nullBitmapBuffer ) ;
149
- ArrowBuffer valueArrowBuffer = BuildArrowBuffer ( bodyData , valueBuffer ) ;
149
+ ArrowBuffer nullArrowBuffer = BuildArrowBuffer ( bodyData , recordBatchEnumerator . CurrentBuffer ) ;
150
+ recordBatchEnumerator . MoveNextBuffer ( ) ;
151
+ ArrowBuffer valueArrowBuffer = BuildArrowBuffer ( bodyData , recordBatchEnumerator . CurrentBuffer ) ;
152
+ recordBatchEnumerator . MoveNextBuffer ( ) ;
153
+
150
154
151
155
var fieldLength = ( int ) fieldNode . Length ;
152
156
var fieldNullCount = ( int ) fieldNode . NullCount ;
@@ -162,25 +166,25 @@ private ArrayData LoadPrimitiveField(
162
166
}
163
167
164
168
var arrowBuff = new [ ] { nullArrowBuffer , valueArrowBuffer } ;
165
- var offspring = GetOffspring ( recordBatchManipulator , field , bodyData ) ;
169
+ var children = GetChildren ( recordBatchEnumerator , field , bodyData ) ;
166
170
167
- return new ArrayData ( field . DataType , fieldLength , fieldNullCount , 0 , arrowBuff , offspring . ToArray ( ) ) ;
171
+ return new ArrayData ( field . DataType , fieldLength , fieldNullCount , 0 , arrowBuff , children ) ;
168
172
}
169
173
170
174
171
175
private ArrayData LoadVariableField (
172
- RecordBatchManipulator recordBatchManipulator ,
176
+ RecordBatchEnumerator recordBatchEnumerator ,
173
177
Field field ,
174
178
in Flatbuf . FieldNode fieldNode ,
175
179
ByteBuffer bodyData )
176
180
{
177
- var nullBitmapBuffer = recordBatchManipulator . ShiftBuffer ( ) ;
178
- var offsetBuffer = recordBatchManipulator . ShiftBuffer ( ) ;
179
- var valueBuffer = recordBatchManipulator . ShiftBuffer ( ) ;
180
181
181
- ArrowBuffer nullArrowBuffer = BuildArrowBuffer ( bodyData , nullBitmapBuffer ) ;
182
- ArrowBuffer offsetArrowBuffer = BuildArrowBuffer ( bodyData , offsetBuffer ) ;
183
- ArrowBuffer valueArrowBuffer = BuildArrowBuffer ( bodyData , valueBuffer ) ;
182
+ ArrowBuffer nullArrowBuffer = BuildArrowBuffer ( bodyData , recordBatchEnumerator . CurrentBuffer ) ;
183
+ recordBatchEnumerator . MoveNextBuffer ( ) ;
184
+ ArrowBuffer offsetArrowBuffer = BuildArrowBuffer ( bodyData , recordBatchEnumerator . CurrentBuffer ) ;
185
+ recordBatchEnumerator . MoveNextBuffer ( ) ;
186
+ ArrowBuffer valueArrowBuffer = BuildArrowBuffer ( bodyData , recordBatchEnumerator . CurrentBuffer ) ;
187
+ recordBatchEnumerator . MoveNextBuffer ( ) ;
184
188
185
189
var fieldLength = ( int ) fieldNode . Length ;
186
190
var fieldNullCount = ( int ) fieldNode . NullCount ;
@@ -196,24 +200,33 @@ private ArrayData LoadVariableField(
196
200
}
197
201
198
202
var arrowBuff = new [ ] { nullArrowBuffer , offsetArrowBuffer , valueArrowBuffer } ;
199
- var offspring = GetOffspring ( recordBatchManipulator , field , bodyData ) ;
203
+ var children = GetChildren ( recordBatchEnumerator , field , bodyData ) ;
200
204
201
- return new ArrayData ( field . DataType , fieldLength , fieldNullCount , 0 , arrowBuff , offspring . ToArray ( ) ) ;
205
+ return new ArrayData ( field . DataType , fieldLength , fieldNullCount , 0 , arrowBuff , children ) ;
202
206
}
203
207
204
- private IEnumerable < ArrayData > GetOffspring (
205
- RecordBatchManipulator recordBatchManipulator ,
208
+ private ArrayData [ ] GetChildren (
209
+ RecordBatchEnumerator recordBatchEnumerator ,
206
210
Field field ,
207
211
ByteBuffer bodyData )
208
212
{
209
- if ( ! ( field . DataType is NestedType type ) ) yield break ;
210
- foreach ( var childField in type . Children )
213
+ if ( ! ( field . DataType is NestedType type ) ) return null ;
214
+
215
+ var childrenCount = type . ChildrenCount ;
216
+ var children = new ArrayData [ childrenCount ] ;
217
+ for ( var index = 0 ; index < childrenCount ; index ++ )
211
218
{
212
- Flatbuf . FieldNode childFieldNode = recordBatchManipulator . ShiftNode ( ) ;
213
- yield return childField . DataType . IsFixedPrimitive ( )
214
- ? LoadPrimitiveField ( recordBatchManipulator , childField , in childFieldNode , bodyData )
215
- : LoadVariableField ( recordBatchManipulator , childField , in childFieldNode , bodyData ) ;
219
+ Flatbuf . FieldNode childFieldNode = recordBatchEnumerator . CurrentNode ;
220
+ recordBatchEnumerator . MoveNextNode ( ) ;
221
+
222
+ var childField = type . Children [ index ] ;
223
+ var child = childField . DataType . IsFixedPrimitive ( )
224
+ ? LoadPrimitiveField ( recordBatchEnumerator , childField , in childFieldNode , bodyData )
225
+ : LoadVariableField ( recordBatchEnumerator , childField , in childFieldNode , bodyData ) ;
226
+
227
+ children [ index ] = child ;
216
228
}
229
+ return children ;
217
230
}
218
231
219
232
private ArrowBuffer BuildArrowBuffer ( ByteBuffer bodyData , Flatbuf . Buffer buffer )
@@ -231,27 +244,29 @@ private ArrowBuffer BuildArrowBuffer(ByteBuffer bodyData, Flatbuf.Buffer buffer)
231
244
}
232
245
}
233
246
234
- class RecordBatchManipulator
247
+ internal class RecordBatchEnumerator
235
248
{
236
- private int CurrentBufferIndex { get ; set ; }
237
249
private Flatbuf . RecordBatch RecordBatch { get ; }
250
+ internal int CurrentBufferIndex { get ; set ; }
238
251
internal int CurrentNodeIndex { get ; set ; }
239
- internal bool IsAllNodeRead => CurrentNodeIndex >= RecordBatch . NodesLength ;
240
252
241
- internal RecordBatchManipulator ( in Flatbuf . RecordBatch recordBatch )
253
+ internal Flatbuf . Buffer CurrentBuffer => RecordBatch . Buffers ( CurrentBufferIndex ) . GetValueOrDefault ( ) ;
254
+
255
+ internal Flatbuf . FieldNode CurrentNode => RecordBatch . Nodes ( CurrentNodeIndex ) . GetValueOrDefault ( ) ;
256
+
257
+ internal bool MoveNextBuffer ( )
242
258
{
243
- RecordBatch = recordBatch ;
259
+ return ++ CurrentBufferIndex < RecordBatch . BuffersLength ;
244
260
}
245
261
246
- internal Flatbuf . Buffer ShiftBuffer ( )
262
+ internal bool MoveNextNode ( )
247
263
{
248
- return RecordBatch . Buffers ( CurrentBufferIndex ++ ) . GetValueOrDefault ( ) ;
264
+ return ++ CurrentNodeIndex < RecordBatch . NodesLength ;
249
265
}
250
266
251
- internal Flatbuf . FieldNode ShiftNode ( )
267
+ internal RecordBatchEnumerator ( in Flatbuf . RecordBatch recordBatch )
252
268
{
253
- return RecordBatch . Nodes ( CurrentNodeIndex ++ ) . GetValueOrDefault ( ) ;
269
+ RecordBatch = recordBatch ;
254
270
}
255
-
256
271
}
257
272
}
0 commit comments