Skip to content

Commit 11299bb

Browse files
[Networking] logging dial addresses (#1492)
* adds addresses as return value for creating streams * fixes lint in middleware * adds dial addresses in libp2p node * removes chainid from network * reverts back dht logs * refactors log * Update network/p2p/libp2pNode.go Co-authored-by: Vishal <[email protected]> * refactors invocation of peerstore Co-authored-by: Vishal <[email protected]>
1 parent 4233145 commit 11299bb

File tree

3 files changed

+31
-24
lines changed

3 files changed

+31
-24
lines changed

cmd/scaffold.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,12 +199,11 @@ func (fnb *FlowNodeBuilder) EnqueueNetworkInit() {
199199
}
200200

201201
libP2PNodeFactory, err := p2p.DefaultLibP2PNodeFactory(
202-
fnb.Logger.Level(zerolog.ErrorLevel),
202+
fnb.Logger,
203203
fnb.Me.NodeID(),
204204
myAddr,
205205
fnb.NetworkKey,
206206
fnb.RootBlock.ID(),
207-
fnb.RootChainID,
208207
fnb.IdentityProvider,
209208
p2p.DefaultMaxPubSubMsgSize,
210209
fnb.Metrics.Network,

network/p2p/libp2pNode.go

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,6 @@ func DefaultLibP2PNodeFactory(
9999
address string,
100100
flowKey fcrypto.PrivateKey,
101101
rootBlockID flow.Identifier,
102-
chainID flow.ChainID,
103102
idProvider id.IdentityProvider,
104103
maxPubSubMsgSize int,
105104
metrics module.NetworkMetrics,
@@ -445,11 +444,13 @@ func (n *Node) RemovePeer(ctx context.Context, peerID peer.ID) error {
445444

446445
// CreateStream returns an existing stream connected to the peer if it exists, or creates a new stream with it.
447446
func (n *Node) CreateStream(ctx context.Context, peerID peer.ID) (libp2pnet.Stream, error) {
447+
lg := n.logger.With().Str("peer_id", peerID.Pretty()).Logger()
448+
448449
// If we do not currently have any addresses for the given peer, stream creation will almost
449-
// certainly fail. If this Node was configure with a DHT, we can try to lookup the address of
450+
// certainly fail. If this Node was configured with a DHT, we can try to look up the address of
450451
// the peer in the DHT as a last resort.
451452
if len(n.host.Peerstore().Addrs(peerID)) == 0 && n.dht != nil {
452-
n.logger.Info().Str("peerID", peerID.Pretty()).Msg("address not found in peerstore, searching for peer in dht")
453+
lg.Info().Msg("address not found in peer store, searching for peer in dht")
453454

454455
var err error
455456
func() {
@@ -460,36 +461,41 @@ func (n *Node) CreateStream(ctx context.Context, peerID peer.ID) (libp2pnet.Stre
460461
}()
461462

462463
if err != nil {
463-
n.logger.Warn().Err(err).Str("peerID", peerID.Pretty()).Msg("could not find addresses")
464+
lg.Warn().Err(err).Msg("address not found in both peer store and dht")
464465
} else {
465-
n.logger.Info().Str("peerID", peerID.Pretty()).Msg("addresses found")
466+
lg.Debug().Msg("address not found in peer store, but found in dht search")
466467
}
467468
}
468-
// Open libp2p Stream with the remote peer (will use an existing TCP connection underneath if it exists)
469-
stream, err := n.tryCreateNewStream(ctx, peerID, maxConnectAttempt)
469+
stream, dialAddrs, err := n.tryCreateNewStream(ctx, peerID, maxConnectAttempt)
470470
if err != nil {
471-
return nil, flownet.NewPeerUnreachableError(fmt.Errorf("could not create stream (peer_id: %s): %w", peerID, err))
471+
return nil, flownet.NewPeerUnreachableError(fmt.Errorf("could not create stream (peer_id: %s, dialing address(s): %v): %w", peerID,
472+
dialAddrs, err))
472473
}
474+
475+
lg.Debug().Str("dial_address", fmt.Sprintf("%v", dialAddrs)).Msg("stream successfully created to remote peer")
473476
return stream, nil
474477
}
475478

476-
// tryCreateNewStream makes at most maxAttempts to create a stream with the peer.
479+
// tryCreateNewStream makes at most `maxAttempts` to create a stream with the peer.
477480
// This was put in as a fix for #2416. PubSub and 1-1 communication compete with each other when trying to connect to
478-
// remote nodes and once in a while NewStream returns an error 'both yamux endpoints are clients'
479-
func (n *Node) tryCreateNewStream(ctx context.Context, peerID peer.ID, maxAttempts int) (libp2pnet.Stream, error) {
481+
// remote nodes and once in a while NewStream returns an error 'both yamux endpoints are clients'.
482+
//
483+
// Note that in case an existing TCP connection underneath to `peerID` exists, that connection is utilized for creating a new stream.
484+
// The multiaddr.Multiaddr return value represents the addresses of `peerID` we dial while trying to create a stream to it.
485+
func (n *Node) tryCreateNewStream(ctx context.Context, peerID peer.ID, maxAttempts int) (libp2pnet.Stream, []multiaddr.Multiaddr, error) {
480486
// protect the underlying connection from being inadvertently pruned by the peer manager while the stream and
481-
// connection creation is being attempted
487+
// connection creation is being attempted, and remove it from protected list once stream created.
482488
n.connMgr.ProtectPeer(peerID)
483-
// unprotect it once done
484489
defer n.connMgr.UnprotectPeer(peerID)
485490

486491
var errs error
487492
var s libp2pnet.Stream
488493
var retries = 0
494+
var dialAddr []multiaddr.Multiaddr // address on which we dial peerID
489495
for ; retries < maxAttempts; retries++ {
490496
select {
491497
case <-ctx.Done():
492-
return nil, fmt.Errorf("context done before stream could be created (retry attempt: %d, errors: %w)", retries, errs)
498+
return nil, nil, fmt.Errorf("context done before stream could be created (retry attempt: %d, errors: %w)", retries, errs)
493499
default:
494500
}
495501

@@ -501,6 +507,7 @@ func (n *Node) tryCreateNewStream(ctx context.Context, peerID peer.ID, maxAttemp
501507

502508
// cancel the dial back off (if any), since we want to connect immediately
503509
network := n.host.Network()
510+
dialAddr = network.Peerstore().Addrs(peerID)
504511
if swm, ok := network.(*swarm.Swarm); ok {
505512
swm.Backoff().Clear(peerID)
506513
}
@@ -518,12 +525,12 @@ func (n *Node) tryCreateNewStream(ctx context.Context, peerID peer.ID, maxAttemp
518525

519526
// if the connection was rejected due to invalid node id, skip the re-attempt
520527
if strings.Contains(err.Error(), "failed to negotiate security protocol") {
521-
return s, fmt.Errorf("invalid node id: %w", err)
528+
return s, dialAddr, fmt.Errorf("invalid node id: %w", err)
522529
}
523530

524531
// if the connection was rejected due to allowlisting, skip the re-attempt
525532
if errors.Is(err, swarm.ErrGaterDisallowedConnection) {
526-
return s, fmt.Errorf("target node is not on the approved list of nodes: %w", err)
533+
return s, dialAddr, fmt.Errorf("target node is not on the approved list of nodes: %w", err)
527534
}
528535

529536
errs = multierror.Append(errs, err)
@@ -535,7 +542,8 @@ func (n *Node) tryCreateNewStream(ctx context.Context, peerID peer.ID, maxAttemp
535542
if err != nil {
536543
// if the stream creation failed due to invalid protocol id, skip the re-attempt
537544
if strings.Contains(err.Error(), "protocol not supported") {
538-
return nil, fmt.Errorf("remote node is running on a different spork: %w, protocol attempted: %s", err, n.flowLibP2PProtocolID)
545+
return nil, dialAddr, fmt.Errorf("remote node is running on a different spork: %w, protocol attempted: %s", err,
546+
n.flowLibP2PProtocolID)
539547
}
540548
errs = multierror.Append(errs, err)
541549
continue
@@ -545,15 +553,15 @@ func (n *Node) tryCreateNewStream(ctx context.Context, peerID peer.ID, maxAttemp
545553
}
546554

547555
if retries == maxAttempts {
548-
return s, errs
556+
return s, dialAddr, errs
549557
}
550558

551559
s, err := n.compressedStream(s)
552560
if err != nil {
553-
return nil, fmt.Errorf("could not create compressed stream: %w", err)
561+
return nil, dialAddr, fmt.Errorf("could not create compressed stream: %w", err)
554562
}
555563

556-
return s, nil
564+
return s, dialAddr, nil
557565
}
558566

559567
// GetIPPort returns the IP and Port the libp2p node is listening on.

network/p2p/middleware.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -335,10 +335,10 @@ func (m *Middleware) SendDirect(msg *message.Message, targetID flow.Identifier)
335335
// create new stream
336336
// (streams don't need to be reused and are fairly inexpensive to be created for each send.
337337
// A stream creation does NOT incur an RTT as stream negotiation happens as part of the first message
338-
// sent out the the receiver
338+
// sent out the receiver
339339
stream, err := m.libP2PNode.CreateStream(ctx, peerID)
340340
if err != nil {
341-
return fmt.Errorf("failed to create stream for %s :%w", targetID, err)
341+
return fmt.Errorf("failed to create stream for %s: %w", targetID, err)
342342
}
343343

344344
// create a gogo protobuf writer

0 commit comments

Comments
 (0)