18
18
19
19
import static com .google .cloud .spanner .TransactionRunner .TransactionCallable ;
20
20
21
+ import com .google .api .gax .core .FixedCredentialsProvider ;
21
22
import com .google .api .gax .longrunning .OperationFuture ;
22
23
import com .google .api .gax .paging .Page ;
23
24
import com .google .api .gax .retrying .RetrySettings ;
70
71
import com .google .cloud .spanner .TimestampBound ;
71
72
import com .google .cloud .spanner .TransactionContext ;
72
73
import com .google .cloud .spanner .TransactionRunner ;
74
+ import com .google .cloud .spanner .TransactionRunner .TransactionCallable ;
73
75
import com .google .cloud .spanner .Type ;
74
76
import com .google .cloud .spanner .Value ;
75
77
import com .google .cloud .spanner .encryption .CustomerManagedEncryption ;
76
78
import com .google .cloud .spanner .v1 .stub .SpannerStubSettings ;
79
+ import com .google .cloud .trace .v1 .TraceServiceClient ;
80
+ import com .google .cloud .trace .v1 .TraceServiceSettings ;
77
81
import com .google .common .base .Function ;
78
82
import com .google .common .base .Joiner ;
79
83
import com .google .common .base .Preconditions ;
80
84
import com .google .common .collect .Lists ;
81
85
import com .google .common .util .concurrent .ThreadFactoryBuilder ;
86
+ import com .google .devtools .cloudtrace .v1 .GetTraceRequest ;
87
+ import com .google .devtools .cloudtrace .v1 .Trace ;
88
+ import com .google .devtools .cloudtrace .v1 .TraceSpan ;
82
89
import com .google .longrunning .Operation ;
83
90
import com .google .protobuf .ByteString ;
84
91
import com .google .protobuf .util .Timestamps ;
152
159
import com .google .spanner .v1 .TypeCode ;
153
160
import io .grpc .Status ;
154
161
import io .grpc .stub .StreamObserver ;
162
+ import io .opentelemetry .api .trace .Span ;
163
+ import io .opentelemetry .api .trace .Tracer ;
164
+ import io .opentelemetry .context .Scope ;
155
165
import java .io .ByteArrayInputStream ;
156
166
import java .io .File ;
157
167
import java .io .IOException ;
166
176
import java .util .Objects ;
167
177
import java .util .concurrent .ExecutionException ;
168
178
import java .util .concurrent .Executor ;
179
+ import java .util .concurrent .ExecutorService ;
169
180
import java .util .concurrent .Executors ;
181
+ import java .util .concurrent .Future ;
170
182
import java .util .concurrent .TimeUnit ;
171
183
import java .util .logging .Level ;
172
184
import java .util .logging .Logger ;
@@ -332,24 +344,28 @@ public void startRWTransaction() throws Exception {
332
344
// Try to commit
333
345
return null ;
334
346
};
347
+ io .opentelemetry .context .Context context = io .opentelemetry .context .Context .current ();
335
348
Runnable runnable =
336
- () -> {
337
- try {
338
- runner =
339
- optimistic
340
- ? dbClient .readWriteTransaction (Options .optimisticLock ())
341
- : dbClient .readWriteTransaction ();
342
- LOGGER .log (Level .INFO , String .format ("Ready to run callable %s\n " , transactionSeed ));
343
- runner .run (callable );
344
- transactionSucceeded (runner .getCommitTimestamp ().toProto ());
345
- } catch (SpannerException e ) {
346
- LOGGER .log (
347
- Level .WARNING ,
348
- String .format ("Transaction runnable failed with exception %s\n " , e .getMessage ()),
349
- e );
350
- transactionFailed (e );
351
- }
352
- };
349
+ context .wrap (
350
+ () -> {
351
+ try {
352
+ runner =
353
+ optimistic
354
+ ? dbClient .readWriteTransaction (Options .optimisticLock ())
355
+ : dbClient .readWriteTransaction ();
356
+ LOGGER .log (
357
+ Level .INFO , String .format ("Ready to run callable %s\n " , transactionSeed ));
358
+ runner .run (callable );
359
+ transactionSucceeded (runner .getCommitTimestamp ().toProto ());
360
+ } catch (SpannerException e ) {
361
+ LOGGER .log (
362
+ Level .WARNING ,
363
+ String .format (
364
+ "Transaction runnable failed with exception %s\n " , e .getMessage ()),
365
+ e );
366
+ transactionFailed (e );
367
+ }
368
+ });
353
369
LOGGER .log (
354
370
Level .INFO ,
355
371
String .format ("Callable and Runnable created, ready to execute %s\n " , transactionSeed ));
@@ -753,6 +769,11 @@ public synchronized void closeBatchTxn() throws SpannerException {
753
769
Executors .newCachedThreadPool (
754
770
new ThreadFactoryBuilder ().setNameFormat ("action-pool-%d" ).build ());
755
771
772
+ // Thread pool to verify end to end traces.
773
+ private static final ExecutorService endToEndTracesThreadPool =
774
+ Executors .newCachedThreadPool (
775
+ new ThreadFactoryBuilder ().setNameFormat ("end-to-end-traces-pool-%d" ).build ());
776
+
756
777
private synchronized Spanner getClientWithTimeout (
757
778
long timeoutSeconds , boolean useMultiplexedSession ) throws IOException {
758
779
if (clientWithTimeout != null ) {
@@ -818,6 +839,8 @@ private synchronized Spanner getClient(long timeoutSeconds, boolean useMultiplex
818
839
.setHost (HOST_PREFIX + WorkerProxy .spannerPort )
819
840
.setCredentials (credentials )
820
841
.setChannelProvider (channelProvider )
842
+ .setEnableEndToEndTracing (true )
843
+ .setOpenTelemetry (WorkerProxy .openTelemetrySdk )
821
844
.setSessionPoolOption (sessionPoolOptions );
822
845
823
846
SpannerStubSettings .Builder stubSettingsBuilder =
@@ -841,6 +864,88 @@ private synchronized Spanner getClient(long timeoutSeconds, boolean useMultiplex
841
864
return optionsBuilder .build ().getService ();
842
865
}
843
866
867
+ private TraceServiceClient traceServiceClient ;
868
+
869
+ // Return the trace service client, create one if not exists.
870
+ private synchronized TraceServiceClient getTraceServiceClient () throws IOException {
871
+ if (traceServiceClient != null ) {
872
+ return traceServiceClient ;
873
+ }
874
+ // Create a trace service client
875
+ Credentials credentials ;
876
+ if (WorkerProxy .serviceKeyFile .isEmpty ()) {
877
+ credentials = NoCredentials .getInstance ();
878
+ } else {
879
+ credentials =
880
+ GoogleCredentials .fromStream (
881
+ new ByteArrayInputStream (
882
+ FileUtils .readFileToByteArray (new File (WorkerProxy .serviceKeyFile ))),
883
+ HTTP_TRANSPORT_FACTORY );
884
+ }
885
+
886
+ TraceServiceSettings traceServiceSettings =
887
+ TraceServiceSettings .newBuilder ()
888
+ .setEndpoint (WorkerProxy .CLOUD_TRACE_ENDPOINT )
889
+ .setCredentialsProvider (FixedCredentialsProvider .create (credentials ))
890
+ .build ();
891
+
892
+ traceServiceClient = TraceServiceClient .create (traceServiceSettings );
893
+ return traceServiceClient ;
894
+ }
895
+
896
+ public Future <Boolean > getEndToEndTraceVerificationTask (String traceId ) {
897
+ return endToEndTracesThreadPool .submit (
898
+ () -> {
899
+ try {
900
+ // Wait for 10 seconds before verifying to ensure traces are exported.
901
+ long sleepDuration = TimeUnit .SECONDS .toMillis (10 );
902
+ LOGGER .log (
903
+ Level .INFO ,
904
+ String .format (
905
+ "Sleeping for %d milliseconds before verifying end to end trace" ,
906
+ sleepDuration ));
907
+ Thread .sleep (sleepDuration );
908
+ } catch (InterruptedException e ) {
909
+ Thread .currentThread ().interrupt (); // Handle interruption
910
+ LOGGER .log (Level .INFO , String .format ("Thread interrupted." ));
911
+ return false ; // Return false if interrupted
912
+ }
913
+ return isExportedEndToEndTraceValid (traceId );
914
+ });
915
+ }
916
+
917
+ private static final String READ_WRITE_TRANSACTION = "CloudSpanner.ReadWriteTransaction" ;
918
+ private static final String READ_ONLY_TRANSACTION = "CloudSpanner.ReadOnlyTransaction" ;
919
+
920
+ /* Returns whether a exported trace is valid. */
921
+ public boolean isExportedEndToEndTraceValid (String traceId ) {
922
+ try {
923
+ GetTraceRequest getTraceRequest =
924
+ GetTraceRequest .newBuilder ()
925
+ .setProjectId (WorkerProxy .PROJECT_ID )
926
+ .setTraceId (traceId )
927
+ .build ();
928
+ Trace trace = getTraceServiceClient ().getTrace (getTraceRequest );
929
+ boolean readWriteOrReadOnlyTxnPresent = false , spannerServerSideSpanPresent = false ;
930
+ for (TraceSpan span : trace .getSpansList ()) {
931
+ if (span .getName ().contains (READ_ONLY_TRANSACTION )
932
+ || span .getName ().contains (READ_WRITE_TRANSACTION )) {
933
+ readWriteOrReadOnlyTxnPresent = true ;
934
+ }
935
+ if (span .getName ().startsWith ("Spanner." )) {
936
+ spannerServerSideSpanPresent = true ;
937
+ }
938
+ }
939
+ if (readWriteOrReadOnlyTxnPresent && !spannerServerSideSpanPresent ) {
940
+ return false ;
941
+ }
942
+ } catch (Exception e ) {
943
+ LOGGER .log (Level .WARNING , "Failed to verify end to end trace." , e );
944
+ return false ;
945
+ }
946
+ return true ;
947
+ }
948
+
844
949
/** Handle actions. */
845
950
public Status startHandlingRequest (
846
951
SpannerAsyncActionRequest req , ExecutionFlowContext executionContext ) {
@@ -865,17 +970,20 @@ public Status startHandlingRequest(
865
970
useMultiplexedSession = false ;
866
971
}
867
972
973
+ io .opentelemetry .context .Context context = io .opentelemetry .context .Context .current ();
868
974
actionThreadPool .execute (
869
- () -> {
870
- Status status =
871
- executeAction (outcomeSender , action , dbPath , useMultiplexedSession , executionContext );
872
- if (!status .isOk ()) {
873
- LOGGER .log (
874
- Level .WARNING ,
875
- String .format ("Failed to execute action with error: %s\n %s" , status , action ));
876
- executionContext .onError (status .getCause ());
877
- }
878
- });
975
+ context .wrap (
976
+ () -> {
977
+ Status status =
978
+ executeAction (
979
+ outcomeSender , action , dbPath , useMultiplexedSession , executionContext );
980
+ if (!status .isOk ()) {
981
+ LOGGER .log (
982
+ Level .WARNING ,
983
+ String .format ("Failed to execute action with error: %s\n %s" , status , action ));
984
+ executionContext .onError (status .getCause ());
985
+ }
986
+ }));
879
987
return Status .OK ;
880
988
}
881
989
@@ -886,7 +994,10 @@ private Status executeAction(
886
994
String dbPath ,
887
995
boolean useMultiplexedSession ,
888
996
ExecutionFlowContext executionContext ) {
889
-
997
+ Tracer tracer = WorkerProxy .openTelemetrySdk .getTracer (CloudClientExecutor .class .getName ());
998
+ String actionType = action .getActionCase ().toString ();
999
+ Span span = tracer .spanBuilder (String .format ("performaction_%s" , actionType )).startSpan ();
1000
+ Scope scope = span .makeCurrent ();
890
1001
try {
891
1002
if (action .hasAdmin ()) {
892
1003
return executeAdminAction (useMultiplexedSession , action .getAdmin (), outcomeSender );
@@ -959,11 +1070,15 @@ private Status executeAction(
959
1070
ErrorCode .UNIMPLEMENTED , "Not implemented yet: \n " + action )));
960
1071
}
961
1072
} catch (Exception e ) {
1073
+ span .recordException (e );
962
1074
LOGGER .log (Level .WARNING , "Unexpected error: " + e .getMessage ());
963
1075
return outcomeSender .finishWithError (
964
1076
toStatus (
965
1077
SpannerExceptionFactory .newSpannerException (
966
1078
ErrorCode .INVALID_ARGUMENT , "Unexpected error: " + e .getMessage ())));
1079
+ } finally {
1080
+ scope .close ();
1081
+ span .end ();
967
1082
}
968
1083
}
969
1084
0 commit comments