Skip to content

Commit 3e7a7d4

Browse files
committed
ARROW-7415: [C++][Dataset] Implement IpcFormat
1 parent bef87ef commit 3e7a7d4

File tree

8 files changed

+385
-18
lines changed

8 files changed

+385
-18
lines changed

cpp/src/arrow/dataset/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ set(ARROW_DATASET_SRCS
2323
dataset.cc
2424
discovery.cc
2525
file_base.cc
26+
file_ipc.cc
2627
filter.cc
2728
partition.cc
2829
projector.cc
@@ -104,6 +105,7 @@ if(NOT WIN32)
104105
add_arrow_dataset_test(filter_test)
105106
add_arrow_dataset_test(partition_test)
106107
add_arrow_dataset_test(scanner_test)
108+
add_arrow_dataset_test(file_ipc_test)
107109

108110
if(ARROW_PARQUET)
109111
add_arrow_dataset_test(file_parquet_test)

cpp/src/arrow/dataset/file_ipc.cc

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "arrow/dataset/file_ipc.h"
19+
20+
#include <memory>
21+
#include <unordered_set>
22+
#include <utility>
23+
#include <vector>
24+
25+
#include "arrow/dataset/dataset_internal.h"
26+
#include "arrow/dataset/filter.h"
27+
#include "arrow/dataset/scanner.h"
28+
#include "arrow/ipc/reader.h"
29+
#include "arrow/table.h"
30+
#include "arrow/util/iterator.h"
31+
#include "arrow/util/range.h"
32+
33+
namespace arrow {
34+
namespace dataset {
35+
36+
Status WrapErrorWithSource(Status status, const FileSource& source) {
37+
if (status.ok()) {
38+
return Status::OK();
39+
}
40+
41+
return status.WithMessage("Could not open parquet input source '", source.path(),
42+
"': ", status.message());
43+
}
44+
45+
/// \brief A ScanTask backed by an Ipc file.
46+
class IpcScanTask : public ScanTask {
47+
public:
48+
IpcScanTask(FileSource source, std::shared_ptr<ScanOptions> options,
49+
std::shared_ptr<ScanContext> context)
50+
: ScanTask(std::move(options), std::move(context)), source_(std::move(source)) {}
51+
52+
Result<RecordBatchIterator> Execute() {
53+
ARROW_ASSIGN_OR_RAISE(auto input, source_.Open());
54+
std::shared_ptr<RecordBatchReader> reader;
55+
RETURN_NOT_OK(
56+
WrapErrorWithSource(ipc::RecordBatchStreamReader::Open(input, &reader), source_));
57+
return MakeFunctionIterator([reader] { return reader->Next(); });
58+
}
59+
60+
private:
61+
FileSource source_;
62+
};
63+
64+
class IpcScanTaskIterator {
65+
public:
66+
static Result<ScanTaskIterator> Make(std::shared_ptr<ScanOptions> options,
67+
std::shared_ptr<ScanContext> context,
68+
FileSource source) {
69+
return ScanTaskIterator(
70+
IpcScanTaskIterator(std::move(options), std::move(context), std::move(source)));
71+
}
72+
73+
Result<std::shared_ptr<ScanTask>> Next() {
74+
if (once_) {
75+
// Iteration is done.
76+
return nullptr;
77+
}
78+
79+
once_ = true;
80+
return std::shared_ptr<ScanTask>(new IpcScanTask(source_, options_, context_));
81+
}
82+
83+
private:
84+
IpcScanTaskIterator(std::shared_ptr<ScanOptions> options,
85+
std::shared_ptr<ScanContext> context, FileSource source)
86+
: options_(std::move(options)),
87+
context_(std::move(context)),
88+
source_(std::move(source)) {}
89+
90+
bool once_ = false;
91+
std::shared_ptr<ScanOptions> options_;
92+
std::shared_ptr<ScanContext> context_;
93+
FileSource source_;
94+
};
95+
96+
Result<bool> IpcFileFormat::IsSupported(const FileSource& source) const {
97+
ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
98+
ipc::DictionaryMemo dictionary_memo;
99+
std::shared_ptr<Schema> schema;
100+
return ipc::ReadSchema(input.get(), &dictionary_memo, &schema).ok();
101+
}
102+
103+
Result<std::shared_ptr<Schema>> IpcFileFormat::Inspect(const FileSource& source) const {
104+
ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
105+
ipc::DictionaryMemo dictionary_memo;
106+
std::shared_ptr<Schema> schema;
107+
RETURN_NOT_OK(WrapErrorWithSource(
108+
ipc::ReadSchema(input.get(), &dictionary_memo, &schema), source));
109+
return schema;
110+
}
111+
112+
Result<ScanTaskIterator> IpcFileFormat::ScanFile(
113+
const FileSource& source, std::shared_ptr<ScanOptions> options,
114+
std::shared_ptr<ScanContext> context) const {
115+
return IpcScanTaskIterator::Make(options, context, source);
116+
}
117+
118+
Result<std::shared_ptr<DataFragment>> IpcFileFormat::MakeFragment(
119+
const FileSource& source, std::shared_ptr<ScanOptions> options) {
120+
return std::make_shared<IpcFragment>(source, options);
121+
}
122+
123+
} // namespace dataset
124+
} // namespace arrow
125+

cpp/src/arrow/dataset/file_ipc.h

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#pragma once
19+
20+
#include <memory>
21+
#include <string>
22+
23+
#include "arrow/dataset/file_base.h"
24+
#include "arrow/dataset/type_fwd.h"
25+
#include "arrow/dataset/visibility.h"
26+
27+
namespace arrow {
28+
namespace dataset {
29+
30+
class ARROW_DS_EXPORT IpcScanOptions : public FileScanOptions {
31+
public:
32+
std::string file_type() const override { return "ipc"; }
33+
};
34+
35+
class ARROW_DS_EXPORT IpcWriteOptions : public FileWriteOptions {
36+
public:
37+
std::string file_type() const override { return "ipc"; }
38+
};
39+
40+
/// \brief A FileFormat implementation that reads from Ipc files
41+
class ARROW_DS_EXPORT IpcFileFormat : public FileFormat {
42+
public:
43+
std::string type_name() const override { return "ipc"; }
44+
45+
Result<bool> IsSupported(const FileSource& source) const override;
46+
47+
/// \brief Return the schema of the file if possible.
48+
Result<std::shared_ptr<Schema>> Inspect(const FileSource& source) const override;
49+
50+
/// \brief Open a file for scanning
51+
Result<ScanTaskIterator> ScanFile(const FileSource& source,
52+
std::shared_ptr<ScanOptions> options,
53+
std::shared_ptr<ScanContext> context) const override;
54+
55+
Result<std::shared_ptr<DataFragment>> MakeFragment(
56+
const FileSource& source, std::shared_ptr<ScanOptions> options) override;
57+
};
58+
59+
class ARROW_DS_EXPORT IpcFragment : public FileDataFragment {
60+
public:
61+
IpcFragment(const FileSource& source, std::shared_ptr<ScanOptions> options)
62+
: FileDataFragment(source, std::make_shared<IpcFileFormat>(), options) {}
63+
64+
bool splittable() const override { return true; }
65+
};
66+
67+
} // namespace dataset
68+
} // namespace arrow
69+
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "arrow/dataset/file_ipc.h"
19+
20+
#include <memory>
21+
#include <utility>
22+
#include <vector>
23+
24+
#include "arrow/dataset/dataset_internal.h"
25+
#include "arrow/dataset/filter.h"
26+
#include "arrow/dataset/test_util.h"
27+
#include "arrow/io/memory.h"
28+
#include "arrow/ipc/writer.h"
29+
#include "arrow/record_batch.h"
30+
#include "arrow/testing/generator.h"
31+
#include "arrow/testing/gtest_util.h"
32+
#include "arrow/testing/util.h"
33+
34+
namespace arrow {
35+
namespace dataset {
36+
37+
constexpr int64_t kDefaultOutputStreamSize = 1024;
38+
constexpr int64_t kBatchSize = 1UL << 12;
39+
constexpr int64_t kBatchRepetitions = 1 << 5;
40+
constexpr int64_t kNumRows = kBatchSize * kBatchRepetitions;
41+
42+
class ArrowIpcWriterMixin : public ::testing::Test {
43+
public:
44+
std::shared_ptr<Buffer> Write(std::vector<RecordBatchReader*> readers) {
45+
EXPECT_OK_AND_ASSIGN(auto sink, io::BufferOutputStream::Create(
46+
kDefaultOutputStreamSize, default_memory_pool()));
47+
auto writer_schema = readers[0]->schema();
48+
EXPECT_OK_AND_ASSIGN(auto writer,
49+
ipc::RecordBatchStreamWriter::Open(sink.get(), writer_schema));
50+
51+
for (auto reader : readers) {
52+
std::vector<std::shared_ptr<RecordBatch>> batches;
53+
ARROW_EXPECT_OK(reader->ReadAll(&batches));
54+
for (auto batch : batches) {
55+
AssertSchemaEqual(batch->schema(), writer_schema);
56+
ARROW_EXPECT_OK(writer->WriteRecordBatch(*batch));
57+
}
58+
}
59+
60+
EXPECT_OK_AND_ASSIGN(auto out, sink->Finish());
61+
return out;
62+
}
63+
64+
std::shared_ptr<Buffer> Write(RecordBatchReader* reader) {
65+
return Write(std::vector<RecordBatchReader*>{reader});
66+
}
67+
68+
std::shared_ptr<Buffer> Write(const Table& table) {
69+
EXPECT_OK_AND_ASSIGN(auto sink, io::BufferOutputStream::Create(
70+
kDefaultOutputStreamSize, default_memory_pool()));
71+
EXPECT_OK_AND_ASSIGN(auto writer,
72+
ipc::RecordBatchStreamWriter::Open(sink.get(), table.schema()));
73+
ARROW_EXPECT_OK(writer->WriteTable(table));
74+
EXPECT_OK_AND_ASSIGN(auto out, sink->Finish());
75+
return out;
76+
}
77+
};
78+
79+
class IpcBufferFixtureMixin : public ArrowIpcWriterMixin {
80+
public:
81+
std::unique_ptr<FileSource> GetFileSource(RecordBatchReader* reader) {
82+
auto buffer = Write(reader);
83+
return internal::make_unique<FileSource>(std::move(buffer));
84+
}
85+
86+
std::unique_ptr<FileSource> GetFileSource(std::vector<RecordBatchReader*> readers) {
87+
auto buffer = Write(std::move(readers));
88+
return internal::make_unique<FileSource>(std::move(buffer));
89+
}
90+
91+
std::unique_ptr<RecordBatchReader> GetRecordBatchReader() {
92+
auto batch = ConstantArrayGenerator::Zeroes(kBatchSize, schema_);
93+
int64_t i = 0;
94+
return MakeGeneratedRecordBatch(
95+
batch->schema(), [batch, i](std::shared_ptr<RecordBatch>* out) mutable {
96+
*out = i++ < kBatchRepetitions ? batch : nullptr;
97+
return Status::OK();
98+
});
99+
}
100+
101+
protected:
102+
std::shared_ptr<Schema> schema_ = schema({field("f64", float64())});
103+
};
104+
105+
class TestIpcFileFormat : public IpcBufferFixtureMixin {
106+
protected:
107+
std::shared_ptr<ScanOptions> opts_;
108+
std::shared_ptr<ScanContext> ctx_ = std::make_shared<ScanContext>();
109+
};
110+
111+
TEST_F(TestIpcFileFormat, ScanRecordBatchReader) {
112+
auto reader = GetRecordBatchReader();
113+
auto source = GetFileSource(reader.get());
114+
115+
opts_ = ScanOptions::Make(reader->schema());
116+
auto fragment = std::make_shared<IpcFragment>(*source, opts_);
117+
118+
ASSERT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(ctx_));
119+
int64_t row_count = 0;
120+
121+
for (auto maybe_task : scan_task_it) {
122+
ASSERT_OK_AND_ASSIGN(auto task, std::move(maybe_task));
123+
ASSERT_OK_AND_ASSIGN(auto rb_it, task->Execute());
124+
for (auto maybe_batch : rb_it) {
125+
ASSERT_OK_AND_ASSIGN(auto batch, std::move(maybe_batch));
126+
row_count += batch->num_rows();
127+
}
128+
}
129+
130+
ASSERT_EQ(row_count, kNumRows);
131+
}
132+
133+
TEST_F(TestIpcFileFormat, OpenFailureWithRelevantError) {
134+
auto format = IpcFileFormat();
135+
136+
std::shared_ptr<Buffer> buf = std::make_shared<Buffer>(util::string_view(""));
137+
auto result = format.Inspect({buf});
138+
EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, testing::HasSubstr("<Buffer>"),
139+
result.status());
140+
141+
constexpr auto file_name = "herp/derp";
142+
ASSERT_OK_AND_ASSIGN(
143+
auto fs, fs::internal::MockFileSystem::Make(fs::kNoTime, {fs::File(file_name)}));
144+
result = format.Inspect({file_name, fs.get()});
145+
EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, testing::HasSubstr(file_name),
146+
result.status());
147+
}
148+
149+
// TODO(bkietz) extend IpcFileFormat to support projection pushdown
150+
// TEST_F(TestIpcFileFormat, ScanRecordBatchReaderProjected)
151+
// TEST_F(TestParquetFileFormat, ScanRecordBatchReaderProjectedMissingCols)
152+
153+
TEST_F(TestIpcFileFormat, Inspect) {
154+
auto reader = GetRecordBatchReader();
155+
auto source = GetFileSource(reader.get());
156+
auto format = IpcFileFormat();
157+
158+
ASSERT_OK_AND_ASSIGN(auto actual, format.Inspect(*source.get()));
159+
EXPECT_EQ(*actual, *schema_);
160+
}
161+
162+
TEST_F(TestIpcFileFormat, IsSupported) {
163+
auto reader = GetRecordBatchReader();
164+
auto source = GetFileSource(reader.get());
165+
auto format = IpcFileFormat();
166+
167+
bool supported = false;
168+
169+
std::shared_ptr<Buffer> buf = std::make_shared<Buffer>(util::string_view(""));
170+
ASSERT_OK_AND_ASSIGN(supported, format.IsSupported(FileSource(buf)));
171+
ASSERT_EQ(supported, false);
172+
173+
buf = std::make_shared<Buffer>(util::string_view("corrupted"));
174+
ASSERT_OK_AND_ASSIGN(supported, format.IsSupported(FileSource(buf)));
175+
ASSERT_EQ(supported, false);
176+
177+
ASSERT_OK_AND_ASSIGN(supported, format.IsSupported(*source));
178+
EXPECT_EQ(supported, true);
179+
}
180+
181+
} // namespace dataset
182+
} // namespace arrow

0 commit comments

Comments
 (0)