@@ -50,31 +50,29 @@ class ManagedAggregateState {
50
50
};
51
51
52
52
Status AggregateUnaryKernel::Call (FunctionContext* ctx, const Datum& input, Datum* out) {
53
- if (input.is_arraylike ()) {
54
- auto state = ManagedAggregateState::Make (aggregate_function_, ctx->memory_pool ());
55
- if (!state) return Status::OutOfMemory (" AggregateState allocation failed" );
53
+ if (!input.is_arraylike ()) {
54
+ return Status::Invalid (" AggregateKernel expects Array or ChunkedArray datum" );
55
+ }
56
+ auto state = ManagedAggregateState::Make (aggregate_function_, ctx->memory_pool ());
57
+ if (!state) return Status::OutOfMemory (" AggregateState allocation failed" );
56
58
57
- if (input.is_array ()) {
58
- auto array = input.make_array ();
59
- RETURN_NOT_OK (aggregate_function_->Consume (*array, state->mutable_data ()));
60
- } else {
61
- auto chunked_array = input.chunked_array ();
62
- for (int i = 0 ; i < chunked_array->num_chunks (); i++) {
63
- auto tmp_state =
64
- ManagedAggregateState::Make (aggregate_function_, ctx->memory_pool ());
65
- if (!tmp_state) return Status::OutOfMemory (" AggregateState allocation failed" );
66
- RETURN_NOT_OK (aggregate_function_->Consume (*chunked_array->chunk (i),
67
- tmp_state->mutable_data ()));
68
- RETURN_NOT_OK (
69
- aggregate_function_->Merge (tmp_state->mutable_data (), state->mutable_data ()));
70
- }
71
- }
72
- RETURN_NOT_OK (aggregate_function_->Finalize (state->mutable_data (), out));
59
+ if (input.is_array ()) {
60
+ auto array = input.make_array ();
61
+ RETURN_NOT_OK (aggregate_function_->Consume (*array, state->mutable_data ()));
73
62
} else {
74
- return Status::Invalid (" AggregateKernel expects Array or ChunkedArray datum" );
63
+ auto chunked_array = input.chunked_array ();
64
+ for (int i = 0 ; i < chunked_array->num_chunks (); i++) {
65
+ auto tmp_state =
66
+ ManagedAggregateState::Make (aggregate_function_, ctx->memory_pool ());
67
+ if (!tmp_state) return Status::OutOfMemory (" AggregateState allocation failed" );
68
+ RETURN_NOT_OK (aggregate_function_->Consume (*chunked_array->chunk (i),
69
+ tmp_state->mutable_data ()));
70
+ RETURN_NOT_OK (
71
+ aggregate_function_->Merge (tmp_state->mutable_data (), state->mutable_data ()));
72
+ }
75
73
}
76
74
77
- return Status::OK ( );
75
+ return aggregate_function_-> Finalize (state-> mutable_data (), out );
78
76
}
79
77
80
78
std::shared_ptr<DataType> AggregateUnaryKernel::out_type () const {
0 commit comments