Skip to content

Commit 123d9b1

Browse files
bkietzpitrou
authored andcommitted
ARROW-7415: [C++][Dataset] implement IpcFormat
adds a basic IpcFormat and unit tests Closes #6162 from bkietz/7415-Dataset-Implement-IpcForm and squashes the following commits: 30e5007 <Benjamin Kietzman> review comments, clang-format 118787d <Benjamin Kietzman> lint fixes 3e7a7d4 <Benjamin Kietzman> ARROW-7415: Implement IpcFormat Authored-by: Benjamin Kietzman <[email protected]> Signed-off-by: Antoine Pitrou <[email protected]>
1 parent 91114cf commit 123d9b1

File tree

10 files changed

+387
-21
lines changed

10 files changed

+387
-21
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ perf.data
4545
perf.data.old
4646

4747
cpp/.idea/
48+
.clangd/
4849
cpp/.clangd/
4950
cpp/apidoc/xml/
5051
docs/example.gz

cpp/src/arrow/dataset/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ set(ARROW_DATASET_SRCS
2323
dataset.cc
2424
discovery.cc
2525
file_base.cc
26+
file_ipc.cc
2627
filter.cc
2728
partition.cc
2829
projector.cc
@@ -100,6 +101,7 @@ endfunction()
100101
if(NOT WIN32)
101102
add_arrow_dataset_test(dataset_test)
102103
add_arrow_dataset_test(discovery_test)
104+
add_arrow_dataset_test(file_ipc_test)
103105
add_arrow_dataset_test(file_test)
104106
add_arrow_dataset_test(filter_test)
105107
add_arrow_dataset_test(partition_test)

cpp/src/arrow/dataset/file_ipc.cc

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "arrow/dataset/file_ipc.h"
19+
20+
#include <memory>
21+
#include <unordered_set>
22+
#include <utility>
23+
#include <vector>
24+
25+
#include "arrow/dataset/dataset_internal.h"
26+
#include "arrow/dataset/filter.h"
27+
#include "arrow/dataset/scanner.h"
28+
#include "arrow/ipc/reader.h"
29+
#include "arrow/table.h"
30+
#include "arrow/util/iterator.h"
31+
#include "arrow/util/range.h"
32+
33+
namespace arrow {
34+
namespace dataset {
35+
36+
Status WrapErrorWithSource(Status status, const FileSource& source) {
37+
if (status.ok()) {
38+
return Status::OK();
39+
}
40+
41+
return status.WithMessage("Could not open IPC input source '", source.path(),
42+
"': ", status.message());
43+
}
44+
45+
/// \brief A ScanTask backed by an Ipc file.
46+
class IpcScanTask : public ScanTask {
47+
public:
48+
IpcScanTask(FileSource source, std::shared_ptr<ScanOptions> options,
49+
std::shared_ptr<ScanContext> context)
50+
: ScanTask(std::move(options), std::move(context)), source_(std::move(source)) {}
51+
52+
Result<RecordBatchIterator> Execute() override {
53+
ARROW_ASSIGN_OR_RAISE(auto input, source_.Open());
54+
std::shared_ptr<RecordBatchReader> reader;
55+
RETURN_NOT_OK(
56+
WrapErrorWithSource(ipc::RecordBatchStreamReader::Open(input, &reader), source_));
57+
return MakeFunctionIterator([reader] { return reader->Next(); });
58+
}
59+
60+
private:
61+
FileSource source_;
62+
};
63+
64+
class IpcScanTaskIterator {
65+
public:
66+
static Result<ScanTaskIterator> Make(std::shared_ptr<ScanOptions> options,
67+
std::shared_ptr<ScanContext> context,
68+
FileSource source) {
69+
return ScanTaskIterator(
70+
IpcScanTaskIterator(std::move(options), std::move(context), std::move(source)));
71+
}
72+
73+
Result<std::shared_ptr<ScanTask>> Next() {
74+
if (once_) {
75+
// Iteration is done.
76+
return nullptr;
77+
}
78+
79+
once_ = true;
80+
return std::shared_ptr<ScanTask>(new IpcScanTask(source_, options_, context_));
81+
}
82+
83+
private:
84+
IpcScanTaskIterator(std::shared_ptr<ScanOptions> options,
85+
std::shared_ptr<ScanContext> context, FileSource source)
86+
: options_(std::move(options)),
87+
context_(std::move(context)),
88+
source_(std::move(source)) {}
89+
90+
bool once_ = false;
91+
std::shared_ptr<ScanOptions> options_;
92+
std::shared_ptr<ScanContext> context_;
93+
FileSource source_;
94+
};
95+
96+
Result<bool> IpcFileFormat::IsSupported(const FileSource& source) const {
97+
ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
98+
ipc::DictionaryMemo dictionary_memo;
99+
std::shared_ptr<Schema> schema;
100+
return ipc::ReadSchema(input.get(), &dictionary_memo, &schema).ok();
101+
}
102+
103+
Result<std::shared_ptr<Schema>> IpcFileFormat::Inspect(const FileSource& source) const {
104+
ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
105+
ipc::DictionaryMemo dictionary_memo;
106+
std::shared_ptr<Schema> schema;
107+
RETURN_NOT_OK(WrapErrorWithSource(
108+
ipc::ReadSchema(input.get(), &dictionary_memo, &schema), source));
109+
return schema;
110+
}
111+
112+
Result<ScanTaskIterator> IpcFileFormat::ScanFile(
113+
const FileSource& source, std::shared_ptr<ScanOptions> options,
114+
std::shared_ptr<ScanContext> context) const {
115+
return IpcScanTaskIterator::Make(options, context, source);
116+
}
117+
118+
Result<std::shared_ptr<DataFragment>> IpcFileFormat::MakeFragment(
119+
const FileSource& source, std::shared_ptr<ScanOptions> options) {
120+
return std::make_shared<IpcFragment>(source, options);
121+
}
122+
123+
} // namespace dataset
124+
} // namespace arrow

cpp/src/arrow/dataset/file_ipc.h

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#pragma once
19+
20+
#include <memory>
21+
#include <string>
22+
23+
#include "arrow/dataset/file_base.h"
24+
#include "arrow/dataset/type_fwd.h"
25+
#include "arrow/dataset/visibility.h"
26+
27+
namespace arrow {
28+
namespace dataset {
29+
30+
class ARROW_DS_EXPORT IpcScanOptions : public FileScanOptions {
31+
public:
32+
std::string file_type() const override { return "ipc"; }
33+
};
34+
35+
class ARROW_DS_EXPORT IpcWriteOptions : public FileWriteOptions {
36+
public:
37+
std::string file_type() const override { return "ipc"; }
38+
};
39+
40+
/// \brief A FileFormat implementation that reads from Ipc files
41+
class ARROW_DS_EXPORT IpcFileFormat : public FileFormat {
42+
public:
43+
std::string type_name() const override { return "ipc"; }
44+
45+
Result<bool> IsSupported(const FileSource& source) const override;
46+
47+
/// \brief Return the schema of the file if possible.
48+
Result<std::shared_ptr<Schema>> Inspect(const FileSource& source) const override;
49+
50+
/// \brief Open a file for scanning
51+
Result<ScanTaskIterator> ScanFile(const FileSource& source,
52+
std::shared_ptr<ScanOptions> options,
53+
std::shared_ptr<ScanContext> context) const override;
54+
55+
Result<std::shared_ptr<DataFragment>> MakeFragment(
56+
const FileSource& source, std::shared_ptr<ScanOptions> options) override;
57+
};
58+
59+
class ARROW_DS_EXPORT IpcFragment : public FileDataFragment {
60+
public:
61+
IpcFragment(const FileSource& source, std::shared_ptr<ScanOptions> options)
62+
: FileDataFragment(source, std::make_shared<IpcFileFormat>(), options) {}
63+
64+
bool splittable() const override { return true; }
65+
};
66+
67+
} // namespace dataset
68+
} // namespace arrow

0 commit comments

Comments
 (0)