Skip to content

Commit 83104a4

Browse files
committed
Implement string accumulations with nanoarrow
1 parent a3650a9 commit 83104a4

12 files changed

+278
-49
lines changed

.gitignore

+6
Original file line numberDiff line numberDiff line change
@@ -137,3 +137,9 @@ doc/source/savefig/
137137
# Interactive terminal generated files #
138138
########################################
139139
.jupyterlite.doit.db
140+
141+
# meson subproject files #
142+
##########################
143+
subprojects/*
144+
!subprojects/packagefiles
145+
!subprojects/*.wrap

.pre-commit-config.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ repos:
9898
rev: v19.1.6
9999
hooks:
100100
- id: clang-format
101-
files: ^pandas/_libs/src|^pandas/_libs/include
101+
files: ^pandas/_libs|pandas/_libs/src|^pandas/_libs/include
102102
args: [-i]
103103
types_or: [c, c++]
104104
- repo: local

environment.yml

+2-2
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ dependencies:
99
# build dependencies
1010
- versioneer
1111
- cython~=3.0.5
12-
- meson=1.2.1
13-
- meson-python=0.13.1
12+
- meson>=1.3.0
13+
- meson-python>=0.13.1
1414

1515
# test dependencies
1616
- pytest>=7.3.2

meson.build

+3-1
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@ project(
44
'c', 'cpp', 'cython',
55
version: run_command(['generate_version.py', '--print'], check: true).stdout().strip(),
66
license: 'BSD-3',
7-
meson_version: '>=1.2.1',
7+
meson_version: '>=1.3.0',
88
default_options: [
99
'buildtype=release',
1010
'c_std=c11',
11+
'cpp_std=c++20',
1112
'warning_level=2',
13+
'default_library=static',
1214
]
1315
)
1416

+214
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
#include <functional>
2+
#include <optional>
3+
#include <sstream>
4+
#include <string_view>
5+
#include <tuple>
6+
7+
#include <nanoarrow/nanoarrow.hpp>
8+
#include <nanobind/nanobind.h>
9+
#include <nanobind/stl/pair.h>
10+
#include <nanobind/stl/string.h>
11+
12+
using namespace nanoarrow::literals;
13+
namespace nb = nanobind;
14+
15+
static auto ReleaseArrowArray(void *ptr) noexcept -> void {
16+
auto array = static_cast<struct ArrowArray *>(ptr);
17+
if (array->release != nullptr) {
18+
ArrowArrayRelease(array);
19+
}
20+
21+
delete array;
22+
}
23+
24+
static auto ReleaseArrowSchema(void *ptr) noexcept -> void {
25+
auto schema = static_cast<struct ArrowSchema *>(ptr);
26+
if (schema->release != nullptr) {
27+
ArrowSchemaRelease(schema);
28+
}
29+
30+
delete schema;
31+
}
32+
33+
static auto CumSum(const struct ArrowArrayView *array_view,
34+
struct ArrowArray *out, bool skipna) {
35+
bool seen_na = false;
36+
std::stringstream ss{};
37+
38+
for (int64_t i = 0; i < array_view->length; i++) {
39+
const bool isna = ArrowArrayViewIsNull(array_view, i);
40+
if (!skipna && (seen_na || isna)) {
41+
seen_na = true;
42+
ArrowArrayAppendNull(out, 1);
43+
} else {
44+
if (!isna) {
45+
const auto std_sv = ArrowArrayViewGetStringUnsafe(array_view, i);
46+
ss << std::string_view{std_sv.data,
47+
static_cast<size_t>(std_sv.size_bytes)};
48+
}
49+
const auto str = ss.str();
50+
const ArrowStringView asv{str.c_str(), static_cast<int64_t>(str.size())};
51+
NANOARROW_THROW_NOT_OK(ArrowArrayAppendString(out, asv));
52+
}
53+
}
54+
}
55+
56+
template <typename T>
57+
concept MinOrMaxOp =
58+
std::same_as<T, std::less<>> || std::same_as<T, std::greater<>>;
59+
60+
template <auto Op>
61+
requires MinOrMaxOp<decltype(Op)>
62+
static auto CumMinOrMax(const struct ArrowArrayView *array_view,
63+
struct ArrowArray *out, bool skipna) {
64+
bool seen_na = false;
65+
std::optional<std::string> current_str{};
66+
67+
for (int64_t i = 0; i < array_view->length; i++) {
68+
const bool isna = ArrowArrayViewIsNull(array_view, i);
69+
if (!skipna && (seen_na || isna)) {
70+
seen_na = true;
71+
ArrowArrayAppendNull(out, 1);
72+
} else {
73+
if (!isna || current_str) {
74+
if (!isna) {
75+
const auto asv = ArrowArrayViewGetStringUnsafe(array_view, i);
76+
const nb::str pyval{asv.data, static_cast<size_t>(asv.size_bytes)};
77+
78+
if (current_str) {
79+
const nb::str pycurrent{current_str->data(), current_str->size()};
80+
if (Op(pyval, pycurrent)) {
81+
current_str =
82+
std::string{asv.data, static_cast<size_t>(asv.size_bytes)};
83+
}
84+
} else {
85+
current_str =
86+
std::string{asv.data, static_cast<size_t>(asv.size_bytes)};
87+
}
88+
}
89+
90+
struct ArrowStringView out_sv{
91+
current_str->data(), static_cast<int64_t>(current_str->size())};
92+
NANOARROW_THROW_NOT_OK(ArrowArrayAppendString(out, out_sv));
93+
} else {
94+
ArrowArrayAppendEmpty(out, 1);
95+
}
96+
}
97+
}
98+
}
99+
100+
class ArrowStringAccumulation {
101+
public:
102+
ArrowStringAccumulation(nb::object array_object, std::string accumulation,
103+
bool skipna)
104+
: skipna_(skipna) {
105+
if ((accumulation == "cumsum") || (accumulation == "cummin") ||
106+
(accumulation == "cummax")) {
107+
accumulation_ = std::move(accumulation);
108+
} else {
109+
const auto error_message =
110+
std::string("Unsupported accumulation: ") + accumulation;
111+
throw nb::value_error(error_message.c_str());
112+
}
113+
114+
const auto obj = nb::getattr(array_object, "__arrow_c_stream__")();
115+
const auto pycapsule_obj = nb::cast<nb::capsule>(obj);
116+
117+
const auto stream = static_cast<struct ArrowArrayStream *>(
118+
PyCapsule_GetPointer(pycapsule_obj.ptr(), "arrow_array_stream"));
119+
if (stream == nullptr) {
120+
throw std::invalid_argument("Invalid Arrow Stream capsule provided!");
121+
}
122+
123+
if (stream->get_schema(stream, schema_.get()) != 0) {
124+
std::string error_msg{stream->get_last_error(stream)};
125+
throw std::runtime_error("Could not read from arrow schema:" + error_msg);
126+
}
127+
struct ArrowSchemaView schema_view{};
128+
NANOARROW_THROW_NOT_OK(
129+
ArrowSchemaViewInit(&schema_view, schema_.get(), nullptr));
130+
131+
switch (schema_view.type) {
132+
case NANOARROW_TYPE_STRING:
133+
case NANOARROW_TYPE_LARGE_STRING:
134+
case NANOARROW_TYPE_STRING_VIEW:
135+
break;
136+
default:
137+
const auto error_message =
138+
std::string("Expected a string-like array type, got: ") +
139+
ArrowTypeString(schema_view.type);
140+
throw std::invalid_argument(error_message);
141+
}
142+
143+
ArrowArrayStreamMove(stream, stream_.get());
144+
}
145+
146+
std::pair<nb::capsule, nb::capsule> Accumulate(nb::object requested_schema) {
147+
struct ArrowSchemaView schema_view{};
148+
NANOARROW_THROW_NOT_OK(
149+
ArrowSchemaViewInit(&schema_view, schema_.get(), nullptr));
150+
auto uschema = nanoarrow::UniqueSchema{};
151+
ArrowSchemaInit(uschema.get());
152+
NANOARROW_THROW_NOT_OK(ArrowSchemaSetType(uschema.get(), schema_view.type));
153+
154+
// TODO: even though we are reading a stream we are returning an array
155+
// We should return a like sized stream of data in the future
156+
auto uarray_out = nanoarrow::UniqueArray{};
157+
NANOARROW_THROW_NOT_OK(
158+
ArrowArrayInitFromSchema(uarray_out.get(), uschema.get(), nullptr));
159+
160+
NANOARROW_THROW_NOT_OK(ArrowArrayStartAppending(uarray_out.get()));
161+
162+
nanoarrow::UniqueArray chunk{};
163+
int errcode{};
164+
165+
while ((errcode = ArrowArrayStreamGetNext(stream_.get(), chunk.get(),
166+
nullptr) == 0) &&
167+
chunk->release != nullptr) {
168+
struct ArrowArrayView array_view{};
169+
NANOARROW_THROW_NOT_OK(
170+
ArrowArrayViewInitFromSchema(&array_view, schema_.get(), nullptr));
171+
172+
NANOARROW_THROW_NOT_OK(
173+
ArrowArrayViewSetArray(&array_view, chunk.get(), nullptr));
174+
175+
if (accumulation_ == "cumsum") {
176+
CumSum(&array_view, uarray_out.get(), skipna_);
177+
} else if (accumulation_ == "cummin") {
178+
CumMinOrMax<std::less{}>(&array_view, uarray_out.get(), skipna_);
179+
} else if (accumulation_ == "cummax") {
180+
CumMinOrMax<std::greater{}>(&array_view, uarray_out.get(), skipna_);
181+
} else {
182+
throw std::runtime_error("Unexpected branch");
183+
}
184+
185+
chunk.reset();
186+
}
187+
188+
NANOARROW_THROW_NOT_OK(
189+
ArrowArrayFinishBuildingDefault(uarray_out.get(), nullptr));
190+
191+
auto out_schema = new struct ArrowSchema;
192+
ArrowSchemaMove(uschema.get(), out_schema);
193+
nb::capsule schema_capsule{out_schema, "arrow_schema", &ReleaseArrowSchema};
194+
195+
auto out_array = new struct ArrowArray;
196+
ArrowArrayMove(uarray_out.get(), out_array);
197+
nb::capsule array_capsule{out_array, "arrow_array", &ReleaseArrowArray};
198+
199+
return std::pair<nb::capsule, nb::capsule>{schema_capsule, array_capsule};
200+
}
201+
202+
private:
203+
nanoarrow::UniqueArrayStream stream_;
204+
nanoarrow::UniqueSchema schema_;
205+
std::string accumulation_;
206+
bool skipna_;
207+
};
208+
209+
// Python module definition: exposes ArrowStringAccumulation with its
// three-argument constructor and binds Accumulate as `__arrow_c_array__`,
// whose `requested_schema` argument defaults to None.
NB_MODULE(arrow_string_accumulations, m) {
  using Accumulation = ArrowStringAccumulation;

  auto py_class = nb::class_<Accumulation>(m, "ArrowStringAccumulation");
  py_class.def(nb::init<nb::object, std::string, bool>());
  py_class.def("__arrow_c_array__", &Accumulation::Accumulate,
               nb::arg("requested_schema") = nb::none());
}

pandas/_libs/meson.build

+10
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,16 @@ foreach ext_name, ext_dict : libs_sources
122122
)
123123
endforeach
124124

125+
nanobind_dep = dependency('nanobind')
126+
nanoarrow_dep = dependency('nanoarrow')
127+
py.extension_module(
128+
'arrow_string_accumulations',
129+
sources: ['arrow_string_accumulations.cc'],
130+
dependencies: [nanobind_dep, nanoarrow_dep],
131+
subdir: 'pandas/_libs',
132+
install: true,
133+
)
134+
125135
# Basically just __init__.py and the .pyi files
126136
sources_to_install = [
127137
'__init__.py',

pandas/core/arrays/arrow/array.py

+3-43
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import numpy as np
1717

1818
from pandas._libs import lib
19+
import pandas._libs.arrow_string_accumulations as sa
1920
from pandas._libs.tslibs import (
2021
Timedelta,
2122
Timestamp,
@@ -1670,49 +1671,8 @@ def _str_accumulate(
16701671
msg = f"operation '{name}' not supported for dtype '{self.dtype}'"
16711672
raise TypeError(msg)
16721673

1673-
# We may need to strip out leading / trailing NA values
1674-
head: pa.array | None = None
1675-
tail: pa.array | None = None
1676-
pa_array = self._pa_array
1677-
np_func = {
1678-
"cumsum": np.cumsum,
1679-
"cummin": np.minimum.accumulate,
1680-
"cummax": np.maximum.accumulate,
1681-
}[name]
1682-
1683-
if self._hasna:
1684-
if skipna:
1685-
if name == "cumsum":
1686-
pa_array = pc.fill_null(pa_array, "")
1687-
else:
1688-
# After the first non-NA value we can retain the running min/max
1689-
# by forward filling.
1690-
pa_array = pc.fill_null_forward(pa_array)
1691-
# But any leading NA values should result in "".
1692-
nulls = pc.is_null(pa_array)
1693-
idx = pc.index(nulls, False).as_py()
1694-
if idx == -1:
1695-
idx = len(pa_array)
1696-
if idx > 0:
1697-
head = pa.array([""] * idx, type=pa_array.type)
1698-
pa_array = pa_array[idx:].combine_chunks()
1699-
else:
1700-
# When not skipping NA values, the result should be null from
1701-
# the first NA value onward.
1702-
nulls = pc.is_null(pa_array)
1703-
idx = pc.index(nulls, True).as_py()
1704-
tail = pa.nulls(len(pa_array) - idx, type=pa_array.type)
1705-
pa_array = pa_array[:idx].combine_chunks()
1706-
1707-
# error: Cannot call function of unknown type
1708-
pa_result = pa.array(np_func(pa_array), type=pa_array.type) # type: ignore[operator]
1709-
1710-
assert head is None or tail is None
1711-
if head is not None:
1712-
pa_result = pa.concat_arrays([head, pa_result])
1713-
elif tail is not None:
1714-
pa_result = pa.concat_arrays([pa_result, tail])
1715-
1674+
# TODO: we can use arrow_c_stream instead of arrow_c_array
1675+
pa_result = pa.array(sa.ArrowStringAccumulation(self._pa_array, name, skipna))
17161676
result = type(self)(pa_result)
17171677
return result
17181678

pyproject.toml

+1
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ parentdir_prefix = "pandas-"
145145

146146
[tool.meson-python.args]
147147
setup = ['--vsenv'] # For Windows
148+
install = ['--skip-subprojects']
148149

149150
[tool.cibuildwheel]
150151
skip = "cp36-* cp37-* cp38-* cp39-* pp* *_i686 *_ppc64le *_s390x"

requirements-dev.txt

+2-2
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
pip
55
versioneer[toml]
66
cython~=3.0.5
7-
meson[ninja]==1.2.1
8-
meson-python==0.13.1
7+
meson[ninja]>=1.3.0
8+
meson-python>=0.13.1
99
pytest>=7.3.2
1010
pytest-cov
1111
pytest-xdist>=3.4.0

subprojects/nanoarrow.wrap

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
[wrap-file]
2+
directory = apache-arrow-nanoarrow-0.6.0
3+
source_url = https://www.apache.org/dyn/closer.lua?action=download&filename=arrow/apache-arrow-nanoarrow-0.6.0/apache-arrow-nanoarrow-0.6.0.tar.gz
4+
source_fallback_url = https://github.com/mesonbuild/wrapdb/releases/download/nanoarrow_0.6.0-1/apache-arrow-nanoarrow-0.6.0.tar.gz
5+
source_filename = apache-arrow-nanoarrow-0.6.0.tar.gz
6+
source_hash = e4a02ac51002ad1875bf09317e70adb959005fad52b240ff59f73b970fa485d1
7+
wrapdb_version = 0.6.0-1
8+
9+
[provide]
10+
nanoarrow = nanoarrow_dep

subprojects/nanobind.wrap

+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
[wrap-file]
2+
directory = nanobind-2.4.0
3+
source_url = https://github.com/wjakob/nanobind/archive/refs/tags/v2.4.0.tar.gz
4+
source_filename = nanobind-2.4.0.tar.gz
5+
source_hash = bb35deaed7efac5029ed1e33880a415638352f757d49207a8e6013fefb6c49a7
6+
patch_filename = nanobind_2.4.0-2_patch.zip
7+
patch_url = https://wrapdb.mesonbuild.com/v2/nanobind_2.4.0-2/get_patch
8+
patch_hash = cf493bda0b11ea4e8d9dd42229c3bbdd52af88cc4aedac75a1eccb102b86dd4a
9+
source_fallback_url = https://github.com/mesonbuild/wrapdb/releases/download/nanobind_2.4.0-2/nanobind-2.4.0.tar.gz
10+
wrapdb_version = 2.4.0-2
11+
12+
[provide]
13+
nanobind = nanobind_dep

subprojects/robin-map.wrap

+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
[wrap-file]
2+
directory = robin-map-1.3.0
3+
source_url = https://github.com/Tessil/robin-map/archive/refs/tags/v1.3.0.tar.gz
4+
source_filename = robin-map-1.3.0.tar.gz
5+
source_hash = a8424ad3b0affd4c57ed26f0f3d8a29604f0e1f2ef2089f497f614b1c94c7236
6+
patch_filename = robin-map_1.3.0-1_patch.zip
7+
patch_url = https://wrapdb.mesonbuild.com/v2/robin-map_1.3.0-1/get_patch
8+
patch_hash = 6d090f988541ffb053512607e0942cbd0dbc2a4fa0563e44ff6a37e810b8c739
9+
source_fallback_url = https://github.com/mesonbuild/wrapdb/releases/download/robin-map_1.3.0-1/robin-map-1.3.0.tar.gz
10+
wrapdb_version = 1.3.0-1
11+
12+
[provide]
13+
robin-map = robin_map_dep

0 commit comments

Comments
 (0)