Skip to content

Commit 7204e68

Browse files
lidavidmwesm
authored andcommitted
ARROW-7477: [Java][FlightRPC] set up gRPC reflection metadata
This sets up the gRPC/Protobuf reflection metadata, so that [gRPC reflection](https://github.com/grpc/grpc/blob/master/doc/server-reflection.md) works. This is useful for debugging. Additionally, it means interceptors can do generic introspection on gRPC and Flight methods, without having to special-case them. (You might want to do this to implement path-based access control, for instance.) Closes #6114 from lidavidm/arrow-7477-reflection and squashes the following commits: 8b763dd <David Li> ARROW-7477: set up gRPC reflection metadata Authored-by: David Li <[email protected]> Signed-off-by: Wes McKinney <[email protected]>
1 parent 4634c89 commit 7204e68

File tree

2 files changed

+64
-2
lines changed

2 files changed

+64
-2
lines changed

java/flight/src/main/java/org/apache/arrow/flight/FlightBindingService.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.arrow.flight.auth.ServerAuthHandler;
2424
import org.apache.arrow.flight.impl.Flight;
2525
import org.apache.arrow.flight.impl.Flight.PutResult;
26+
import org.apache.arrow.flight.impl.FlightServiceGrpc;
2627
import org.apache.arrow.memory.BufferAllocator;
2728

2829
import com.google.common.collect.ImmutableSet;
@@ -32,6 +33,7 @@
3233
import io.grpc.MethodDescriptor.MethodType;
3334
import io.grpc.ServerMethodDefinition;
3435
import io.grpc.ServerServiceDefinition;
36+
import io.grpc.ServiceDescriptor;
3537
import io.grpc.protobuf.ProtoUtils;
3638
import io.grpc.stub.ServerCalls;
3739
import io.grpc.stub.StreamObserver;
@@ -61,6 +63,7 @@ public static MethodDescriptor<Flight.Ticket, ArrowMessage> getDoGetDescriptor(B
6163
.setSampledToLocalTracing(false)
6264
.setRequestMarshaller(ProtoUtils.marshaller(Flight.Ticket.getDefaultInstance()))
6365
.setResponseMarshaller(ArrowMessage.createMarshaller(allocator))
66+
.setSchemaDescriptor(FlightServiceGrpc.getDoGetMethod().getSchemaDescriptor())
6467
.build();
6568
}
6669

@@ -71,6 +74,7 @@ public static MethodDescriptor<ArrowMessage, Flight.PutResult> getDoPutDescripto
7174
.setSampledToLocalTracing(false)
7275
.setRequestMarshaller(ArrowMessage.createMarshaller(allocator))
7376
.setResponseMarshaller(ProtoUtils.marshaller(Flight.PutResult.getDefaultInstance()))
77+
.setSchemaDescriptor(FlightServiceGrpc.getDoPutMethod().getSchemaDescriptor())
7478
.build();
7579
}
7680

@@ -82,7 +86,21 @@ public ServerServiceDefinition bindService() {
8286

8387
final MethodDescriptor<ArrowMessage, Flight.PutResult> doPutDescriptor = getDoPutDescriptor(allocator);
8488

85-
ServerServiceDefinition.Builder serviceBuilder = ServerServiceDefinition.builder(FlightConstants.SERVICE);
89+
// Make sure we preserve SchemaDescriptor fields on methods so that gRPC reflection still works.
90+
final ServiceDescriptor.Builder serviceDescriptorBuilder = ServiceDescriptor.newBuilder(FlightConstants.SERVICE)
91+
.setSchemaDescriptor(baseDefinition.getServiceDescriptor().getSchemaDescriptor());
92+
serviceDescriptorBuilder.addMethod(doGetDescriptor);
93+
serviceDescriptorBuilder.addMethod(doPutDescriptor);
94+
for (MethodDescriptor<?, ?> definition : baseDefinition.getServiceDescriptor().getMethods()) {
95+
if (OVERRIDE_METHODS.contains(definition.getFullMethodName())) {
96+
continue;
97+
}
98+
99+
serviceDescriptorBuilder.addMethod(definition);
100+
}
101+
102+
final ServiceDescriptor serviceDescriptor = serviceDescriptorBuilder.build();
103+
ServerServiceDefinition.Builder serviceBuilder = ServerServiceDefinition.builder(serviceDescriptor);
86104
serviceBuilder.addMethod(doGetDescriptor, ServerCalls.asyncServerStreamingCall(new DoGetMethod(delegate)));
87105
serviceBuilder.addMethod(doPutDescriptor, ServerCalls.asyncBidiStreamingCall(new DoPutMethod(delegate)));
88106

java/flight/src/test/java/org/apache/arrow/flight/TestServerOptions.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,20 @@
1717

1818
package org.apache.arrow.flight;
1919

20+
import static org.junit.Assert.assertEquals;
21+
import static org.junit.Assert.assertNotNull;
22+
2023
import java.io.File;
24+
import java.util.HashMap;
25+
import java.util.Map;
2126
import java.util.concurrent.ExecutorService;
2227
import java.util.concurrent.Executors;
2328
import java.util.concurrent.atomic.AtomicBoolean;
2429
import java.util.function.Consumer;
2530

2631
import org.apache.arrow.flight.TestBasicOperation.Producer;
32+
import org.apache.arrow.flight.auth.ServerAuthHandler;
33+
import org.apache.arrow.flight.impl.FlightServiceGrpc;
2734
import org.apache.arrow.memory.BufferAllocator;
2835
import org.apache.arrow.memory.RootAllocator;
2936
import org.apache.arrow.vector.IntVector;
@@ -34,6 +41,8 @@
3441
import org.junit.runner.RunWith;
3542
import org.junit.runners.JUnit4;
3643

44+
import io.grpc.MethodDescriptor;
45+
import io.grpc.ServerServiceDefinition;
3746
import io.grpc.netty.NettyServerBuilder;
3847

3948
@RunWith(JUnit4.class)
@@ -69,7 +78,7 @@ public void defaultExecutorClosed() throws Exception {
6978
(location) -> FlightServer.builder(a, location, new NoOpFlightProducer())
7079
.build()
7180
)) {
72-
Assert.assertNotNull(server.grpcExecutor);
81+
assertNotNull(server.grpcExecutor);
7382
executor = server.grpcExecutor;
7483
}
7584
Assert.assertTrue(executor.isShutdown());
@@ -128,4 +137,39 @@ public void domainSocket() throws Exception {
128137
}
129138
}
130139
}
140+
141+
@Test
142+
public void checkReflectionMetadata() {
143+
// This metadata is needed for gRPC reflection to work.
144+
final ExecutorService executorService = Executors.newSingleThreadExecutor();
145+
try (final BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE)) {
146+
final FlightBindingService service = new FlightBindingService(allocator, new NoOpFlightProducer(),
147+
ServerAuthHandler.NO_OP, executorService);
148+
final ServerServiceDefinition definition = service.bindService();
149+
assertEquals(FlightServiceGrpc.getServiceDescriptor().getSchemaDescriptor(),
150+
definition.getServiceDescriptor().getSchemaDescriptor());
151+
152+
final Map<String, MethodDescriptor<?, ?>> definedMethods = new HashMap<>();
153+
final Map<String, MethodDescriptor<?, ?>> serviceMethods = new HashMap<>();
154+
155+
// Make sure that the reflection metadata object is identical across all the places where it's accessible
156+
definition.getMethods().forEach(
157+
method -> definedMethods.put(method.getMethodDescriptor().getFullMethodName(), method.getMethodDescriptor()));
158+
definition.getServiceDescriptor().getMethods().forEach(
159+
method -> serviceMethods.put(method.getFullMethodName(), method));
160+
161+
for (final MethodDescriptor<?, ?> descriptor : FlightServiceGrpc.getServiceDescriptor().getMethods()) {
162+
final String methodName = descriptor.getFullMethodName();
163+
Assert.assertTrue("Method is missing from ServerServiceDefinition: " + methodName,
164+
definedMethods.containsKey(methodName));
165+
Assert.assertTrue("Method is missing from ServiceDescriptor: " + methodName,
166+
definedMethods.containsKey(methodName));
167+
168+
assertEquals(descriptor.getSchemaDescriptor(), definedMethods.get(methodName).getSchemaDescriptor());
169+
assertEquals(descriptor.getSchemaDescriptor(), serviceMethods.get(methodName).getSchemaDescriptor());
170+
}
171+
} finally {
172+
executorService.shutdown();
173+
}
174+
}
131175
}

0 commit comments

Comments
 (0)