diff --git a/config/types.go b/config/types.go index b3da4904a..cee7a9133 100644 --- a/config/types.go +++ b/config/types.go @@ -93,6 +93,7 @@ type P2PConfig struct { InternalZones []string `mapstructure:"internalzones" description:"List of address ranges that are recognised as inner zone of agent. defined by CIDR notation."` Agent string `mapstructure:"agent" description:"Peer id of agent that delegates this producer, only available when local peer is producer"` AllowLegacy bool `mapstructure:"allowlegacy" description:"Whether to allow legacy security protocols"` + MsgBufSize int `mapstructure:"msgbufsize" description:"Size of message buffer for each peer"` } // AuthConfig defines configuration for auditing diff --git a/p2p/msgorder_test.go b/p2p/msgorder_test.go index 600491031..8f919c7e0 100644 --- a/p2p/msgorder_test.go +++ b/p2p/msgorder_test.go @@ -45,6 +45,7 @@ func Test_pbRequestOrder_SendTo(t *testing.T) { mockPeerManager := p2pmock.NewMockPeerManager(ctrl) mockRW := p2pmock.NewMockMsgReadWriter(ctrl) + mockPeerManager.EXPECT().MsgBufSize().Return(writeMsgBufferSize).AnyTimes() mockRW.EXPECT().WriteMsg(gomock.Any()).Return(tt.writeErr) peer := newRemotePeer(sampleRemote, 0, mockPeerManager, mockActorServ, logger, factory, &dummySigner{}, mockRW) @@ -92,6 +93,7 @@ func Test_pbMessageOrder_SendTo(t *testing.T) { mockPeerManager := p2pmock.NewMockPeerManager(ctrl) mockRW := p2pmock.NewMockMsgReadWriter(ctrl) + mockPeerManager.EXPECT().MsgBufSize().Return(writeMsgBufferSize).AnyTimes() mockRW.EXPECT().WriteMsg(gomock.Any()).Return(tt.writeErr) peer := newRemotePeer(sampleRemote, 0, mockPeerManager, mockActorServ, logger, factory, &dummySigner{}, mockRW) @@ -141,6 +143,8 @@ func Test_pbBlkNoticeOrder_SendTo(t *testing.T) { mockPeerManager := p2pmock.NewMockPeerManager(ctrl) mockRW := p2pmock.NewMockMsgReadWriter(ctrl) + mockPeerManager.EXPECT().MsgBufSize().Return(writeMsgBufferSize).AnyTimes() + if tt.keyExist { mockRW.EXPECT().WriteMsg(gomock.Any()).Return(tt.writeErr).Times(0) } else { @@ -211,6 +215,7 @@ func Test_pbBlkNoticeOrder_SendTo_SkipByHeight(t *testing.T) { mockPeerManager := p2pmock.NewMockPeerManager(ctrl) mockRW := p2pmock.NewMockMsgReadWriter(ctrl) + mockPeerManager.EXPECT().MsgBufSize().Return(writeMsgBufferSize).AnyTimes() writeCnt := 0 mockRW.EXPECT().WriteMsg(gomock.Any()).Do(func(arg interface{}) { writeCnt++ @@ -287,6 +292,7 @@ func Test_pbBlkNoticeOrder_SendTo_SkipByTime(t *testing.T) { mockPeerManager := p2pmock.NewMockPeerManager(ctrl) mockRW := p2pmock.NewMockMsgReadWriter(ctrl) + mockPeerManager.EXPECT().MsgBufSize().Return(writeMsgBufferSize).AnyTimes() writeCnt := 0 mockRW.EXPECT().WriteMsg(gomock.Any()).Do(func(arg interface{}) { writeCnt++ @@ -350,6 +356,7 @@ func Test_pbTxNoticeOrder_SendTo(t *testing.T) { mockPeerManager := p2pmock.NewMockPeerManager(ctrl) mockRW := p2pmock.NewMockMsgReadWriter(ctrl) + mockPeerManager.EXPECT().MsgBufSize().Return(writeMsgBufferSize).AnyTimes() if tt.keyExist == len(sampleHashes) { mockRW.EXPECT().WriteMsg(gomock.Any()).Return(tt.writeErr).Times(0) } else { diff --git a/p2p/p2pcommon/peermanager.go b/p2p/p2pcommon/peermanager.go index 6dfe4121e..6e8fb5625 100644 --- a/p2p/p2pcommon/peermanager.go +++ b/p2p/p2pcommon/peermanager.go @@ -44,4 +44,5 @@ type PeerManager interface { AddDesignatedPeer(meta PeerMeta) RemoveDesignatedPeer(peerID types.PeerID) ListDesignatedPeers() []PeerMeta + MsgBufSize() int } diff --git a/p2p/p2pcommon/temptype.go b/p2p/p2pcommon/temptype.go index fde708572..42b8144c3 100644 --- a/p2p/p2pcommon/temptype.go +++ b/p2p/p2pcommon/temptype.go @@ -6,7 +6,7 @@ package p2pcommon // This file describe the command to generate mock objects of imported interfaces -//go:generate sh -c "mockgen github.com/aergoio/aergo/v2/p2p/p2pcommon NTContainer,NetworkTransport | gsed -e 's/[Pp]ackage mock_p2pcommon/package p2pmock/g' > p2p/p2pmock/mock_networktransport.go" +//go:generate sh -c "mockgen github.com/aergoio/aergo/v2/p2p/p2pcommon NTContainer,NetworkTransport | gsed -e 's/[Pp]ackage mock_p2pcommon/package p2pmock/g' > ../p2pmock/mock_networktransport.go" // in aergo but outside of p2p //go:generate sh -c "mockgen github.com/aergoio/aergo/v2/types ChainAccessor | sed -e 's/[Pp]ackage mock_types/package p2pmock/g' > ../p2pmock/mock_chainaccessor.go" diff --git a/p2p/p2pmock/mock_peermanager.go b/p2p/p2pmock/mock_peermanager.go index 0a499dd69..fa7802241 100644 --- a/p2p/p2pmock/mock_peermanager.go +++ b/p2p/p2pmock/mock_peermanager.go @@ -5,9 +5,9 @@ package p2pmock import ( - message "github.com/aergoio/aergo/v2/types/message" p2pcommon "github.com/aergoio/aergo/v2/p2p/p2pcommon" types "github.com/aergoio/aergo/v2/types" + message "github.com/aergoio/aergo/v2/types/message" gomock "github.com/golang/mock/gomock" reflect "reflect" ) @@ -273,3 +273,17 @@ func (mr *MockPeerManagerMockRecorder) ListDesignatedPeers() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListDesignatedPeers", reflect.TypeOf((*MockPeerManager)(nil).ListDesignatedPeers)) } + +// MsgBufSize mocks base method +func (m *MockPeerManager) MsgBufSize() int { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "MsgBufSize") + ret0, _ := ret[0].(int) + return ret0 +} + +// MsgBufSize indicates an expected call of MsgBufSize +func (mr *MockPeerManagerMockRecorder) MsgBufSize() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MsgBufSize", reflect.TypeOf((*MockPeerManager)(nil).MsgBufSize)) +} diff --git a/p2p/p2pmock/readme.txt b/p2p/p2pmock/readme.txt index f26d700f9..af6e35d0b 100644 --- a/p2p/p2pmock/readme.txt +++ b/p2p/p2pmock/readme.txt @@ -1,14 +1,23 @@ Prerequisites +NOTE: + +At this point, it may not works properly in this document. As the version of Golang has updated, the change in security policy or internal implementation is suspected of not being able to execute properly. + Installing mockgen the package github.com/golang/mock is superseded by https://github.com/uber/mock in 2021, but we are not ready to migrate yet by transitive dependencies. -go install github.com/golang/mock/mockgen@v1.6.0 +go install github.com/golang/mock/mockgen@v1.4.0 (In the future) Refering to https://github.com/uber-go/mock/ ( go install go.uber.org/mock/mockgen@latest ) +Installing stringer + +Some generate operations of this project needs stringer tool. It can be installed like below; + +go install golang.org/x/tools/cmd/stringer@latest Examples to generate mock class @@ -29,8 +38,9 @@ mockgen -source=p2p/p2pcommon/pool.go -mock_names=WaitingPeerManager=MockWaiting # Manually generated mock classes -The generate descriptions of these mock objects are in p2p/p2pcommon/temptypes.go . So you can use such like `go generate ./p2p/p2pcommon/temptypes.go` command. +The generate descriptions of these mock objects are in p2p/p2pcommon/temptypes.go . So you can use such like `go generate ./p2p/p2pcommon/temptype.go` command. +# Some files cannot be generated by go generate command. They should be generated using mockgen and gnu sed. # mock files which are not generated automatically by go generate ./p2p mockgen github.com/aergoio/aergo/v2/consensus ConsensusAccessor,AergoRaftAccessor | gsed -e 's/^package mock_[a-zA-Z0-9_]\+/package p2pmock/g' > p2p/p2pmock/mock_consensus.go diff --git a/p2p/peermanager.go b/p2p/peermanager.go index ac4c48ba9..8418bd387 100644 --- a/p2p/peermanager.go +++ b/p2p/peermanager.go @@ -75,7 +75,8 @@ type peerManager struct { designatedPeers map[types.PeerID]p2pcommon.PeerMeta hiddenPeerSet map[types.PeerID]bool - logger *log.Logger + msgBufSize int + logger *log.Logger } // getPeerTask is struct to get peer for concurrent use @@ -90,6 +91,10 @@ var _ p2pcommon.PeerManager = (*peerManager)(nil) func NewPeerManager(is p2pcommon.InternalService, hsFactory p2pcommon.HSHandlerFactory, actor p2pcommon.ActorService, pf p2pcommon.PeerFactory, nt p2pcommon.NetworkTransport, mm metric.MetricsManager, lm p2pcommon.ListManager, logger *log.Logger, cfg *cfg.Config, skipHandshakeSync bool) p2pcommon.PeerManager { p2pConf := cfg.P2P //logger.SetLevel("debug") + msgBufSize := p2pConf.MsgBufSize + if msgBufSize < writeMsgBufferSize { + msgBufSize = writeMsgBufferSize + } pm := &peerManager{ is: is, nt: nt, @@ -125,6 +130,7 @@ func NewPeerManager(is p2pcommon.InternalService, hsFactory p2pcommon.HSHandlerF eventListeners: make([]p2pcommon.PeerEventListener, 0, 4), taskChannel: make(chan pmTask, 4), finishChannel: make(chan struct{}), + msgBufSize: msgBufSize, } // additional initializations @@ -471,6 +477,10 @@ func (pm *peerManager) GetPeerBlockInfos() []types.PeerBlockInfo { return infos } +func (pm *peerManager) MsgBufSize() int { + return pm.msgBufSize +} + func (pm *peerManager) GetPeerAddresses(noHidden bool, showSelf bool) []*message.PeerInfo { peers := make([]*message.PeerInfo, 0, len(pm.peerCache)) if showSelf { diff --git a/p2p/remotepeer.go b/p2p/remotepeer.go index 948a07c91..efd19633b 100644 --- a/p2p/remotepeer.go +++ b/p2p/remotepeer.go @@ -108,7 +108,7 @@ func newRemotePeer(remote p2pcommon.RemoteInfo, manageNum uint32, pm p2pcommon.P maxTxNoticeHashSize: DefaultPeerTxQueueSize, taskChannel: make(chan p2pcommon.PeerTask, 1), } - rPeer.writeBuf = make(chan p2pcommon.MsgOrder, writeMsgBufferSize) + rPeer.writeBuf = make(chan p2pcommon.MsgOrder, pm.MsgBufSize()) rPeer.writeDirect = make(chan p2pcommon.MsgOrder) var err error diff --git a/p2p/remotepeer_test.go b/p2p/remotepeer_test.go index e03fd0cab..35f80b8a2 100644 --- a/p2p/remotepeer_test.go +++ b/p2p/remotepeer_test.go @@ -330,13 +330,18 @@ func TestRemotePeerImpl_UpdateBlkCache(t *testing.T) { } for _, test := range tests { t.Run(test.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockActor := new(p2pmock.MockActorService) - mockPeerManager := new(p2pmock.MockPeerManager) + mockPeerManager := p2pmock.NewMockPeerManager(ctrl) mockSigner := new(p2pmock.MockMsgSigner) mockMF := new(p2pmock.MockMoFactory) sampleConn := p2pcommon.RemoteConn{IP: net.ParseIP(sampleMeta.PrimaryAddress()), Port: sampleMeta.PrimaryPort()} sampleRemote := p2pcommon.RemoteInfo{Meta: sampleMeta, Connection: sampleConn} + mockPeerManager.EXPECT().MsgBufSize().Return(writeMsgBufferSize).AnyTimes() + target := newRemotePeer(sampleRemote, 0, mockPeerManager, mockActor, logger, mockMF, mockSigner, nil) for _, hash := range test.inCache { target.blkHashCache.Add(hash, true) @@ -362,13 +367,17 @@ func TestRemotePeerImpl_UpdateTxCache(t *testing.T) { } for _, test := range tests { t.Run(test.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() mockActor := new(p2pmock.MockActorService) - mockPeerManager := new(p2pmock.MockPeerManager) + mockPeerManager := p2pmock.NewMockPeerManager(ctrl) mockSigner := new(p2pmock.MockMsgSigner) mockMF := new(p2pmock.MockMoFactory) sampleConn := p2pcommon.RemoteConn{IP: net.ParseIP(sampleMeta.PrimaryAddress()), Port: sampleMeta.PrimaryPort()} sampleRemote := p2pcommon.RemoteInfo{Meta: sampleMeta, Connection: sampleConn} + mockPeerManager.EXPECT().MsgBufSize().Return(writeMsgBufferSize).AnyTimes() + target := newRemotePeer(sampleRemote, 0, mockPeerManager, mockActor, logger, mockMF, mockSigner, nil) for _, hash := range test.inCache { target.txHashCache.Add(hash, true) @@ -417,13 +426,15 @@ func TestRemotePeerImpl_GetReceiver(t *testing.T) { defer ctrl.Finish() mockActor := new(p2pmock.MockActorService) - mockPeerManager := new(p2pmock.MockPeerManager) + mockPeerManager := p2pmock.NewMockPeerManager(ctrl) mockSigner := new(p2pmock.MockMsgSigner) mockMF := new(p2pmock.MockMoFactory) sampleConn := p2pcommon.RemoteConn{IP: net.ParseIP(sampleMeta.PrimaryAddress()), Port: sampleMeta.PrimaryPort()} sampleRemote := p2pcommon.RemoteInfo{Meta: sampleMeta, Connection: sampleConn} mockMo := p2pmock.NewMockMsgOrder(ctrl) + mockMo.EXPECT().GetProtocolID().Return(p2pcommon.GetBlocksRequest).AnyTimes() + mockPeerManager.EXPECT().MsgBufSize().Return(writeMsgBufferSize).AnyTimes() p := newRemotePeer(sampleRemote, 0, mockPeerManager, mockActor, logger, mockMF, mockSigner, nil) for _, add := range test.toAdd { @@ -471,6 +482,7 @@ func TestRemotePeerImpl_pushTxsNotice(t *testing.T) { mockMF := p2pmock.NewMockMoFactory(ctrl) mockSigner := new(p2pmock.MockMsgSigner) + mockPeerManager.EXPECT().MsgBufSize().Return(writeMsgBufferSize).AnyTimes() mockMO.EXPECT().GetMsgID().Return(p2pcommon.NewMsgID()).AnyTimes() mockMF.EXPECT().NewMsgTxBroadcastOrder(gomock.Any()).Return(mockMO). Times(test.expectSend) @@ -487,8 +499,6 @@ func TestRemotePeerImpl_pushTxsNotice(t *testing.T) { } } func TestRemotePeer_writeToPeer(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() rand := uuid.Must(uuid.NewV4()) var sampleMsgID p2pcommon.MsgID @@ -518,12 +528,16 @@ func TestRemotePeer_writeToPeer(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockPeerManager := p2pmock.NewMockPeerManager(ctrl) mockMO := p2pmock.NewMockMsgOrder(ctrl) mockStream := p2pmock.NewMockStream(ctrl) dummyRW := p2pmock.NewMockMsgReadWriter(ctrl) dummyRW.EXPECT().Close().AnyTimes() + mockPeerManager.EXPECT().MsgBufSize().Return(writeMsgBufferSize).AnyTimes() mockStream.EXPECT().Close().Return(nil).AnyTimes() mockMO.EXPECT().IsNeedSign().Return(true).AnyTimes() mockMO.EXPECT().SendTo(gomock.Any()).Return(tt.args.sendErr)