Skip to content

Commit fd436b0

Browse files
committed
artemis, sessions, mock: Add Service addressing, tests pass
1 parent 978ab7e commit fd436b0

File tree

19 files changed

+280
-114
lines changed

19 files changed

+280
-114
lines changed

core/src/main/kotlin/net/corda/core/messaging/Messaging.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import com.google.common.util.concurrent.ListenableFuture
44
import com.google.common.util.concurrent.SettableFuture
55
import net.corda.core.catch
66
import net.corda.core.node.services.DEFAULT_SESSION_ID
7+
import net.corda.core.node.services.PartyInfo
78
import net.corda.core.serialization.DeserializeAsKotlinObjectDef
89
import net.corda.core.serialization.deserialize
910
import net.corda.core.serialization.serialize
@@ -79,6 +80,8 @@ interface MessagingService {
7980
*/
8081
fun createMessage(topicSession: TopicSession, data: ByteArray, uuid: UUID = UUID.randomUUID()): Message
8182

83+
/** Returns the [MessageRecipients] address to use when sending messages to the party described by [partyInfo]. */
fun getAddressOfParty(partyInfo: PartyInfo): MessageRecipients
84+
8285
/** Returns an address that refers to this node. */
8386
val myAddress: SingleMessageRecipient
8487
}

core/src/main/kotlin/net/corda/core/node/ServiceHub.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,6 @@ interface ServiceHub {
8181
* Typical use is during signing in flows and for unit test signing.
8282
*/
8383
val notaryIdentityKey: KeyPair get() = this.keyManagementService.toKeyPair(this.myInfo.notaryIdentity.owningKey.keys)
84-
8584
}
8685

8786
/**

core/src/main/kotlin/net/corda/core/node/services/NetworkMapCache.kt

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@ import com.google.common.util.concurrent.ListenableFuture
55
import net.corda.core.contracts.Contract
66
import net.corda.core.crypto.CompositeKey
77
import net.corda.core.crypto.Party
8+
import net.corda.core.messaging.MessageRecipients
89
import net.corda.core.messaging.MessagingService
910
import net.corda.core.messaging.SingleMessageRecipient
1011
import net.corda.core.node.NodeInfo
12+
import net.corda.core.node.ServiceEntry
1113
import net.corda.core.randomOrNull
1214
import rx.Observable
1315

@@ -64,16 +66,28 @@ interface NetworkMapCache {
6466
fun getNodeByLegalName(name: String): NodeInfo? = partyNodes.singleOrNull { it.legalIdentity.name == name }
6567

6668
/** Look up the node info for a composite key. */
67-
fun getNodeByCompositeKey(compositeKey: CompositeKey): NodeInfo? {
69+
fun getNodeByLegalIdentityKey(compositeKey: CompositeKey): NodeInfo? {
6870
// Although we should never have more than one match, it is theoretically possible. Report an error if it happens.
69-
val candidates = partyNodes.filter {
70-
(it.legalIdentity.owningKey == compositeKey)
71-
|| it.advertisedServices.any { it.identity.owningKey == compositeKey }
72-
}
71+
val candidates = partyNodes.filter { it.legalIdentity.owningKey == compositeKey }
7372
check(candidates.size <= 1) { "Found more than one match for key $compositeKey" }
7473
return candidates.singleOrNull()
7574
}
7675

76+
/**
77+
* Look up all nodes advertising the service owned by [compositeKey]
78+
*/
79+
fun getNodesByAdvertisedServiceIdentityKey(compositeKey: CompositeKey): List<NodeInfo> {
80+
return partyNodes.filter { it.advertisedServices.any { it.identity.owningKey == compositeKey } }
81+
}
82+
83+
/**
84+
* Returns information about the given party, which may be a specific node or a service.
85+
*
86+
* @param party The party to look up.
87+
* @return The [PartyInfo] describing the party, or null if it is not found.
88+
*/
89+
fun getPartyInfo(party: Party): PartyInfo?
90+
7791
/**
7892
* Given a [party], returns a node advertising it as an identity. If more than one node found the result
7993
* is chosen at random.
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package net.corda.core.node.services
2+
3+
import net.corda.core.crypto.Party
4+
import net.corda.core.node.NodeInfo
5+
import net.corda.core.node.ServiceEntry
6+
7+
/**
 * Describes what a [Party] refers to: either a single node or an advertised service.
 *
 * @property party The identity that this information is about.
 */
sealed class PartyInfo(val party: Party) {
8+
/** The party is the legal identity of [node]. */
class Node(val node: NodeInfo) : PartyInfo(node.legalIdentity)
9+
/** The party is the identity of the advertised [service]. */
class Service(val service: ServiceEntry) : PartyInfo(service.identity)
10+
}

core/src/main/kotlin/net/corda/flows/AbstractStateReplacementFlow.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ abstract class AbstractStateReplacementFlow<T> {
7272
@Suspendable
7373
private fun collectSignatures(participants: List<CompositeKey>, stx: SignedTransaction): List<DigitalSignature.WithKey> {
7474
val parties = participants.map {
75-
val participantNode = serviceHub.networkMapCache.getNodeByCompositeKey(it) ?:
75+
val participantNode = serviceHub.networkMapCache.getNodeByLegalIdentityKey(it) ?:
7676
throw IllegalStateException("Participant $it to state $originalState not found on the network")
7777
participantNode.legalIdentity
7878
}

node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo
277277
* A service entry contains the advertised [ServiceInfo] along with the service identity. The identity *name* is
278278
* taken from the configuration or, if non specified, generated by combining the node's legal name and the service id.
279279
*/
280-
private fun makeServiceEntries(): List<ServiceEntry> {
280+
protected fun makeServiceEntries(): List<ServiceEntry> {
281281
return advertisedServices.map {
282282
val serviceId = it.type.id
283283
val serviceName = it.name ?: "$serviceId|${configuration.myLegalName}"

node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingComponent.kt

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package net.corda.node.services.messaging
33
import com.google.common.annotations.VisibleForTesting
44
import com.google.common.net.HostAndPort
55
import net.corda.core.crypto.CompositeKey
6+
import net.corda.core.messaging.MessageRecipientGroup
67
import net.corda.core.messaging.MessageRecipients
78
import net.corda.core.messaging.SingleMessageRecipient
89
import net.corda.core.read
@@ -34,6 +35,7 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() {
3435

3536
const val INTERNAL_PREFIX = "internal."
3637
const val PEERS_PREFIX = "${INTERNAL_PREFIX}peers."
38+
const val SERVICES_PREFIX = "${INTERNAL_PREFIX}services."
3739
const val CLIENTS_PREFIX = "clients."
3840
const val P2P_QUEUE = "p2p.inbound"
3941
const val RPC_REQUESTS_QUEUE = "rpc.requests"
@@ -55,7 +57,7 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() {
5557
}
5658
}
5759

58-
protected interface ArtemisAddress {
60+
protected interface ArtemisAddress : SingleMessageRecipient {
5961
val queueName: SimpleString
6062
val hostAndPort: HostAndPort
6163
}
@@ -69,11 +71,20 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() {
6971
* may change or evolve and code that relies upon it being a simple host/port may not function correctly.
7072
* For instance it may contain onion routing data.
7173
*/
72-
data class NodeAddress(val identity: CompositeKey, override val hostAndPort: HostAndPort) : SingleMessageRecipient, ArtemisAddress {
73-
override val queueName: SimpleString = SimpleString("$PEERS_PREFIX${identity.toBase58String()}")
74+
data class NodeAddress(override val queueName: SimpleString, override val hostAndPort: HostAndPort) : ArtemisAddress {
    // The first component is the full queue name (prefix included), not a bare identity key, so label it as such.
    override fun toString(): String = "${javaClass.simpleName}(queueName = $queueName, $hostAndPort)"

    companion object {
        /** Creates an address whose queue name is [PEERS_PREFIX] followed by the Base58 form of [identity]. */
        fun asPeer(identity: CompositeKey, hostAndPort: HostAndPort) =
                NodeAddress(SimpleString("$PEERS_PREFIX${identity.toBase58String()}"), hostAndPort)

        /** Creates an address whose queue name is [SERVICES_PREFIX] followed by the Base58 form of [identity]. */
        fun asService(identity: CompositeKey, hostAndPort: HostAndPort) =
                NodeAddress(SimpleString("$SERVICES_PREFIX${identity.toBase58String()}"), hostAndPort)
    }
}
7683

84+
/**
 * A group address for the service whose identity key is [identity]. Unlike [NodeAddress] it carries no host and
 * port of its own.
 */
data class ServiceAddress(val identity: CompositeKey) : MessageRecipientGroup {
85+
/** The queue name for the service: [SERVICES_PREFIX] followed by the Base58 form of [identity]. */
val queueName: SimpleString = SimpleString("$SERVICES_PREFIX${identity.toBase58String()}")
86+
}
87+
7788
/** The config object is used to pass in the passwords for the certificate KeyStore and TrustStore */
7889
abstract val config: NodeSSLConfiguration
7990

node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt

Lines changed: 73 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import net.corda.core.crypto.X509Utilities.CORDA_CLIENT_CA
99
import net.corda.core.crypto.X509Utilities.CORDA_ROOT_CA
1010
import net.corda.core.crypto.newSecureRandom
1111
import net.corda.core.div
12+
import net.corda.core.node.NodeInfo
1213
import net.corda.core.node.services.NetworkMapCache
1314
import net.corda.core.node.services.NetworkMapCache.MapChange
1415
import net.corda.core.utilities.debug
@@ -92,7 +93,7 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
9293
fun start() = mutex.locked {
9394
if (!running) {
9495
configureAndStartServer()
95-
networkChangeHandle = networkMapCache.changed.subscribe { destroyOrCreateBridge(it) }
96+
networkChangeHandle = networkMapCache.changed.subscribe { destroyOrCreateBridges(it) }
9697
running = true
9798
}
9899
}
@@ -120,14 +121,36 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
120121
* We create the bridges indirectly now because the network map is not persisted and there are no ways to obtain host and port information on startup.
121122
* TODO : Create the bridge directly from the list of queues on start up when we have a persisted network map service.
122123
*/
123-
private fun destroyOrCreateBridge(change: MapChange) {
124-
val (newNode, staleNode) = when (change) {
125-
is MapChange.Modified -> change.node to change.previousNode
126-
is MapChange.Removed -> null to change.node
127-
is MapChange.Added -> change.node to null
124+
private fun destroyOrCreateBridges(change: MapChange) {
125+
/**
 * Adds to [target] every address a bridge is needed for in order to reach [node]: the node's own address plus
 * one service address per service the node advertises, each using the node's host and port.
 *
 * @param node The node whose addresses are collected.
 * @param target The set the collected addresses are added to.
 * @throws ClassCastException if the address of [node] is not an [ArtemisAddress].
 */
fun addAddresses(node: NodeInfo, target: HashSet<ArtemisAddress>) {
    val nodeAddress = node.address as ArtemisAddress
    target.add(nodeAddress)
    // Read the services from the node passed in, not from change.node: for a Modified change this function is
    // also called with change.previousNode, and it is that node's service bridges which must be removed.
    node.advertisedServices.forEach {
        target.add(NodeAddress.asService(it.identity.owningKey, nodeAddress.hostAndPort))
    }
}
132+
133+
val addressesToCreateBridgesTo = HashSet<ArtemisAddress>()
134+
val addressesToRemoveBridgesTo = HashSet<ArtemisAddress>()
135+
when (change) {
136+
is MapChange.Modified -> {
137+
addAddresses(change.node, addressesToCreateBridgesTo)
138+
addAddresses(change.previousNode, addressesToRemoveBridgesTo)
139+
}
140+
is MapChange.Removed -> {
141+
addAddresses(change.node, addressesToRemoveBridgesTo)
142+
}
143+
is MapChange.Added -> {
144+
addAddresses(change.node, addressesToCreateBridgesTo)
145+
}
146+
}
147+
148+
(addressesToRemoveBridgesTo - addressesToCreateBridgesTo).forEach {
149+
maybeDestroyBridge(bridgeNameForAddress(it))
150+
}
151+
addressesToCreateBridgesTo.forEach {
152+
maybeDeployBridgeForAddress(it)
128153
}
129-
(staleNode?.address as? ArtemisAddress)?.let { maybeDestroyBridge(it.queueName) }
130-
(newNode?.address as? ArtemisAddress)?.let { if (activeMQServer.queueQuery(it.queueName).isExists) maybeDeployBridgeForAddress(it) }
131154
}
132155

133156
private fun configureAndStartServer() {
@@ -138,31 +161,47 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
138161
registerActivationFailureListener { exception -> throw exception }
139162
// Some types of queue might need special preparation on our side, like dialling back or preparing
140163
// a lazily initialised subsystem.
141-
registerPostQueueCreationCallback { deployBridgeFromNewPeerQueue(it) }
164+
registerPostQueueCreationCallback { deployBridgeFromNewQueue(it) }
142165
registerPostQueueDeletionCallback { address, qName -> log.debug { "Queue deleted: $qName for $address" } }
143166
}
144167
activeMQServer.start()
145168
printBasicNodeInfo("Node listening on address", myHostPort.toString())
146169
}
147170

148-
private fun deployBridgeFromNewPeerQueue(queueName: SimpleString) {
149-
log.debug { "Queue created: $queueName" }
150-
if (!queueName.startsWith(PEERS_PREFIX)) return
151-
try {
152-
val identity = CompositeKey.parseFromBase58(queueName.substring(PEERS_PREFIX.length))
153-
val nodeInfo = networkMapCache.getNodeByCompositeKey(identity)
154-
if (nodeInfo != null) {
155-
val address = nodeInfo.address
156-
if (address is NodeAddress) {
157-
maybeDeployBridgeForAddress(address)
171+
/**
 * Deploys a bridge from the local queue [queueName] to the host and port of [nodeInfo], unless a bridge for that
 * combination already exists.
 *
 * Only nodes whose address is a [NodeAddress] are handled; for any other address type an error is logged and no
 * bridge is deployed.
 */
private fun maybeDeployBridgeForNode(queueName: SimpleString, nodeInfo: NodeInfo) {
    // Use the lambda form of debug, as the rest of this class does, so the message is only built when needed.
    log.debug { "Deploying bridge for $queueName to $nodeInfo" }
    val address = nodeInfo.address
    if (address is NodeAddress) {
        maybeDeployBridgeForAddress(NodeAddress(queueName, address.hostAndPort))
    } else {
        log.error("Don't know how to deal with $address")
    }
}
180+
181+
private fun deployBridgeFromNewQueue(queueName: SimpleString) {
182+
log.debug { "Queue created: $queueName, deploying bridge(s)" }
183+
when {
184+
queueName.startsWith(PEERS_PREFIX) -> try {
185+
val identity = CompositeKey.parseFromBase58(queueName.substring(PEERS_PREFIX.length))
186+
val nodeInfo = networkMapCache.getNodeByLegalIdentityKey(identity)
187+
if (nodeInfo != null) {
188+
maybeDeployBridgeForNode(queueName, nodeInfo)
158189
} else {
159-
log.error("Don't know how to deal with $address")
190+
log.error("Queue created for a peer that we don't know from the network map: $queueName")
160191
}
161-
} else {
162-
log.error("Queue created for a peer that we don't know from the network map: $queueName")
192+
} catch (e: AddressFormatException) {
193+
log.error("Flow violation: Could not parse peer queue name as Base 58: $queueName")
194+
}
195+
196+
queueName.startsWith(SERVICES_PREFIX) -> try {
197+
val identity = CompositeKey.parseFromBase58(queueName.substring(SERVICES_PREFIX.length))
198+
val nodeInfos = networkMapCache.getNodesByAdvertisedServiceIdentityKey(identity)
199+
for (nodeInfo in nodeInfos) {
200+
maybeDeployBridgeForNode(queueName, nodeInfo)
201+
}
202+
} catch (e: AddressFormatException) {
203+
log.error("Flow violation: Could not parse service queue name as Base 58: $queueName")
163204
}
164-
} catch (e: AddressFormatException) {
165-
log.error("Flow violation: Could not parse queue name as Base 58: $queueName")
166205
}
167206
}
168207

@@ -240,26 +279,29 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
240279
tcpTransport(OUTBOUND, hostAndPort.hostText, hostAndPort.port)
241280
)
242281

243-
private fun bridgeExists(name: SimpleString) = activeMQServer.clusterManager.bridges.containsKey(name.toString())
282+
private fun bridgeExists(name: String) = activeMQServer.clusterManager.bridges.containsKey(name)
244283

245284
private fun maybeDeployBridgeForAddress(address: ArtemisAddress) {
246285
if (!connectorExists(address.hostAndPort)) {
247286
addConnector(address.hostAndPort)
248287
}
249-
if (!bridgeExists(address.queueName)) {
250-
deployBridge(address)
288+
val bridgeName = bridgeNameForAddress(address)
289+
if (!bridgeExists(bridgeName)) {
290+
deployBridge(bridgeName, address)
251291
}
252292
}
253293

294+
/**
 * Builds the bridge name for [address] from its queue name and its host and port, so that addresses sharing a
 * queue name but differing in host and port get distinct bridge names.
 */
private fun bridgeNameForAddress(address: ArtemisAddress) = "${address.queueName}-${address.hostAndPort}"
295+
254296
/**
255297
* All nodes are expected to have a public facing address called [ArtemisMessagingComponent.P2P_QUEUE] for receiving
256298
* messages from other nodes. When we want to send a message to a node we send it to our internal address/queue for it,
257299
* as defined by ArtemisAddress.queueName. A bridge is then created to forward messages from this queue to the node's
258300
* P2P address.
259301
*/
260-
private fun deployBridge(address: ArtemisAddress) {
302+
private fun deployBridge(bridgeName: String, address: ArtemisAddress) {
261303
activeMQServer.deployBridge(BridgeConfiguration().apply {
262-
name = address.queueName.toString()
304+
name = bridgeName
263305
queueName = address.queueName.toString()
264306
forwardingAddress = P2P_QUEUE
265307
staticConnectors = listOf(address.hostAndPort.toString())
@@ -272,9 +314,9 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
272314
})
273315
}
274316

275-
private fun maybeDestroyBridge(name: SimpleString) {
317+
private fun maybeDestroyBridge(name: String) {
276318
if (bridgeExists(name)) {
277-
activeMQServer.destroyBridge(name.toString())
319+
activeMQServer.destroyBridge(name)
278320
}
279321
}
280322

node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import com.google.common.util.concurrent.ListenableFuture
55
import net.corda.core.ThreadBox
66
import net.corda.core.crypto.CompositeKey
77
import net.corda.core.messaging.*
8+
import net.corda.core.node.services.PartyInfo
89
import net.corda.core.serialization.SerializedBytes
910
import net.corda.core.serialization.opaque
1011
import net.corda.core.success
@@ -96,7 +97,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
9697
/**
9798
* Apart from the NetworkMapService this is the only other address accessible to the node outside of lookups against the NetworkMapCache.
9899
*/
99-
override val myAddress: SingleMessageRecipient = if (myIdentity != null) NodeAddress(myIdentity, serverHostPort) else NetworkMapAddress(serverHostPort)
100+
override val myAddress: SingleMessageRecipient = if (myIdentity != null) NodeAddress.asPeer(myIdentity, serverHostPort) else NetworkMapAddress(serverHostPort)
100101

101102
private val state = ThreadBox(InnerState())
102103
private val handlers = CopyOnWriteArrayList<Handler>()
@@ -449,4 +450,11 @@ class NodeMessagingClient(override val config: NodeConfiguration,
449450
}
450451
}
451452
}
453+
454+
/**
 * Resolves the messaging address for [partyInfo]: a service maps to a [ArtemisMessagingComponent.ServiceAddress]
 * built from the service identity's key, while a node maps to the address recorded in its node info.
 */
override fun getAddressOfParty(partyInfo: PartyInfo): MessageRecipients = when (partyInfo) {
    is PartyInfo.Service -> ArtemisMessagingComponent.ServiceAddress(partyInfo.service.identity.owningKey)
    is PartyInfo.Node -> partyInfo.node.address
}
452460
}

node/src/main/kotlin/net/corda/node/services/messaging/RPCStructures.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,11 +200,11 @@ private class RPCKryo(observableSerializer: Serializer<Observable<Any>>? = null)
200200
register(ArtemisMessagingComponent.NodeAddress::class.java,
201201
read = { kryo, input ->
202202
ArtemisMessagingComponent.NodeAddress(
203-
CompositeKey.parseFromBase58(kryo.readObject(input, String::class.java)),
203+
kryo.readObject(input, SimpleString::class.java),
204204
kryo.readObject(input, HostAndPort::class.java))
205205
},
206206
write = { kryo, output, nodeAddress ->
207-
kryo.writeObject(output, nodeAddress.identity.toBase58String())
207+
kryo.writeObject(output, nodeAddress.queueName)
208208
kryo.writeObject(output, nodeAddress.hostAndPort)
209209
}
210210
)

0 commit comments

Comments
 (0)