@@ -280,6 +280,28 @@ type StartReplicationOptions struct {
280
280
PluginArgs []string
281
281
}
282
282
283
+ type errEndTimeline struct {
284
+ nextTli int64
285
+ nextTliStartpos LSN
286
+ }
287
+
288
+ func (e errEndTimeline ) Error () string {
289
+ return "start replication with a switch point"
290
+ }
291
+
292
+ func (e errEndTimeline ) ErrEndTimeline () (int64 , LSN ) {
293
+ return e .nextTli , e .nextTliStartpos
294
+ }
295
+
296
+ func IsErrEndTimeline (err error ) (int64 , LSN , bool ) {
297
+ e , ok := err .(interface { ErrEndTimeline () (int64 , LSN ) })
298
+ if ! ok {
299
+ return 0 , 0 , false
300
+ }
301
+ nextTli , nextTliStartpos := e .ErrEndTimeline ()
302
+ return nextTli , nextTliStartpos , true
303
+ }
304
+
283
305
// StartReplication begins the replication process by executing the START_REPLICATION command.
284
306
func StartReplication (ctx context.Context , conn * pgconn.PgConn , slotName string , startLSN LSN , options StartReplicationOptions ) error {
285
307
var timelineString string
@@ -303,6 +325,10 @@ func StartReplication(ctx context.Context, conn *pgconn.PgConn, slotName string,
303
325
return fmt .Errorf ("failed to send START_REPLICATION: %w" , err )
304
326
}
305
327
328
+ var (
329
+ nextTli int64
330
+ nextTliStartpos LSN
331
+ )
306
332
for {
307
333
msg , err := conn .ReceiveMessage (ctx )
308
334
if err != nil {
@@ -316,6 +342,32 @@ func StartReplication(ctx context.Context, conn *pgconn.PgConn, slotName string,
316
342
case * pgproto3.CopyBothResponse :
317
343
// This signals the start of the replication stream.
318
344
return nil
345
+ case * pgproto3.RowDescription :
346
+ if options .Mode != PhysicalReplication {
347
+ return fmt .Errorf ("received row RowDescription message in logical replication" )
348
+ }
349
+ if len (msg .Fields ) != 2 || string (msg .Fields [0 ].Name ) != "next_tli" || string (msg .Fields [1 ].Name ) != "next_tli_startpos" {
350
+ return fmt .Errorf ("expected next timeline row description message" )
351
+ }
352
+ case * pgproto3.DataRow :
353
+ if cnt := len (msg .Values ); cnt != 2 {
354
+ return fmt .Errorf ("expected next_tli and next_tli_startpos, got %d fields" , cnt )
355
+ }
356
+ tmpNextTli , tmpNextTliStartpos := string (msg .Values [0 ]), string (msg .Values [1 ])
357
+ nextTli , err = strconv .ParseInt (tmpNextTli , 10 , 64 )
358
+ if err != nil {
359
+ return err
360
+ }
361
+ nextTliStartpos , err = ParseLSN (tmpNextTliStartpos )
362
+ if err != nil {
363
+ return err
364
+ }
365
+ case * pgproto3.CommandComplete :
366
+ case * pgproto3.ReadyForQuery :
367
+ // if no next timeline switch result, maybe it was left on the connection
368
+ if nextTli > 0 && nextTliStartpos > 0 {
369
+ return errEndTimeline {nextTli : nextTli , nextTliStartpos : nextTliStartpos }
370
+ }
319
371
default :
320
372
return fmt .Errorf ("unexpected response type: %T" , msg )
321
373
}
0 commit comments