@@ -29,7 +29,7 @@ public static <T> org.reactivestreams.Publisher<T> toReactive(
29
29
if (flowPublisher instanceof FlowPublisherFromReactive ) {
30
30
return (org .reactivestreams .Publisher <T >)(((FlowPublisherFromReactive <T >)flowPublisher ).reactive );
31
31
}
32
- return new ReactivePublisherFromFlow <>(flowPublisher );
32
+ return new ReactivePublisherFromFlow <T >(flowPublisher );
33
33
}
34
34
35
35
/**
@@ -49,7 +49,7 @@ public static <T> Flow.Publisher<T> toFlow(
49
49
if (reactivePublisher instanceof ReactivePublisherFromFlow ) {
50
50
return (Flow .Publisher <T >)(((ReactivePublisherFromFlow <T >)reactivePublisher ).flow );
51
51
}
52
- return new FlowPublisherFromReactive <>(reactivePublisher );
52
+ return new FlowPublisherFromReactive <T >(reactivePublisher );
53
53
}
54
54
55
55
/**
@@ -70,7 +70,7 @@ public static <T, U> org.reactivestreams.Processor<T, U> toReactive(
70
70
if (flowProcessor instanceof FlowToReactiveProcessor ) {
71
71
return (org .reactivestreams .Processor <T , U >)(((FlowToReactiveProcessor <T , U >)flowProcessor ).reactive );
72
72
}
73
- return new ReactiveToFlowProcessor <>(flowProcessor );
73
+ return new ReactiveToFlowProcessor <T , U >(flowProcessor );
74
74
}
75
75
76
76
/**
@@ -91,7 +91,7 @@ public static <T, U> Flow.Processor<T, U> toFlow(
91
91
if (reactiveProcessor instanceof ReactiveToFlowProcessor ) {
92
92
return (Flow .Processor <T , U >)(((ReactiveToFlowProcessor <T , U >)reactiveProcessor ).flow );
93
93
}
94
- return new FlowToReactiveProcessor <>(reactiveProcessor );
94
+ return new FlowToReactiveProcessor <T , U >(reactiveProcessor );
95
95
}
96
96
97
97
@@ -303,7 +303,7 @@ public void onComplete() {
303
303
304
304
@ Override
305
305
public void subscribe (org .reactivestreams .Subscriber <? super U > s ) {
306
- flow .subscribe (new FlowToReactiveSubscriber <>(s ));
306
+ flow .subscribe (new FlowToReactiveSubscriber <U >(s ));
307
307
}
308
308
}
309
309
@@ -342,7 +342,7 @@ public void onComplete() {
342
342
343
343
@ Override
344
344
public void subscribe (Flow .Subscriber <? super U > s ) {
345
- reactive .subscribe (new ReactiveToFlowSubscriber <>(s ));
345
+ reactive .subscribe (new ReactiveToFlowSubscriber <U >(s ));
346
346
}
347
347
}
348
348
@@ -360,7 +360,7 @@ public ReactivePublisherFromFlow(Flow.Publisher<? extends T> flowPublisher) {
360
360
361
361
@ Override
362
362
public void subscribe (org .reactivestreams .Subscriber <? super T > reactive ) {
363
- flow .subscribe (new FlowToReactiveSubscriber <>(reactive ));
363
+ flow .subscribe (new FlowToReactiveSubscriber <T >(reactive ));
364
364
}
365
365
}
366
366
@@ -378,7 +378,7 @@ public FlowPublisherFromReactive(org.reactivestreams.Publisher<? extends T> reac
378
378
379
379
@ Override
380
380
public void subscribe (Flow .Subscriber <? super T > flow ) {
381
- reactive .subscribe (new ReactiveToFlowSubscriber <>(flow ));
381
+ reactive .subscribe (new ReactiveToFlowSubscriber <T >(flow ));
382
382
}
383
383
}
384
384
0 commit comments