From a92fc641e4d87eb7c7e9de1d286c18f6587de18a Mon Sep 17 00:00:00 2001 From: Hayarobi Park Date: Tue, 3 Jun 2025 18:24:54 +0900 Subject: [PATCH] Fix sync fails on speicfic situation --- config/types.go | 1 + p2p/p2pcommon/peermanager.go | 1 + p2p/peermanager.go | 12 +++++++++++- p2p/remotepeer.go | 2 +- 4 files changed, 14 insertions(+), 2 deletions(-) diff --git a/config/types.go b/config/types.go index b3da4904a..cee7a9133 100644 --- a/config/types.go +++ b/config/types.go @@ -93,6 +93,7 @@ type P2PConfig struct { InternalZones []string `mapstructure:"internalzones" description:"List of address ranges that are recognised as inner zone of agent. defined by CIDR notation."` Agent string `mapstructure:"agent" description:"Peer id of agent that delegates this producer, only available when local peer is producer"` AllowLegacy bool `mapstructure:"allowlegacy" description:"Whether to allow legacy security protocols"` + MsgBufSize int `mapstructure:"msgbufsize" description:"Size of message buffer for each peer"` } // AuthConfig defines configuration for auditing diff --git a/p2p/p2pcommon/peermanager.go b/p2p/p2pcommon/peermanager.go index 6dfe4121e..6e8fb5625 100644 --- a/p2p/p2pcommon/peermanager.go +++ b/p2p/p2pcommon/peermanager.go @@ -44,4 +44,5 @@ type PeerManager interface { AddDesignatedPeer(meta PeerMeta) RemoveDesignatedPeer(peerID types.PeerID) ListDesignatedPeers() []PeerMeta + MsgBufSize() int } diff --git a/p2p/peermanager.go b/p2p/peermanager.go index ac4c48ba9..8418bd387 100644 --- a/p2p/peermanager.go +++ b/p2p/peermanager.go @@ -75,7 +75,8 @@ type peerManager struct { designatedPeers map[types.PeerID]p2pcommon.PeerMeta hiddenPeerSet map[types.PeerID]bool - logger *log.Logger + msgBufSize int + logger *log.Logger } // getPeerTask is struct to get peer for concurrent use @@ -90,6 +91,10 @@ var _ p2pcommon.PeerManager = (*peerManager)(nil) func NewPeerManager(is p2pcommon.InternalService, hsFactory p2pcommon.HSHandlerFactory, actor p2pcommon.ActorService, pf p2pcommon.PeerFactory, nt p2pcommon.NetworkTransport, mm metric.MetricsManager, lm p2pcommon.ListManager, logger *log.Logger, cfg *cfg.Config, skipHandshakeSync bool) p2pcommon.PeerManager { p2pConf := cfg.P2P //logger.SetLevel("debug") + msgBufSize := p2pConf.MsgBufSize + if msgBufSize < writeMsgBufferSize { + msgBufSize = writeMsgBufferSize + } pm := &peerManager{ is: is, nt: nt, @@ -125,6 +130,7 @@ func NewPeerManager(is p2pcommon.InternalService, hsFactory p2pcommon.HSHandlerF eventListeners: make([]p2pcommon.PeerEventListener, 0, 4), taskChannel: make(chan pmTask, 4), finishChannel: make(chan struct{}), + msgBufSize: msgBufSize, } // additional initializations @@ -471,6 +477,10 @@ func (pm *peerManager) GetPeerBlockInfos() []types.PeerBlockInfo { return infos } +func (pm *peerManager) MsgBufSize() int { + return pm.msgBufSize +} + func (pm *peerManager) GetPeerAddresses(noHidden bool, showSelf bool) []*message.PeerInfo { peers := make([]*message.PeerInfo, 0, len(pm.peerCache)) if showSelf { diff --git a/p2p/remotepeer.go b/p2p/remotepeer.go index 948a07c91..efd19633b 100644 --- a/p2p/remotepeer.go +++ b/p2p/remotepeer.go @@ -108,7 +108,7 @@ func newRemotePeer(remote p2pcommon.RemoteInfo, manageNum uint32, pm p2pcommon.P maxTxNoticeHashSize: DefaultPeerTxQueueSize, taskChannel: make(chan p2pcommon.PeerTask, 1), } - rPeer.writeBuf = make(chan p2pcommon.MsgOrder, writeMsgBufferSize) + rPeer.writeBuf = make(chan p2pcommon.MsgOrder, pm.MsgBufSize()) rPeer.writeDirect = make(chan p2pcommon.MsgOrder) var err error