Skip to content

Commit 81d1459

Browse files
committed
Fixed bug where messages can't be sent to services running on the network map node (corda#71)
1 parent 6ad3ca4 commit 81d1459

File tree

7 files changed

+179
-76
lines changed

7 files changed

+179
-76
lines changed

core/src/main/kotlin/net/corda/core/Utils.kt

Lines changed: 4 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -27,10 +27,7 @@ import java.nio.file.*
2727
import java.nio.file.attribute.FileAttribute
2828
import java.time.Duration
2929
import java.time.temporal.Temporal
30-
import java.util.concurrent.CompletableFuture
31-
import java.util.concurrent.ExecutionException
32-
import java.util.concurrent.Executor
33-
import java.util.concurrent.Future
30+
import java.util.concurrent.*
3431
import java.util.concurrent.locks.ReentrantLock
3532
import java.util.function.BiConsumer
3633
import java.util.stream.Stream
@@ -68,9 +65,9 @@ infix fun Long.checkedAdd(b: Long) = Math.addExact(this, b)
6865
fun random63BitValue(): Long = Math.abs(newSecureRandom().nextLong())
6966

7067
/** Same as [Future.get] but with a more descriptive name, and doesn't throw [ExecutionException], instead throwing its cause */
71-
fun <T> Future<T>.getOrThrow(): T {
72-
try {
73-
return get()
68+
fun <T> Future<T>.getOrThrow(timeout: Duration? = null): T {
69+
return try {
70+
if (timeout == null) get() else get(timeout.toNanos(), TimeUnit.NANOSECONDS)
7471
} catch (e: ExecutionException) {
7572
throw e.cause!!
7673
}

node/src/integration-test/kotlin/net/corda/services/messaging/ArtemisMessagingServerTest.kt

Lines changed: 0 additions & 26 deletions
This file was deleted.
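New file (path not captured in this extract; it declares `net.corda.services.messaging.P2PMessagingTest`) — Lines changed: 101 additions & 0 deletions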
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,101 @@
1+
package net.corda.services.messaging
2+
3+
import co.paralleluniverse.fibers.Suspendable
4+
import net.corda.core.crypto.Party
5+
import net.corda.core.div
6+
import net.corda.core.flows.FlowLogic
7+
import net.corda.core.future
8+
import net.corda.core.getOrThrow
9+
import net.corda.core.node.services.ServiceInfo
10+
import net.corda.core.seconds
11+
import net.corda.node.services.transactions.RaftValidatingNotaryService
12+
import net.corda.node.services.transactions.SimpleNotaryService
13+
import net.corda.node.utilities.ServiceIdentityGenerator
14+
import net.corda.testing.freeLocalHostAndPort
15+
import net.corda.testing.node.NodeBasedTest
16+
import org.assertj.core.api.Assertions.assertThat
17+
import org.junit.Test
18+
import java.util.*
19+
20+
/**
 * Integration tests for peer-to-peer messaging between nodes started through [NodeBasedTest]: restarting a network
 * map node, and sending messages to services (single-node and distributed) that the network map node itself runs.
 */
class P2PMessagingTest : NodeBasedTest() {
21+
@Test
22+
fun `network map will work after restart`() {
23+
// startNode() starts the default network map node on first use, so each call to startNodes() also (re)starts it.
fun startNodes() {
24+
startNode("NodeA")
25+
startNode("NodeB")
26+
startNode("Notary")
27+
}
28+
29+
startNodes()
30+
// Start the network map a second time; this will restore message queues from the journal.
31+
// Prior to the fix this hung and failed: https://github.com/corda/corda/issues/37
32+
stopAllNodes()
33+
// Restart inside a future so that a hang surfaces as a timeout failure after 30 seconds instead of blocking the run.
future {
34+
startNodes()
35+
}.getOrThrow(30.seconds)
36+
}
37+
38+
// Regression test for https://github.com/corda/corda/issues/71: a service (here a simple notary) advertised by the
// network map node itself must be reachable from another node.
39+
@Test
40+
fun `sending message to a service running on the network map node`() {
41+
startNetworkMapNode(advertisedServices = setOf(ServiceInfo(SimpleNotaryService.type)))
42+
networkMapNode.services.registerFlowInitiator(ReceiveFlow::class) { SendFlow(it, "Hello") }
43+
val serviceParty = networkMapNode.services.networkMapCache.getAnyNotary()!!
44+
val received = startNode("Alice").services.startFlow(ReceiveFlow(serviceParty)).resultFuture.getOrThrow(10.seconds)
45+
assertThat(received).isEqualTo("Hello")
46+
}
47+
48+
@Test
49+
fun `sending message to a distributed service which the network map node is part of`() {
50+
val serviceName = "DistributedService"
51+
52+
// Generate one shared service identity into the "NetworkMap" and "Alice" node directories, so that both nodes
// can advertise the same distributed service below.
val root = tempFolder.root.toPath()
53+
ServiceIdentityGenerator.generateToDisk(
54+
listOf(
55+
root / "NetworkMap",
56+
root / "Alice"),
57+
RaftValidatingNotaryService.type.id,
58+
serviceName)
59+
60+
val distributedService = ServiceInfo(RaftValidatingNotaryService.type, serviceName)
61+
// Alice is given the network map node's notaryNodeAddress as her notaryClusterAddresses
// (presumably so that she joins its Raft cluster — confirm against the Raft notary config).
val notaryClusterAddress = freeLocalHostAndPort()
62+
startNetworkMapNode(
63+
"NetworkMap",
64+
advertisedServices = setOf(distributedService),
65+
configOverrides = mapOf("notaryNodeAddress" to notaryClusterAddress.toString()))
66+
val alice = startNode(
67+
"Alice",
68+
advertisedServices = setOf(distributedService),
69+
configOverrides = mapOf(
70+
"notaryNodeAddress" to freeLocalHostAndPort().toString(),
71+
"notaryClusterAddresses" to listOf(notaryClusterAddress.toString())))
72+
val bob = startNode("Bob")
73+
74+
// Set up each node in the distributed service to send back its own Party, so that we can tell which node handled the request
75+
val serviceNodes = listOf(networkMapNode, alice)
76+
serviceNodes.forEach { node ->
77+
node.services.registerFlowInitiator(ReceiveFlow::class) { SendFlow(it, node.info.legalIdentity) }
78+
}
79+
80+
val serviceParty = networkMapNode.services.networkMapCache.getNotary(serviceName)!!
81+
val participatingParties = HashSet<Any>()
82+
// Try up to 5 times (stopping early once both service nodes have answered) so that we can be fairly sure that
// any node not participating is not due to Artemis' selection strategy
83+
for (it in 1..5) {
84+
participatingParties += bob.services.startFlow(ReceiveFlow(serviceParty)).resultFuture.getOrThrow(10.seconds)
85+
if (participatingParties.size == 2) {
86+
break
87+
}
88+
}
89+
assertThat(participatingParties).containsOnlyElementsOf(serviceNodes.map { it.info.legalIdentity })
90+
}
91+
92+
/** Responder flow: sends [payload] to [otherParty]. The tests register it as the response to [ReceiveFlow]. */
private class SendFlow(val otherParty: Party, val payload: Any) : FlowLogic<Unit>() {
93+
@Suspendable
94+
override fun call() = send(otherParty, payload)
95+
}
96+
97+
/** Initiating flow: receives a single message of any type from [otherParty] and returns it as-is (no validation). */
private class ReceiveFlow(val otherParty: Party) : FlowLogic<Any>() {
98+
@Suspendable
99+
override fun call() = receive<Any>(otherParty).unwrap { it }
100+
}
101+
}

node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt

Lines changed: 17 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -110,7 +110,7 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
110110
if (!query.isExists) {
111111
activeMQServer.createQueue(NETWORK_MAP_ADDRESS, NETWORK_MAP_ADDRESS, null, true, false)
112112
}
113-
maybeDeployBridgeForAddress(networkMapService)
113+
deployBridgeIfAbsent(networkMapService.queueName, networkMapService.hostAndPort)
114114
}
115115

116116
/**
@@ -148,10 +148,10 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
148148
}
149149

150150
(addressesToRemoveBridgesFrom - addressesToCreateBridgesTo).forEach {
151-
maybeDestroyBridge(bridgeNameForAddress(it))
151+
activeMQServer.destroyBridge(getBridgeName(it.queueName, it.hostAndPort))
152152
}
153-
addressesToCreateBridgesTo.forEach {
154-
if (activeMQServer.queueQuery(it.queueName).isExists) maybeDeployBridgeForAddress(it)
153+
addressesToCreateBridgesTo.filter { activeMQServer.queueQuery(it.queueName).isExists }.forEach {
154+
deployBridgeIfAbsent(it.queueName, it.hostAndPort)
155155
}
156156
}
157157

@@ -171,12 +171,12 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
171171
}
172172

173173
private fun maybeDeployBridgeForNode(queueName: SimpleString, nodeInfo: NodeInfo) {
174-
log.debug("Deploying bridge for $queueName to $nodeInfo")
175174
val address = nodeInfo.address
176-
if (address is NodeAddress) {
177-
maybeDeployBridgeForAddress(NodeAddress(queueName, address.hostAndPort))
175+
if (address is ArtemisPeerAddress) {
176+
log.debug("Deploying bridge for $queueName to $nodeInfo")
177+
deployBridgeIfAbsent(queueName, address.hostAndPort)
178178
} else {
179-
log.error("Don't know how to deal with $address")
179+
log.error("Don't know how to deal with $address for queue $queueName")
180180
}
181181
}
182182

@@ -284,30 +284,30 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
284284

285285
private fun bridgeExists(name: String) = activeMQServer.clusterManager.bridges.containsKey(name)
286286

287-
private fun maybeDeployBridgeForAddress(peerAddress: ArtemisPeerAddress) {
288-
if (!connectorExists(peerAddress.hostAndPort)) {
289-
addConnector(peerAddress.hostAndPort)
287+
private fun deployBridgeIfAbsent(queueName: SimpleString, hostAndPort: HostAndPort) {
288+
if (!connectorExists(hostAndPort)) {
289+
addConnector(hostAndPort)
290290
}
291-
val bridgeName = bridgeNameForAddress(peerAddress)
291+
val bridgeName = getBridgeName(queueName, hostAndPort)
292292
if (!bridgeExists(bridgeName)) {
293-
deployBridge(bridgeName, peerAddress)
293+
deployBridge(bridgeName, queueName, hostAndPort)
294294
}
295295
}
296296

297-
private fun bridgeNameForAddress(peerAddress: ArtemisPeerAddress) = "${peerAddress.queueName}-${peerAddress.hostAndPort}"
297+
private fun getBridgeName(queueName: SimpleString, hostAndPort: HostAndPort) = "$queueName -> $hostAndPort"
298298

299299
/**
300300
* All nodes are expected to have a public facing address called [ArtemisMessagingComponent.P2P_QUEUE] for receiving
301301
* messages from other nodes. When we want to send a message to a node we send it to our internal address/queue for it,
302302
* as defined by ArtemisAddress.queueName. A bridge is then created to forward messages from this queue to the node's
303303
* P2P address.
304304
*/
305-
private fun deployBridge(bridgeName: String, peerAddress: ArtemisPeerAddress) {
305+
private fun deployBridge(bridgeName: String, queueName: SimpleString, hostAndPort: HostAndPort) {
306306
activeMQServer.deployBridge(BridgeConfiguration().apply {
307307
name = bridgeName
308-
queueName = peerAddress.queueName.toString()
308+
this.queueName = queueName.toString()
309309
forwardingAddress = P2P_QUEUE
310-
staticConnectors = listOf(peerAddress.hostAndPort.toString())
310+
staticConnectors = listOf(hostAndPort.toString())
311311
confirmationWindowSize = 100000 // a guess
312312
isUseDuplicateDetection = true // Enable the bridge's automatic deduplication logic
313313
// As a peer of the target node we must connect to it using the peer user. Actual authentication is done using
@@ -317,12 +317,6 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
317317
})
318318
}
319319

320-
private fun maybeDestroyBridge(name: String) {
321-
if (bridgeExists(name)) {
322-
activeMQServer.destroyBridge(name)
323-
}
324-
}
325-
326320
/**
327321
* Clients must connect to us with a username and password and must use TLS. If a someone connects with
328322
* [ArtemisMessagingComponent.NODE_USER] then we confirm it's just us as the node by checking their TLS certificate

node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -379,7 +379,8 @@ class NodeMessagingClient(override val config: NodeConfiguration,
379379
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(message.uniqueMessageId.toString()))
380380
}
381381

382-
log.info("Send to: $mqAddress topic: ${message.topicSession.topic} sessionID: ${message.topicSession.sessionID} uuid: ${message.uniqueMessageId}")
382+
log.info("Send to: $mqAddress topic: ${message.topicSession.topic} sessionID: ${message.topicSession.sessionID} " +
383+
"uuid: ${message.uniqueMessageId}")
383384
producer!!.send(mqAddress, artemisMessage)
384385
}
385386
}
@@ -391,6 +392,8 @@ class NodeMessagingClient(override val config: NodeConfiguration,
391392
} else {
392393
// Otherwise we send the message to an internal queue for the target residing on our broker. It's then the
393394
// broker's job to route the message to the target's P2P queue.
395+
// TODO Make sure that if target is a service that we're part of and the broker routes the message back to us
396+
// it doesn't cause any issues.
394397
val internalTargetQueue = (target as? ArtemisAddress)?.queueName ?: throw IllegalArgumentException("Not an Artemis address")
395398
createQueueIfAbsent(internalTargetQueue)
396399
internalTargetQueue

node/src/test/kotlin/net/corda/node/services/ArtemisMessagingTests.kt

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -42,6 +42,7 @@ import kotlin.concurrent.thread
4242
import kotlin.test.assertEquals
4343
import kotlin.test.assertNull
4444

45+
//TODO This needs to be merged into P2PMessagingTest as that creates a more realistic environment
4546
class ArtemisMessagingTests {
4647
@Rule @JvmField val temporaryFolder = TemporaryFolder()
4748

test-utils/src/main/kotlin/net/corda/testing/node/NodeBasedTest.kt

Lines changed: 52 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -1,13 +1,15 @@
11
package net.corda.testing.node
22

3+
import net.corda.core.createDirectories
4+
import net.corda.core.div
35
import net.corda.core.getOrThrow
6+
import net.corda.core.node.services.ServiceInfo
47
import net.corda.node.internal.Node
58
import net.corda.node.services.User
69
import net.corda.node.services.config.ConfigHelper
710
import net.corda.node.services.config.FullNodeConfiguration
811
import net.corda.testing.freeLocalHostAndPort
912
import org.junit.After
10-
import org.junit.Before
1113
import org.junit.Rule
1214
import org.junit.rules.TemporaryFolder
1315
import java.util.*
@@ -17,43 +19,74 @@ import kotlin.concurrent.thread
1719
* Extend this class if you need to run nodes in a test. You could use the driver DSL but it's extremely slow for testing
1820
* purposes.
1921
*/
22+
// TODO Some of the logic here duplicates what's in the driver
2023
abstract class NodeBasedTest {
2124
@Rule
2225
@JvmField
2326
val tempFolder = TemporaryFolder()
2427

2528
private val nodes = ArrayList<Node>()
26-
lateinit var networkMapNode: Node
29+
private var _networkMapNode: Node? = null
2730

28-
@Before
29-
fun startNetworkMapNode() {
30-
networkMapNode = startNode("Network Map", emptyMap())
31-
}
31+
val networkMapNode: Node get() = _networkMapNode ?: startNetworkMapNode()
3232

33+
/**
34+
* Stops the network map node and all the nodes started by [startNode]. This is called automatically after each test
35+
* but can also be called manually within a test.
36+
*/
3337
@After
34-
fun stopNodes() {
38+
fun stopAllNodes() {
3539
nodes.forEach(Node::stop)
40+
nodes.clear()
41+
_networkMapNode = null
42+
}
43+
44+
/**
45+
* You can use this method to start the network map node in a more customised manner. Otherwise it
46+
* will automatically be started with the default parameters.
47+
*/
48+
fun startNetworkMapNode(legalName: String = "Network Map",
49+
advertisedServices: Set<ServiceInfo> = emptySet(),
50+
rpcUsers: List<User> = emptyList(),
51+
configOverrides: Map<String, Any> = emptyMap()): Node {
52+
check(_networkMapNode == null)
53+
return startNodeInternal(legalName, advertisedServices, rpcUsers, configOverrides).apply {
54+
_networkMapNode = this
55+
}
3656
}
3757

38-
fun startNode(legalName: String, rpcUsers: List<User> = emptyList()): Node {
39-
return startNode(legalName, mapOf(
40-
"networkMapAddress" to networkMapNode.configuration.artemisAddress.toString(),
41-
"rpcUsers" to rpcUsers.map { mapOf(
42-
"user" to it.username,
43-
"password" to it.password,
44-
"permissions" to it.permissions)
45-
}
46-
))
58+
fun startNode(legalName: String,
59+
advertisedServices: Set<ServiceInfo> = emptySet(),
60+
rpcUsers: List<User> = emptyList(),
61+
configOverrides: Map<String, Any> = emptyMap()): Node {
62+
return startNodeInternal(
63+
legalName,
64+
advertisedServices,
65+
rpcUsers,
66+
configOverrides + mapOf(
67+
"networkMapAddress" to networkMapNode.configuration.artemisAddress.toString()
68+
)
69+
)
4770
}
4871

49-
private fun startNode(legalName: String, configOverrides: Map<String, Any>): Node {
72+
private fun startNodeInternal(legalName: String,
73+
advertisedServices: Set<ServiceInfo>,
74+
rpcUsers: List<User>,
75+
configOverrides: Map<String, Any>): Node {
5076
val config = ConfigHelper.loadConfig(
51-
baseDirectoryPath = tempFolder.newFolder(legalName).toPath(),
77+
baseDirectoryPath = (tempFolder.root.toPath() / legalName).createDirectories(),
5278
allowMissingConfig = true,
5379
configOverrides = configOverrides + mapOf(
5480
"myLegalName" to legalName,
5581
"artemisAddress" to freeLocalHostAndPort().toString(),
56-
"extraAdvertisedServiceIds" to ""
82+
"extraAdvertisedServiceIds" to advertisedServices.joinToString(","),
83+
"rpcUsers" to rpcUsers.map {
84+
mapOf(
85+
"user" to it.username,
86+
"password" to it.password,
87+
"permissions" to it.permissions
88+
)
89+
}
5790
)
5891
)
5992

0 commit comments

Comments
 (0)