Skip to content

Commit 7f905a6

Browse files
takashi hashida (HashidaTKS)
authored and committed
ARROW-7387: [C#] Support ListType Serialization
Closes #6030 from HashidaTKS/support_ListType_serialization and squashes the following commits: f2add6e <takashi hashida> Respond to changes of master branch bed012b <takashi hashida> Merge remote-tracking branch 'remotes/upstream/master' into support_ListType_serialization 35ff97b <takashi hashida> Format codes Remove extra brace f026d5a <takashi hashida> Respond to PR feedback cd6ad25 <takashi hashida> Don't use Linq Avoid unnecessary allocation Change RecordBatchManipulator to RecordBatchEnumerator 9b6dd27 <takashi hashida> Fix names 46ef481 <takashi hashida> Support ListType serialization / deserialization Lead-authored-by: takashi hashida <[email protected]> Co-authored-by: takashi hashida <[email protected]> Signed-off-by: Eric Erhardt <[email protected]>
1 parent 7204e68 commit 7f905a6

File tree

9 files changed

+262
-51
lines changed

9 files changed

+262
-51
lines changed

csharp/src/Apache.Arrow/Arrays/ListArray.cs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,21 @@ public ListArray(IArrowType dataType, int length,
3030
ArrowBuffer valueOffsetsBuffer, IArrowArray values,
3131
ArrowBuffer nullBitmapBuffer, int nullCount = 0, int offset = 0)
3232
: this(new ArrayData(dataType, length, nullCount, offset,
33-
new[] { nullBitmapBuffer, valueOffsetsBuffer }, new[] { values.Data }))
33+
new[] { nullBitmapBuffer, valueOffsetsBuffer }, new[] { values.Data }),
34+
values)
3435
{
35-
Values = values;
3636
}
3737

3838
public ListArray(ArrayData data)
39-
: base(data)
39+
: this(data, ArrowArrayFactory.BuildArray(data.Children[0]))
40+
{
41+
}
42+
43+
/// <summary>
/// Common initialization path used by both public constructors: validates
/// <paramref name="data"/> and stores the already-built child array.
/// </summary>
/// <param name="data">Array data describing the list array.</param>
/// <param name="values">Array exposed through <c>Values</c>.</param>
private ListArray(ArrayData data, IArrowArray values)
    : base(data)
{
    // Validate the layout first; the buffer-count check runs before the type check.
    data.EnsureBufferCount(2);
    data.EnsureDataType(ArrowTypeId.List);

    Values = values;
}
4449

4550
public override void Accept(IArrowArrayVisitor visitor) => Accept(this, visitor);

csharp/src/Apache.Arrow/Ipc/ArrowReaderImplementation.cs

Lines changed: 90 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
using System.IO;
2222
using System.Threading;
2323
using System.Threading.Tasks;
24+
using Apache.Arrow.Types;
2425

2526
namespace Apache.Arrow.Ipc
2627
{
@@ -114,35 +115,42 @@ private List<IArrowArray> BuildArrays(
114115
Flatbuf.RecordBatch recordBatchMessage)
115116
{
116117
var arrays = new List<IArrowArray>(recordBatchMessage.NodesLength);
117-
int bufferIndex = 0;
118118

119-
for (var n = 0; n < recordBatchMessage.NodesLength; n++)
119+
if (recordBatchMessage.NodesLength == 0)
120120
{
121-
Field field = schema.GetFieldByIndex(n);
122-
Flatbuf.FieldNode fieldNode = recordBatchMessage.Nodes(n).GetValueOrDefault();
121+
return arrays;
122+
}
123+
124+
var recordBatchEnumerator = new RecordBatchEnumerator(in recordBatchMessage);
125+
126+
do
127+
{
128+
var field = schema.GetFieldByIndex(recordBatchEnumerator.CurrentNodeIndex);
129+
var fieldNode = recordBatchEnumerator.CurrentNode;
123130

124-
ArrayData arrayData = field.DataType.IsFixedPrimitive() ?
125-
LoadPrimitiveField(field, fieldNode, recordBatchMessage, messageBuffer, ref bufferIndex) :
126-
LoadVariableField(field, fieldNode, recordBatchMessage, messageBuffer, ref bufferIndex);
131+
var arrayData = field.DataType.IsFixedPrimitive()
132+
? LoadPrimitiveField(ref recordBatchEnumerator, field, in fieldNode, messageBuffer)
133+
: LoadVariableField(ref recordBatchEnumerator, field, in fieldNode, messageBuffer);
127134

128135
arrays.Add(ArrowArrayFactory.BuildArray(arrayData));
129-
}
136+
} while (recordBatchEnumerator.MoveNextNode());
130137

131138
return arrays;
132139
}
133140

141+
134142
private ArrayData LoadPrimitiveField(
143+
ref RecordBatchEnumerator recordBatchEnumerator,
135144
Field field,
136-
Flatbuf.FieldNode fieldNode,
137-
Flatbuf.RecordBatch recordBatch,
138-
ByteBuffer bodyData,
139-
ref int bufferIndex)
145+
in Flatbuf.FieldNode fieldNode,
146+
ByteBuffer bodyData)
140147
{
141-
var nullBitmapBuffer = recordBatch.Buffers(bufferIndex++).GetValueOrDefault();
142-
var valueBuffer = recordBatch.Buffers(bufferIndex++).GetValueOrDefault();
143148

144-
ArrowBuffer nullArrowBuffer = BuildArrowBuffer(bodyData, nullBitmapBuffer);
145-
ArrowBuffer valueArrowBuffer = BuildArrowBuffer(bodyData, valueBuffer);
149+
ArrowBuffer nullArrowBuffer = BuildArrowBuffer(bodyData, recordBatchEnumerator.CurrentBuffer);
150+
recordBatchEnumerator.MoveNextBuffer();
151+
ArrowBuffer valueArrowBuffer = BuildArrowBuffer(bodyData, recordBatchEnumerator.CurrentBuffer);
152+
recordBatchEnumerator.MoveNextBuffer();
153+
146154

147155
var fieldLength = (int)fieldNode.Length;
148156
var fieldNullCount = (int)fieldNode.NullCount;
@@ -158,24 +166,25 @@ private ArrayData LoadPrimitiveField(
158166
}
159167

160168
var arrowBuff = new[] { nullArrowBuffer, valueArrowBuffer };
169+
var children = GetChildren(ref recordBatchEnumerator, field, bodyData);
161170

162-
return new ArrayData(field.DataType, fieldLength, fieldNullCount, 0, arrowBuff);
171+
return new ArrayData(field.DataType, fieldLength, fieldNullCount, 0, arrowBuff, children);
163172
}
164173

174+
165175
private ArrayData LoadVariableField(
176+
ref RecordBatchEnumerator recordBatchEnumerator,
166177
Field field,
167-
Flatbuf.FieldNode fieldNode,
168-
Flatbuf.RecordBatch recordBatch,
169-
ByteBuffer bodyData,
170-
ref int bufferIndex)
178+
in Flatbuf.FieldNode fieldNode,
179+
ByteBuffer bodyData)
171180
{
172-
var nullBitmapBuffer = recordBatch.Buffers(bufferIndex++).GetValueOrDefault();
173-
var offsetBuffer = recordBatch.Buffers(bufferIndex++).GetValueOrDefault();
174-
var valueBuffer = recordBatch.Buffers(bufferIndex++).GetValueOrDefault();
175181

176-
ArrowBuffer nullArrowBuffer = BuildArrowBuffer(bodyData, nullBitmapBuffer);
177-
ArrowBuffer offsetArrowBuffer = BuildArrowBuffer(bodyData, offsetBuffer);
178-
ArrowBuffer valueArrowBuffer = BuildArrowBuffer(bodyData, valueBuffer);
182+
ArrowBuffer nullArrowBuffer = BuildArrowBuffer(bodyData, recordBatchEnumerator.CurrentBuffer);
183+
recordBatchEnumerator.MoveNextBuffer();
184+
ArrowBuffer offsetArrowBuffer = BuildArrowBuffer(bodyData, recordBatchEnumerator.CurrentBuffer);
185+
recordBatchEnumerator.MoveNextBuffer();
186+
ArrowBuffer valueArrowBuffer = BuildArrowBuffer(bodyData, recordBatchEnumerator.CurrentBuffer);
187+
recordBatchEnumerator.MoveNextBuffer();
179188

180189
var fieldLength = (int)fieldNode.Length;
181190
var fieldNullCount = (int)fieldNode.NullCount;
@@ -191,8 +200,33 @@ private ArrayData LoadVariableField(
191200
}
192201

193202
var arrowBuff = new[] { nullArrowBuffer, offsetArrowBuffer, valueArrowBuffer };
203+
var children = GetChildren(ref recordBatchEnumerator, field, bodyData);
194204

195-
return new ArrayData(field.DataType, fieldLength, fieldNullCount, 0, arrowBuff);
205+
return new ArrayData(field.DataType, fieldLength, fieldNullCount, 0, arrowBuff, children);
206+
}
207+
208+
/// <summary>
/// Loads the <see cref="ArrayData"/> of every child of a nested field.
/// </summary>
/// <param name="recordBatchEnumerator">
/// Cursor over the record batch's field nodes and buffers. It is advanced past
/// every node and buffer consumed while loading the children.
/// </param>
/// <param name="field">Field whose children should be loaded.</param>
/// <param name="bodyData">Message body the child buffers are read from.</param>
/// <returns>
/// One entry per child, in declaration order, or <c>null</c> when the field's
/// type is not a <see cref="NestedType"/>.
/// </returns>
/// <exception cref="InvalidDataException">
/// The record batch contains fewer field nodes than the field's children require.
/// </exception>
private ArrayData[] GetChildren(
    ref RecordBatchEnumerator recordBatchEnumerator,
    Field field,
    ByteBuffer bodyData)
{
    if (!(field.DataType is NestedType type))
    {
        return null;
    }

    var childrenCount = type.Children.Count;
    var children = new ArrayData[childrenCount];
    for (var index = 0; index < childrenCount; index++)
    {
        // The cursor still points at the node that was loaded last (the parent,
        // or the previous sibling's last descendant), so it has to be advanced
        // BEFORE reading. Reading first would hand the child its parent's
        // length and null count.
        if (!recordBatchEnumerator.MoveNextNode())
        {
            throw new InvalidDataException(
                $"Record batch contains fewer field nodes than required by the children of field '{field.Name}'.");
        }

        Flatbuf.FieldNode childFieldNode = recordBatchEnumerator.CurrentNode;

        var childField = type.Children[index];
        children[index] = childField.DataType.IsFixedPrimitive()
            ? LoadPrimitiveField(ref recordBatchEnumerator, childField, in childFieldNode, bodyData)
            : LoadVariableField(ref recordBatchEnumerator, childField, in childFieldNode, bodyData);
    }

    return children;
}
197231

198232
private ArrowBuffer BuildArrowBuffer(ByteBuffer bodyData, Flatbuf.Buffer buffer)
@@ -209,4 +243,32 @@ private ArrowBuffer BuildArrowBuffer(ByteBuffer bodyData, Flatbuf.Buffer buffer)
209243
return new ArrowBuffer(data);
210244
}
211245
}
246+
247+
/// <summary>
/// Cursor over the field nodes and buffers of a <see cref="Flatbuf.RecordBatch"/>.
/// The node position and the buffer position are tracked and advanced
/// independently of each other; both start at index 0.
/// </summary>
internal struct RecordBatchEnumerator
{
    private readonly Flatbuf.RecordBatch _recordBatch;

    /// <summary>Index of the buffer returned by <see cref="CurrentBuffer"/>.</summary>
    internal int CurrentBufferIndex { get; private set; }

    /// <summary>Index of the node returned by <see cref="CurrentNode"/>.</summary>
    internal int CurrentNodeIndex { get; private set; }

    /// <summary>
    /// Creates a cursor positioned on the first node and the first buffer of
    /// <paramref name="recordBatch"/>.
    /// </summary>
    internal RecordBatchEnumerator(in Flatbuf.RecordBatch recordBatch)
    {
        _recordBatch = recordBatch;
        CurrentBufferIndex = 0;
        CurrentNodeIndex = 0;
    }

    /// <summary>Buffer descriptor at <see cref="CurrentBufferIndex"/>.</summary>
    internal Flatbuf.Buffer CurrentBuffer
    {
        get { return _recordBatch.Buffers(CurrentBufferIndex).GetValueOrDefault(); }
    }

    /// <summary>Field node at <see cref="CurrentNodeIndex"/>.</summary>
    internal Flatbuf.FieldNode CurrentNode
    {
        get { return _recordBatch.Nodes(CurrentNodeIndex).GetValueOrDefault(); }
    }

    /// <summary>
    /// Advances the buffer position by one.
    /// </summary>
    /// <returns><c>true</c> while the new position is still inside the buffer list.</returns>
    internal bool MoveNextBuffer()
    {
        CurrentBufferIndex += 1;
        return CurrentBufferIndex < _recordBatch.BuffersLength;
    }

    /// <summary>
    /// Advances the node position by one.
    /// </summary>
    /// <returns><c>true</c> while the new position is still inside the node list.</returns>
    internal bool MoveNextNode()
    {
        CurrentNodeIndex += 1;
        return CurrentNodeIndex < _recordBatch.NodesLength;
    }
}
212274
}

csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs

Lines changed: 68 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
using System.IO;
2121
using System.Threading;
2222
using System.Threading.Tasks;
23+
using Apache.Arrow.Types;
2324
using FlatBuffers;
2425

2526
namespace Apache.Arrow.Ipc
@@ -108,7 +109,7 @@ private void CreateBuffers(BooleanArray array)
108109
}
109110

110111
private void CreateBuffers<T>(PrimitiveArray<T> array)
111-
where T: struct
112+
where T : struct
112113
{
113114
_buffers.Add(CreateBuffer(array.NullBitmapBuffer));
114115
_buffers.Add(CreateBuffer(array.ValueBuffer));
@@ -175,6 +176,42 @@ public ArrowStreamWriter(Stream baseStream, Schema schema, bool leaveOpen, IpcOp
175176
_options = options ?? IpcOptions.Default;
176177
}
177178

179+
180+
/// <summary>
/// Emits the field node for <paramref name="data"/> together with the field
/// nodes of all of its descendants.
/// </summary>
/// <param name="data">Array data whose length and null count are written.</param>
private void CreateSelfAndChildrenFieldNodes(ArrayData data)
{
    // Flatbuffer struct vectors have to be created in reverse order, so the
    // children are emitted last-to-first and the node for the array itself
    // is emitted after them.
    int remaining = data.DataType is NestedType ? data.Children.Length : 0;
    while (remaining > 0)
    {
        remaining--;
        CreateSelfAndChildrenFieldNodes(data.Children[remaining]);
    }

    Flatbuf.FieldNode.CreateFieldNode(Builder, data.Length, data.NullCount);
}
192+
193+
/// <summary>
/// Counts the nodes of every field in <c>Schema</c>, nested children included.
/// </summary>
/// <returns>Total number of nodes across all schema fields.</returns>
private int CountAllNodes()
{
    var total = 0;

    foreach (var schemaField in Schema.Fields.Values)
    {
        // Each call adds the field itself plus all of its descendants.
        CountSelfAndChildrenNodes(schemaField.DataType, ref total);
    }

    return total;
}
202+
203+
/// <summary>
/// Adds one to <paramref name="count"/> for <paramref name="type"/> itself and,
/// recursively, one for every descendant type of a nested type.
/// </summary>
/// <param name="type">Type whose nodes are counted.</param>
/// <param name="count">Running total that is incremented in place.</param>
private void CountSelfAndChildrenNodes(IArrowType type, ref int count)
{
    // The type itself always contributes exactly one node.
    count += 1;

    if (!(type is NestedType nested))
    {
        return;
    }

    for (var i = 0; i < nested.Children.Count; i++)
    {
        CountSelfAndChildrenNodes(nested.Children[i].DataType, ref count);
    }
}
214+
178215
private protected async Task WriteRecordBatchInternalAsync(RecordBatch recordBatch,
179216
CancellationToken cancellationToken = default)
180217
{
@@ -192,13 +229,12 @@ private protected async Task WriteRecordBatchInternalAsync(RecordBatch recordBat
192229

193230
var fieldCount = Schema.Fields.Count;
194231

195-
Flatbuf.RecordBatch.StartNodesVector(Builder, fieldCount);
232+
Flatbuf.RecordBatch.StartNodesVector(Builder, CountAllNodes());
196233

197234
// flatbuffer struct vectors have to be created in reverse order
198235
for (var i = fieldCount - 1; i >= 0; i--)
199236
{
200-
var fieldArray = recordBatch.Column(i);
201-
Flatbuf.FieldNode.CreateFieldNode(Builder, fieldArray.Length, fieldArray.NullCount);
237+
CreateSelfAndChildrenFieldNodes(recordBatch.Column(i).Data);
202238
}
203239

204240
var fieldNodesVectorOffset = Builder.EndVector();
@@ -285,7 +321,7 @@ public virtual Task WriteRecordBatchAsync(RecordBatch recordBatch, CancellationT
285321
{
286322
return WriteRecordBatchInternalAsync(recordBatch, cancellationToken);
287323
}
288-
324+
289325
public async Task WriteEndAsync(CancellationToken cancellationToken = default)
290326
{
291327
if (!HasWrittenEnd)
@@ -307,19 +343,18 @@ private ValueTask WriteBufferAsync(ArrowBuffer arrowBuffer, CancellationToken ca
307343
// Build fields
308344

309345
var fieldOffsets = new Offset<Flatbuf.Field>[schema.Fields.Count];
310-
var fieldChildren = new List<Offset<Flatbuf.Field>>();
311346

312347
for (var i = 0; i < fieldOffsets.Length; i++)
313348
{
314349
var field = schema.GetFieldByIndex(i);
315350
var fieldNameOffset = Builder.CreateString(field.Name);
316351
var fieldType = _fieldTypeBuilder.BuildFieldType(field);
317352

318-
var fieldChildrenOffsets = Builder.CreateVectorOfTables(fieldChildren.ToArray());
353+
var fieldChildrenVectorOffset = Builder.CreateVectorOfTables(GetChildrenFieldOffsets(field));
319354

320355
fieldOffsets[i] = Flatbuf.Field.CreateField(Builder,
321356
fieldNameOffset, field.IsNullable, fieldType.Type, fieldType.Offset,
322-
default, fieldChildrenOffsets, default);
357+
default, fieldChildrenVectorOffset, default);
323358
}
324359

325360
var fieldsVectorOffset = Flatbuf.Schema.CreateFieldsVector(Builder, fieldOffsets);
@@ -332,6 +367,30 @@ private ValueTask WriteBufferAsync(ArrowBuffer arrowBuffer, CancellationToken ca
332367
Builder, endianness, fieldsVectorOffset);
333368
}
334369

370+
/// <summary>
/// Serializes the children of <paramref name="field"/> (recursively) into the
/// builder and returns the offsets of the resulting field tables.
/// </summary>
/// <param name="field">Field whose children are serialized.</param>
/// <returns>
/// One offset per child in declaration order; an empty array when the field's
/// type is not a <see cref="NestedType"/>.
/// </returns>
private protected Offset<Flatbuf.Field>[] GetChildrenFieldOffsets(Field field)
{
    if (field.DataType is NestedType nested)
    {
        var offsets = new Offset<Flatbuf.Field>[nested.Children.Count];

        for (var position = 0; position < offsets.Length; position++)
        {
            var child = nested.Children[position];

            // Name, type and the child's own children are all written to the
            // builder before the field table that references them is created.
            var nameOffset = Builder.CreateString(child.Name);
            var typeInfo = _fieldTypeBuilder.BuildFieldType(child);
            var grandChildrenVectorOffset = Builder.CreateVectorOfTables(GetChildrenFieldOffsets(child));

            offsets[position] = Flatbuf.Field.CreateField(
                Builder,
                nameOffset,
                child.IsNullable,
                typeInfo.Type,
                typeInfo.Offset,
                default,
                grandChildrenVectorOffset,
                default);
        }

        return offsets;
    }

    return System.Array.Empty<Offset<Flatbuf.Field>>();
}
393+
335394
private async ValueTask<Offset<Flatbuf.Schema>> WriteSchemaAsync(Schema schema, CancellationToken cancellationToken)
336395
{
337396
Builder.Clear();
@@ -357,7 +416,7 @@ await WriteMessageAsync(Flatbuf.MessageHeader.Schema, schemaOffset, 0, cancellat
357416
private async ValueTask<long> WriteMessageAsync<T>(
358417
Flatbuf.MessageHeader headerType, Offset<T> headerOffset, int bodyLength,
359418
CancellationToken cancellationToken)
360-
where T: struct
419+
where T : struct
361420
{
362421
var messageOffset = Flatbuf.Message.CreateMessage(
363422
Builder, CurrentMetadataVersion, headerType, headerOffset.Value,

csharp/src/Apache.Arrow/Ipc/ArrowTypeFlatbufferBuilder.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,10 @@ public void Visit(BinaryType type)
9898

9999
public void Visit(ListType type)
100100
{
101-
throw new NotImplementedException();
101+
Flatbuf.List.StartList(Builder);
102+
Result = FieldType.Build(
103+
Flatbuf.Type.List,
104+
Flatbuf.List.EndList(Builder));
102105
}
103106

104107
public void Visit(UnionType type)

csharp/src/Apache.Arrow/Ipc/MessageSerializer.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,12 @@ private static Types.IArrowType GetFieldArrowType(Flatbuf.Field field)
125125
return new Types.StringType();
126126
case Flatbuf.Type.Binary:
127127
return Types.BinaryType.Default;
128+
case Flatbuf.Type.List:
129+
if (field.ChildrenLength != 1)
130+
{
131+
throw new InvalidDataException($"List type must have only one child.");
132+
}
133+
return new Types.ListType(GetFieldArrowType(field.Children(0).GetValueOrDefault()));
128134
default:
129135
throw new InvalidDataException($"Arrow primitive '{field.TypeType}' is unsupported.");
130136
}

csharp/src/Apache.Arrow/Types/ListType.cs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,19 @@
1717

1818
namespace Apache.Arrow.Types
1919
{
20-
public sealed class ListType : ArrowType
20+
public sealed class ListType : NestedType
2121
{
2222
public override ArrowTypeId TypeId => ArrowTypeId.List;
2323
public override string Name => "list";
2424

25-
public Field ValueField { get; }
26-
public IArrowType ValueDataType { get; }
25+
public Field ValueField => Children[0];
26+
27+
public IArrowType ValueDataType => Children[0].DataType;
2728

2829
public ListType(Field valueField)
29-
{
30-
ValueField = valueField ?? throw new ArgumentNullException(nameof(valueField));
31-
ValueDataType = ValueField.DataType;
32-
}
30+
: base(valueField) { }
3331

34-
public ListType(IArrowType valueDataType)
32+
public ListType(IArrowType valueDataType)
3533
: this(new Field("item", valueDataType, true)) { }
3634

3735
public override void Accept(IArrowTypeVisitor visitor) => Accept(this, visitor);

0 commit comments

Comments
 (0)