-
Notifications
You must be signed in to change notification settings - Fork 5.8k
/
Copy pathAbstractPipeline.java
796 lines (716 loc) · 31.2 KB
/
AbstractPipeline.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
/*
* Copyright (c) 2012, 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.util.stream;
import java.util.Objects;
import java.util.Spliterator;
import java.util.function.IntFunction;
import java.util.function.Supplier;
/**
* Abstract base class for "pipeline" classes, which are the core
* implementations of the Stream interface and its primitive specializations.
* Manages construction and evaluation of stream pipelines.
*
* <p>An {@code AbstractPipeline} represents an initial portion of a stream
* pipeline, encapsulating a stream source and zero or more intermediate
* operations. The individual {@code AbstractPipeline} objects are often
* referred to as <em>stages</em>, where each stage describes either the stream
* source or an intermediate operation.
*
* <p>A concrete intermediate stage is generally built from an
* {@code AbstractPipeline}, a shape-specific pipeline class which extends it
* (e.g., {@code IntPipeline}) which is also abstract, and an operation-specific
* concrete class which extends that. {@code AbstractPipeline} contains most of
* the mechanics of evaluating the pipeline, and implements methods that will be
* used by the operation; the shape-specific classes add helper methods for
* dealing with collection of results into the appropriate shape-specific
* containers.
*
* <p>After chaining a new intermediate operation, or executing a terminal
* operation, the stream is considered to be consumed, and no more intermediate
* or terminal operations are permitted on this stream instance.
*
* @implNote
* <p>For sequential streams, and parallel streams without
* <a href="package-summary.html#StreamOps">stateful intermediate
* operations</a>, pipeline evaluation is done in a single
* pass that "jams" all the operations together. For parallel streams with
* stateful operations, execution is divided into segments, where each
* stateful operation marks the end of a segment, and each segment is
* evaluated separately and the result used as the input to the next
* segment. In all cases, the source data is not consumed until a terminal
* operation begins.
*
* @param <E_IN> type of input elements
* @param <E_OUT> type of output elements
* @param <S> type of the subclass implementing {@code BaseStream}
* @since 1.8
*/
abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
/**
* Message of the {@code IllegalStateException} thrown when a stage that is
* already linked or consumed is linked, consumed or evaluated again.
*/
private static final String MSG_STREAM_LINKED = "stream has already been operated upon or closed";
/**
* Message of the {@code IllegalStateException} thrown when neither a source
* spliterator nor a source supplier is available any more.
*/
private static final String MSG_CONSUMED = "source already consumed or closed";
/**
* Backlink to the head of the pipeline chain (self if this is the source
* stage).
*/
@SuppressWarnings("rawtypes")
private final AbstractPipeline sourceStage;
/**
* The "upstream" pipeline, or null if this is the source stage.
*/
@SuppressWarnings("rawtypes")
protected final AbstractPipeline previousStage;
/**
* The source flags if this is the source stage, otherwise the operation
* flags for the intermediate operation represented by this pipeline object.
* The constructors mask the value with {@code StreamOpFlag.STREAM_MASK}
* for a source stage and {@code StreamOpFlag.OP_MASK} for an intermediate
* stage.
*/
protected final int sourceOrOpFlags;
/**
* The next stage in the pipeline, or null if this is the last stage.
* Effectively final at the point of linking to the next pipeline.
*/
@SuppressWarnings("rawtypes")
private AbstractPipeline nextStage;
/**
* The number of intermediate operations between this pipeline object
* and the stream source if sequential, or the previous stateful if parallel.
* Valid at the point of pipeline preparation for evaluation.
*/
private int depth;
/**
* The combined source and operation flags for the source and all operations
* up to and including the operation represented by this pipeline object.
* Valid at the point of pipeline preparation for evaluation.
*/
private int combinedFlags;
/**
* The source spliterator. Only valid for the head pipeline.
* Before the pipeline is consumed if non-null then {@code sourceSupplier}
* must be null. After the pipeline is consumed if non-null then is set to
* null.
*/
private Spliterator<?> sourceSpliterator;
/**
* The source supplier. Only valid for the head pipeline. Before the
* pipeline is consumed if non-null then {@code sourceSpliterator} must be
* null. After the pipeline is consumed if non-null then is set to null.
*/
private Supplier<? extends Spliterator<?>> sourceSupplier;
/**
* True if this pipeline has been linked or consumed. Also set to true
* when this stage is closed.
*/
private boolean linkedOrConsumed;
/**
* The action to run when the stream is closed, or null if there is none.
* Always read and written through {@code sourceStage}. Handlers passed to
* {@code onClose} are composed into this single action, and {@code close}
* clears the field before running the action.
*/
private Runnable sourceCloseAction;
/**
* True if pipeline is parallel, otherwise the pipeline is sequential; only
* valid for the source stage.
*/
private boolean parallel;
/**
* Constructor for the head of a stream pipeline.
*
* @param source {@code Supplier<Spliterator>} describing the stream source
* @param sourceFlags The source flags for the stream source, described in
* {@link StreamOpFlag}
* @param parallel True if the pipeline is parallel
*/
AbstractPipeline(Supplier<? extends Spliterator<?>> source,
int sourceFlags, boolean parallel) {
this.previousStage = null;
this.sourceSupplier = source;
this.sourceStage = this;
this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
// The following is an optimization of:
// StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
this.depth = 0;
this.parallel = parallel;
}
/**
* Constructor for the head of a stream pipeline.
*
* @param source {@code Spliterator} describing the stream source
* @param sourceFlags the source flags for the stream source, described in
* {@link StreamOpFlag}
* @param parallel {@code true} if the pipeline is parallel
*/
AbstractPipeline(Spliterator<?> source,
int sourceFlags, boolean parallel) {
this.previousStage = null;
this.sourceSpliterator = source;
this.sourceStage = this;
this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
// The following is an optimization of:
// StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
this.depth = 0;
this.parallel = parallel;
}
/**
 * Creates an intermediate operation stage and appends it to an existing
 * pipeline.
 *
 * <p>The previous stage must be neither linked nor consumed; it is marked
 * as linked by this constructor.
 *
 * @param previousStage the upstream pipeline stage
 * @param opFlags the operation flags for the new stage, described in
 *        {@link StreamOpFlag}
 * @throws IllegalStateException if previousStage is already linked or
 *         consumed
 */
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
    if (previousStage.linkedOrConsumed) {
        throw new IllegalStateException(MSG_STREAM_LINKED);
    }

    // Hook this stage onto the end of the chain.
    previousStage.linkedOrConsumed = true;
    previousStage.nextStage = this;
    this.previousStage = previousStage;
    this.sourceStage = previousStage.sourceStage;
    this.depth = previousStage.depth + 1;

    // The stage keeps only the op-relevant bits; the combined flags are
    // computed from the flags exactly as passed in.
    this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
    this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
}
/**
* Constructor for replacing an intermediate operation stage onto an
* existing pipeline.
*
* @param previousPreviousStage the upstream pipeline stage of the upstream pipeline stage
* @param previousStage the upstream pipeline stage
* @param opFlags the operation flags for the new stage, described in
* {@link StreamOpFlag}
* @throws IllegalStateException if previousStage is already linked or
* consumed
*/
protected AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousPreviousStage, AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
if (previousStage.linkedOrConsumed || !previousPreviousStage.linkedOrConsumed || previousPreviousStage.nextStage != previousStage || previousStage.previousStage != previousPreviousStage)
throw new IllegalStateException(MSG_STREAM_LINKED);
previousStage.linkedOrConsumed = true;
previousPreviousStage.nextStage = this;
this.previousStage = previousPreviousStage;
this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousPreviousStage.combinedFlags);
this.sourceStage = previousPreviousStage.sourceStage;
this.depth = previousPreviousStage.depth + 1;
}
/**
 * Marks this stage as linked or consumed.
 *
 * @throws IllegalStateException if this stage has already been linked or
 *         consumed
 */
protected void linkOrConsume() {
    if (linkedOrConsumed) {
        throw new IllegalStateException(MSG_STREAM_LINKED);
    }
    linkedOrConsumed = true;
}
// Terminal evaluation methods
/**
 * Runs a terminal operation against this pipeline and returns its result.
 * The stage is marked as consumed before the operation is evaluated.
 *
 * @param <R> the type of result
 * @param terminalOp the terminal operation to be applied to the pipeline.
 * @return the result
 * @throws IllegalStateException if this stage has already been linked or
 *         consumed
 */
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
    assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed) {
        throw new IllegalStateException(MSG_STREAM_LINKED);
    }
    linkedOrConsumed = true;

    if (isParallel()) {
        return terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()));
    }
    return terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
/**
 * Collects the elements output from this pipeline stage into a node.
 * The stage is marked as consumed before collection starts.
 *
 * @param generator the array generator to be used to create array instances
 * @return a flat array-backed Node that holds the collected output elements
 * @throws IllegalStateException if this stage has already been linked or
 *         consumed
 */
@SuppressWarnings("unchecked")
final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator) {
    if (linkedOrConsumed) {
        throw new IllegalStateException(MSG_STREAM_LINKED);
    }
    linkedOrConsumed = true;

    final boolean endsWithParallelStatefulOp =
            isParallel() && previousStage != null && opIsStateful();
    if (!endsWithParallelStatefulOp) {
        return evaluate(sourceSpliterator(0), true, generator);
    }

    // The last intermediate operation is stateful: evaluate it directly to
    // avoid an extra collection step. Setting the depth of this, last, stage
    // to zero slices the pipeline so that this operation is not part of the
    // upstream slice and upstream operations are not part of this slice.
    depth = 0;
    return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(0), generator);
}
/**
 * Hands out the spliterator of the source stage. May only be invoked on
 * the source stage itself; once it returns successfully the pipeline is
 * consumed.
 *
 * @return the source stage spliterator
 * @throws IllegalStateException if this pipeline stage is not the source
 *         stage, if it is already linked or consumed, or if no source
 *         spliterator or supplier is left
 */
@SuppressWarnings("unchecked")
final Spliterator<E_OUT> sourceStageSpliterator() {
    // Only the head of the pipeline owns the source
    if (this != sourceStage) {
        throw new IllegalStateException();
    }
    if (linkedOrConsumed) {
        throw new IllegalStateException(MSG_STREAM_LINKED);
    }
    linkedOrConsumed = true;

    final Spliterator<E_OUT> taken;
    if (sourceSpliterator != null) {
        taken = (Spliterator<E_OUT>) sourceSpliterator;
        sourceSpliterator = null;
    } else if (sourceSupplier != null) {
        // The supplier is dropped only after it has produced a spliterator
        taken = (Spliterator<E_OUT>) sourceSupplier.get();
        sourceSupplier = null;
    } else {
        throw new IllegalStateException(MSG_CONSUMED);
    }
    return taken;
}
// BaseStream
@Override
@SuppressWarnings("unchecked")
public final S sequential() {
    final S self = (S) this;
    // The execution mode is recorded on the source stage only
    sourceStage.parallel = false;
    return self;
}
@Override
@SuppressWarnings("unchecked")
public final S parallel() {
    final S self = (S) this;
    // The execution mode is recorded on the source stage only
    sourceStage.parallel = true;
    return self;
}
@Override
public void close() {
    // Mark this stage as unusable and drop its own source references
    linkedOrConsumed = true;
    sourceSpliterator = null;
    sourceSupplier = null;

    final Runnable pending = sourceStage.sourceCloseAction;
    if (pending == null) {
        return;
    }
    // Clear the action before running it, so it is not run again by a
    // later close
    sourceStage.sourceCloseAction = null;
    pending.run();
}
@Override
@SuppressWarnings("unchecked")
public S onClose(Runnable closeHandler) {
    if (linkedOrConsumed) {
        throw new IllegalStateException(MSG_STREAM_LINKED);
    }
    Objects.requireNonNull(closeHandler);

    // Close handlers are kept on the source stage as one composed action
    final Runnable registered = sourceStage.sourceCloseAction;
    if (registered == null) {
        sourceStage.sourceCloseAction = closeHandler;
    } else {
        sourceStage.sourceCloseAction = Streams.composeWithExceptions(registered, closeHandler);
    }
    return (S) this;
}
// Not final: the primitive specializations declare co-variant overrides
@Override
@SuppressWarnings("unchecked")
public Spliterator<E_OUT> spliterator() {
    if (linkedOrConsumed) {
        throw new IllegalStateException(MSG_STREAM_LINKED);
    }
    linkedOrConsumed = true;

    // An intermediate stage wraps the (lazily obtained) source spliterator
    if (this != sourceStage) {
        return wrap(this, () -> sourceSpliterator(0), isParallel());
    }

    // The head stage hands out its source directly
    if (sourceStage.sourceSpliterator != null) {
        Spliterator<E_OUT> direct = (Spliterator<E_OUT>) sourceStage.sourceSpliterator;
        sourceStage.sourceSpliterator = null;
        return direct;
    }
    if (sourceStage.sourceSupplier != null) {
        Supplier<Spliterator<E_OUT>> supplier =
                (Supplier<Spliterator<E_OUT>>) sourceStage.sourceSupplier;
        sourceStage.sourceSupplier = null;
        return lazySpliterator(supplier);
    }
    throw new IllegalStateException(MSG_CONSUMED);
}
@Override
public final boolean isParallel() {
// The execution mode is tracked on the source stage, whichever stage is asked
return sourceStage.parallel;
}
/**
* Returns the composition of stream flags of the stream source and all
* intermediate operations. The value is derived from this stage's combined
* flags via {@code StreamOpFlag.toStreamFlags}.
*
* @return the composition of stream flags of the stream source and all
* intermediate operations
* @see StreamOpFlag
*/
final int getStreamFlags() {
return StreamOpFlag.toStreamFlags(combinedFlags);
}
/**
 * Reports whether any stage after the source stage, up to and including
 * this stage, is stateful.
 *
 * @return {@code true} if any stage in this segment is stateful,
 *         {@code false} if not.
 */
protected final boolean hasAnyStateful() {
    for (var stage = sourceStage.nextStage; stage != null; stage = stage.nextStage) {
        if (stage.opIsStateful()) {
            return true;
        }
        // Stages downstream of this one are not inspected
        if (stage == this) {
            break;
        }
    }
    return false;
}
/**
 * Reports whether any stage after the source stage, through to the last
 * stage of the (entire) pipeline, has the short-circuit flag known in its
 * combined flags.
 *
 * @return {@code true} if any stage in this pipeline is short-circuiting,
 *         {@code false} if not.
 */
protected final boolean isShortCircuitingPipeline() {
    var stage = sourceStage.nextStage;
    while (stage != null) {
        if (StreamOpFlag.SHORT_CIRCUIT.isKnown(stage.combinedFlags)) {
            return true;
        }
        stage = stage.nextStage;
    }
    return false;
}
/**
* Get the source spliterator for this pipeline stage. For a sequential or
* stateless parallel pipeline, this is the source spliterator. For a
* stateful parallel pipeline, this is a spliterator describing the results
* of all computations up to and including the most recent stateful
* operation.
*
* <p>The source stage's spliterator or supplier reference is cleared by this
* method. For a parallel pipeline containing a stateful operation, the depth
* and combined flags of every stage after the source, up to and including
* this stage, are recomputed.
*
* @param terminalFlags the operation flags of the terminal operation; if
* non-zero they are combined into this stage's combined flags
* @return the spliterator to use as the source for evaluating this stage
* @throws IllegalStateException if the source stage has neither a source
* spliterator nor a source supplier
*/
@SuppressWarnings("unchecked")
protected Spliterator<?> sourceSpliterator(int terminalFlags) {
// Get the source spliterator of the pipeline
Spliterator<?> spliterator = null;
if (sourceStage.sourceSpliterator != null) {
spliterator = sourceStage.sourceSpliterator;
sourceStage.sourceSpliterator = null;
}
else if (sourceStage.sourceSupplier != null) {
spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();
sourceStage.sourceSupplier = null;
}
else {
throw new IllegalStateException(MSG_CONSUMED);
}
if (isParallel() && hasAnyStateful()) {
// Adapt the source spliterator, evaluating each stateful op
// in the pipeline up to and including this pipeline stage.
// The depth and flags of each pipeline stage are adjusted accordingly.
int depth = 1;
// u is the upstream stage and p the stage being processed; the loop
// ends once this stage (e) has been processed as p.
for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;
u != e;
u = p, p = p.nextStage) {
int thisOpFlags = p.sourceOrOpFlags;
if (p.opIsStateful()) {
// A stateful op starts a new segment, so depth counting restarts
depth = 0;
if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {
// Clear the short circuit flag for next pipeline stage
// This stage encapsulates short-circuiting, the next
// stage may not have any short-circuit operations, and
// if so spliterator.forEachRemaining should be used
// for traversal
thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT;
}
// From here on the stateful op's result is the source for downstream stages
spliterator = p.opEvaluateParallelLazy(u, spliterator);
// Inject or clear SIZED on the source pipeline stage
// based on the stage's spliterator
thisOpFlags = spliterator.hasCharacteristics(Spliterator.SIZED)
? (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED
: (thisOpFlags & ~StreamOpFlag.IS_SIZED) | StreamOpFlag.NOT_SIZED;
}
// p.depth becomes the number of stages since the most recent stateful
// op (0 for the stateful op itself), or since the source if none yet
p.depth = depth++;
p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);
}
}
if (terminalFlags != 0) {
// Apply flags from the terminal operation to last pipeline stage
combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);
}
return spliterator;
}
// PipelineHelper
@Override
final StreamShape getSourceShape() {
    // Walk upstream to the stage with depth zero; its output shape is the
    // shape of the source seen by this stage.
    @SuppressWarnings("rawtypes")
    AbstractPipeline stage = this;
    for (; stage.depth > 0; stage = stage.previousStage) {
        // nothing to do besides stepping upstream
    }
    return stage.getOutputShape();
}
@Override
final <P_IN> long exactOutputSizeIfKnown(Spliterator<P_IN> spliterator) {
    final int flags = getStreamAndOpFlags();
    if (!StreamOpFlag.SIZED.isKnown(flags)) {
        return -1;
    }

    long size = spliterator.getExactSizeIfKnown();
    // Currently, we have no stateless SIZE_ADJUSTING intermediate operations,
    // so SIZE_ADJUSTING can simply be ignored in parallel streams, since
    // adjustments are already accounted in the input spliterator.
    //
    // If we ever have a stateless SIZE_ADJUSTING intermediate operation,
    // we would need step back until depth == 0, then call exactOutputSize()
    // for the subsequent stages.
    if (size == -1 || !StreamOpFlag.SIZE_ADJUSTING.isKnown(flags) || isParallel()) {
        return size;
    }

    // Start after the source stage, as it's never SIZE_ADJUSTING
    AbstractPipeline<?, ?, ?> stage = sourceStage.nextStage;
    while (stage != null) {
        size = stage.exactOutputSize(size);
        stage = stage.nextStage;
    }
    return size;
}
/**
* Returns the exact output size of the pipeline given the exact size reported by the previous stage.
* It is invoked by {@code exactOutputSizeIfKnown} on each stage after the
* source stage when the pipeline is sequential and its flags are known to
* be SIZED and SIZE_ADJUSTING.
*
* @implSpec The default implementation returns {@code previousSize}
* unchanged.
*
* @param previousSize the exact size reported by the previous stage
* @return the output size of this stage
*/
long exactOutputSize(long previousSize) {
return previousSize;
}
@Override
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
    // Wrap the sink with this pipeline's operations, push the elements
    // through, and hand back the caller's original sink
    final Sink<P_IN> chain = wrapSink(Objects.requireNonNull(sink));
    copyInto(chain, spliterator);
    return sink;
}
@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    Objects.requireNonNull(wrappedSink);

    // A short-circuiting pipeline must be traversed with cancellation checks
    if (StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
        copyIntoWithCancel(wrappedSink, spliterator);
        return;
    }

    wrappedSink.begin(spliterator.getExactSizeIfKnown());
    spliterator.forEachRemaining(wrappedSink);
    wrappedSink.end();
}
@Override
@SuppressWarnings("unchecked")
final <P_IN> boolean copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    // Traversal is delegated to the stage with depth zero
    @SuppressWarnings("rawtypes")
    AbstractPipeline head = this;
    for (; head.depth > 0; head = head.previousStage) {
        // nothing to do besides stepping upstream
    }

    wrappedSink.begin(spliterator.getExactSizeIfKnown());
    final boolean cancelled = head.forEachWithCancel(spliterator, wrappedSink);
    wrappedSink.end();
    return cancelled;
}
@Override
final int getStreamAndOpFlags() {
// The combined flags of the source and all operations up to and including this stage
return combinedFlags;
}
/**
* Returns whether the {@code ORDERED} flag is known in the combined flags of
* this pipeline stage.
*
* @return {@code true} if {@code StreamOpFlag.ORDERED} is known for this stage
*/
final boolean isOrdered() {
return StreamOpFlag.ORDERED.isKnown(combinedFlags);
}
@Override
@SuppressWarnings("unchecked")
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
    Objects.requireNonNull(sink);

    // Wrap from this stage upstream, stopping at the stage with depth zero.
    // Each operation is given the combined flags of the stage before it.
    @SuppressWarnings("rawtypes")
    AbstractPipeline stage = this;
    while (stage.depth > 0) {
        sink = stage.opWrapSink(stage.previousStage.combinedFlags, sink);
        stage = stage.previousStage;
    }
    return (Sink<P_IN>) sink;
}
@Override
@SuppressWarnings("unchecked")
final <P_IN> Spliterator<E_OUT> wrapSpliterator(Spliterator<P_IN> sourceSpliterator) {
    if (depth != 0) {
        return wrap(this, () -> sourceSpliterator, isParallel());
    }
    // At depth zero the source spliterator is returned as is
    return (Spliterator<E_OUT>) sourceSpliterator;
}
@Override
final <P_IN> Node<E_OUT> evaluate(Spliterator<P_IN> spliterator,
                                  boolean flatten,
                                  IntFunction<E_OUT[]> generator) {
    if (!isParallel()) {
        // Sequential: push every element through the pipeline into a builder
        final Node.Builder<E_OUT> builder =
                makeNodeBuilder(exactOutputSizeIfKnown(spliterator), generator);
        final Node.Builder<E_OUT> filled = wrapAndCopyInto(builder, spliterator);
        return filled.build();
    }
    // @@@ Optimize if op of this pipeline stage is a stateful op
    return evaluateToNode(this, spliterator, flatten, generator);
}
// Shape-specific abstract methods, implemented by XxxPipeline classes
/**
* Gets the output shape of the pipeline. If the pipeline is the head,
* then its output shape corresponds to the shape of the source.
* Otherwise, its output shape corresponds to the output shape of the
* associated operation.
*
* @return the output shape
*/
abstract StreamShape getOutputShape();
/**
* Collects the elements output from a pipeline into a Node that holds
* elements of this shape.
*
* @param <P_IN> the element type of the source spliterator
* @param helper the pipeline helper describing the pipeline stages
* @param spliterator the source spliterator
* @param flattenTree true if the returned node should be flattened
* @param generator the array generator
* @return a Node holding the output of the pipeline
*/
abstract <P_IN> Node<E_OUT> evaluateToNode(PipelineHelper<E_OUT> helper,
Spliterator<P_IN> spliterator,
boolean flattenTree,
IntFunction<E_OUT[]> generator);
/**
* Creates a spliterator that wraps a source spliterator, compatible with
* this stream shape, and operations associated with a {@link
* PipelineHelper}.
*
* @param <P_IN> the element type of the supplied source spliterator
* @param ph the pipeline helper describing the pipeline stages
* @param supplier the supplier of a spliterator
* @param isParallel whether the pipeline is parallel; callers in this
* class pass the result of {@link #isParallel()}
* @return a wrapping spliterator compatible with this shape
*/
abstract <P_IN> Spliterator<E_OUT> wrap(PipelineHelper<E_OUT> ph,
Supplier<Spliterator<P_IN>> supplier,
boolean isParallel);
/**
* Creates a lazy spliterator that wraps the supplier and obtains the
* supplied spliterator when a method is invoked on the lazy spliterator.
*
* @param supplier the supplier of a spliterator
* @return a lazy spliterator backed by the supplier
*/
abstract Spliterator<E_OUT> lazySpliterator(Supplier<? extends Spliterator<E_OUT>> supplier);
/**
* Traverses the elements of a spliterator compatible with this stream shape,
* pushing those elements into a sink. If the sink requests cancellation,
* no further elements will be pulled or pushed.
*
* @param spliterator the spliterator to pull elements from
* @param sink the sink to push elements to
* @return true if the cancellation was requested
*/
abstract boolean forEachWithCancel(Spliterator<E_OUT> spliterator, Sink<E_OUT> sink);
/**
* Makes a node builder compatible with this stream shape.
*
* @param exactSizeIfKnown if {@literal >=0}, then a node builder will be
* created that has a fixed capacity of at most exactSizeIfKnown elements. If
* {@literal < 0}, then the node builder has an unfixed capacity. A fixed
* capacity node builder will throw exceptions if an element is added after
* builder has reached capacity, or is built before the builder has reached
* capacity.
*
* @param generator the array generator to be used to create instances of a
* T[] array. For implementations supporting primitive nodes, this parameter
* may be ignored.
* @return a node builder
*/
@Override
abstract Node.Builder<E_OUT> makeNodeBuilder(long exactSizeIfKnown,
IntFunction<E_OUT[]> generator);
// Op-specific abstract methods, implemented by the operation class
/**
* Returns whether this operation is stateful or not. If it is stateful,
* then the method
* {@link #opEvaluateParallel(PipelineHelper, java.util.Spliterator, java.util.function.IntFunction)}
* must be overridden, because its default implementation throws
* {@code UnsupportedOperationException}.
*
* @return {@code true} if this operation is stateful
*/
abstract boolean opIsStateful();
/**
* Accepts a {@code Sink} which will receive the results of this operation,
* and returns a {@code Sink} which accepts elements of the input type of
* this operation and which performs the operation, passing the results to
* the provided {@code Sink}.
*
* @apiNote
* The implementation may use the {@code flags} parameter to optimize the
* sink wrapping. For example, if the input is already {@code DISTINCT},
* the implementation for the {@code Stream#distinct()} method could just
* return the sink it was passed.
*
* @param flags The combined stream and operation flags up to, but not
* including, this operation
* @param sink sink to which elements should be sent after processing
* @return a sink which accepts elements, performs the operation upon
* each element, and passes the results (if any) to the provided
* {@code Sink}.
*/
abstract Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink);
/**
* Performs a parallel evaluation of the operation using the specified
* {@code PipelineHelper} which describes the upstream intermediate
* operations. Only called on stateful operations. If {@link
* #opIsStateful()} returns true then implementations must override the
* default implementation.
*
* @implSpec The default implementation always throws
* {@code UnsupportedOperationException}.
*
* @param <P_IN> the element type of the source spliterator
* @param helper the pipeline helper describing the pipeline stages
* @param spliterator the source {@code Spliterator}
* @param generator the array generator
* @return a {@code Node} describing the result of the evaluation
* @throws UnsupportedOperationException if not overridden
*/
<P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper,
Spliterator<P_IN> spliterator,
IntFunction<E_OUT[]> generator) {
throw new UnsupportedOperationException("Parallel evaluation is not supported");
}
/**
 * Returns a {@code Spliterator} describing a parallel evaluation of the
 * operation, using the specified {@code PipelineHelper} which describes the
 * upstream intermediate operations. Only called on stateful operations.
 * A full computation of the result is acceptable here but not required;
 * where possible, it is preferable to describe the result via a lazily
 * evaluated spliterator.
 *
 * @implSpec The default implementation behaves as if:
 * <pre>{@code
 *     return opEvaluateParallel(helper, spliterator, Nodes.castingArray())
 *             .spliterator();
 * }</pre>
 * and is suitable for implementations that cannot do better than a full
 * synchronous evaluation.
 *
 * @param <P_IN> the element type of the source spliterator
 * @param helper the pipeline helper
 * @param spliterator the source {@code Spliterator}
 * @return a {@code Spliterator} describing the result of the evaluation
 */
<P_IN> Spliterator<E_OUT> opEvaluateParallelLazy(PipelineHelper<E_OUT> helper,
                                                 Spliterator<P_IN> spliterator) {
    // Evaluate eagerly into a node, then expose the node's spliterator
    final Node<E_OUT> evaluated = opEvaluateParallel(helper, spliterator, Nodes.castingArray());
    return evaluated.spliterator();
}
}