@@ -9,6 +9,7 @@ import net.corda.core.crypto.X509Utilities.CORDA_CLIENT_CA
99import net.corda.core.crypto.X509Utilities.CORDA_ROOT_CA
1010import net.corda.core.crypto.newSecureRandom
1111import net.corda.core.div
12+ import net.corda.core.node.NodeInfo
1213import net.corda.core.node.services.NetworkMapCache
1314import net.corda.core.node.services.NetworkMapCache.MapChange
1415import net.corda.core.utilities.debug
@@ -92,7 +93,7 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
fun start() = mutex.locked {
    // Calling start() on an already running server does nothing.
    if (!running) {
        configureAndStartServer()
        // Keep bridges in sync with the network map: every change event creates and/or destroys bridges.
        networkChangeHandle = networkMapCache.changed.subscribe { destroyOrCreateBridges(it) }
        running = true
    }
}
@@ -120,14 +121,36 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
120121 * We create the bridges indirectly now because the network map is not persisted and there are no ways to obtain host and port information on startup.
121122 * TODO : Create the bridge directly from the list of queues on start up when we have a persisted network map service.
122123 */
private fun destroyOrCreateBridges(change: MapChange) {
    // Adds to [target] the node's own address plus one service address per service advertised by that node,
    // each service address using the node's host and port.
    fun addAddresses(node: NodeInfo, target: HashSet<ArtemisAddress>) {
        // NOTE(review): hard cast - a node whose address is not an ArtemisAddress makes this throw; confirm
        // that cannot happen for nodes reported by the network map.
        val nodeAddress = node.address as ArtemisAddress
        target.add(nodeAddress)
        // Use the services of the node passed in, not change.node: for a Modified change the previous node
        // must contribute the services it advertised, otherwise their bridges are never scheduled for removal.
        node.advertisedServices.forEach {
            target.add(NodeAddress.asService(it.identity.owningKey, nodeAddress.hostAndPort))
        }
    }

    val addressesToCreateBridgesTo = HashSet<ArtemisAddress>()
    val addressesToRemoveBridgesTo = HashSet<ArtemisAddress>()
    when (change) {
        is MapChange.Modified -> {
            addAddresses(change.node, addressesToCreateBridgesTo)
            addAddresses(change.previousNode, addressesToRemoveBridgesTo)
        }
        is MapChange.Removed -> {
            addAddresses(change.node, addressesToRemoveBridgesTo)
        }
        is MapChange.Added -> {
            addAddresses(change.node, addressesToCreateBridgesTo)
        }
    }

    // Addresses present in both sets are still wanted, so only the difference is torn down.
    (addressesToRemoveBridgesTo - addressesToCreateBridgesTo).forEach {
        maybeDestroyBridge(bridgeNameForAddress(it))
    }
    addressesToCreateBridgesTo.forEach {
        maybeDeployBridgeForAddress(it)
    }
}
132155
133156 private fun configureAndStartServer () {
@@ -138,31 +161,47 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
138161 registerActivationFailureListener { exception -> throw exception }
139162 // Some types of queue might need special preparation on our side, like dialling back or preparing
140163 // a lazily initialised subsystem.
141- registerPostQueueCreationCallback { deployBridgeFromNewPeerQueue (it) }
164+ registerPostQueueCreationCallback { deployBridgeFromNewQueue (it) }
142165 registerPostQueueDeletionCallback { address, qName -> log.debug { " Queue deleted: $qName for $address " } }
143166 }
144167 activeMQServer.start()
145168 printBasicNodeInfo(" Node listening on address" , myHostPort.toString())
146169 }
147170
/**
 * Ensures a bridge exists for [queueName] that targets the host and port of [nodeInfo].
 *
 * Only [NodeAddress] addresses are handled; any other address type is logged as an error and ignored.
 */
private fun maybeDeployBridgeForNode(queueName: SimpleString, nodeInfo: NodeInfo) {
    // Lazy form, so the message is only built when debug logging is enabled.
    log.debug { "Deploying bridge for $queueName to $nodeInfo" }
    val address = nodeInfo.address
    if (address is NodeAddress) {
        maybeDeployBridgeForAddress(NodeAddress(queueName, address.hostAndPort))
    } else {
        log.error("Don't know how to deal with $address")
    }
}

/**
 * Queue-creation callback: deploys the bridge(s) needed for a newly created peer or service queue.
 *
 * - Names starting with [PEERS_PREFIX]: the remainder is parsed as a Base 58 [CompositeKey] and looked up as a
 *   legal identity in the network map. A bridge is deployed to that node, or an error is logged if it is unknown.
 * - Names starting with [SERVICES_PREFIX]: the remainder is parsed the same way and a bridge is deployed to
 *   every node returned for that advertised service identity.
 * - Any other queue name is ignored.
 *
 * A name whose remainder cannot be parsed is logged as an error; the exception is not propagated.
 */
private fun deployBridgeFromNewQueue(queueName: SimpleString) {
    log.debug { "Queue created: $queueName, deploying bridge(s)" }
    when {
        queueName.startsWith(PEERS_PREFIX) -> try {
            val identity = CompositeKey.parseFromBase58(queueName.substring(PEERS_PREFIX.length))
            val nodeInfo = networkMapCache.getNodeByLegalIdentityKey(identity)
            if (nodeInfo != null) {
                maybeDeployBridgeForNode(queueName, nodeInfo)
            } else {
                log.error("Queue created for a peer that we don't know from the network map: $queueName")
            }
        } catch (e: AddressFormatException) {
            log.error("Flow violation: Could not parse peer queue name as Base 58: $queueName")
        }

        queueName.startsWith(SERVICES_PREFIX) -> try {
            val identity = CompositeKey.parseFromBase58(queueName.substring(SERVICES_PREFIX.length))
            val nodeInfos = networkMapCache.getNodesByAdvertisedServiceIdentityKey(identity)
            // The same service queue is bridged to each node returned for this identity.
            for (nodeInfo in nodeInfos) {
                maybeDeployBridgeForNode(queueName, nodeInfo)
            }
        } catch (e: AddressFormatException) {
            log.error("Flow violation: Could not parse service queue name as Base 58: $queueName")
        }
    }
}
168207
@@ -240,26 +279,29 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
240279 tcpTransport(OUTBOUND , hostAndPort.hostText, hostAndPort.port)
241280 )
242281
/** Returns true when the broker's cluster manager already holds a bridge registered under [name]. */
private fun bridgeExists(name: String): Boolean = activeMQServer.clusterManager.bridges.containsKey(name)
244283
/**
 * Makes sure both a connector for the host and port of [address] and a bridge for [address] exist, creating
 * whichever one is missing. Existing connectors and bridges are left untouched.
 */
private fun maybeDeployBridgeForAddress(address: ArtemisAddress) {
    if (!connectorExists(address.hostAndPort)) {
        addConnector(address.hostAndPort)
    }
    // Bridges are keyed by the derived bridge name rather than by the bare queue name.
    val bridgeName = bridgeNameForAddress(address)
    if (!bridgeExists(bridgeName)) {
        deployBridge(bridgeName, address)
    }
}
253293
/**
 * Builds the bridge name for [address] as its queue name and its host/port joined by a hyphen, so the same
 * queue bridged to different hosts gets distinct bridge names.
 */
private fun bridgeNameForAddress(address: ArtemisAddress): String = "${address.queueName}-${address.hostAndPort}"
295+
254296 /* *
255297 * All nodes are expected to have a public facing address called [ArtemisMessagingComponent.P2P_QUEUE] for receiving
256298 * messages from other nodes. When we want to send a message to a node we send it to our internal address/queue for it,
257299 * as defined by ArtemisAddress.queueName. A bridge is then created to forward messages from this queue to the node's
258300 * P2P address.
259301 */
260- private fun deployBridge (address : ArtemisAddress ) {
302+ private fun deployBridge (bridgeName : String , address : ArtemisAddress ) {
261303 activeMQServer.deployBridge(BridgeConfiguration ().apply {
262- name = address.queueName.toString()
304+ name = bridgeName
263305 queueName = address.queueName.toString()
264306 forwardingAddress = P2P_QUEUE
265307 staticConnectors = listOf (address.hostAndPort.toString())
@@ -272,9 +314,9 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
272314 })
273315 }
274316
/** Destroys the bridge registered under [name] if one exists; does nothing otherwise. */
private fun maybeDestroyBridge(name: String) {
    if (bridgeExists(name)) {
        activeMQServer.destroyBridge(name)
    }
}
280322
0 commit comments