Skip to content

Commit ad6f555

Browse files
authored
Merge pull request corda#13 from corda/aslemmer-faster-driver
node: Driver network map starts parallel with other nodes
2 parents eec16a4 + 7d9caa9 commit ad6f555

File tree

8 files changed

+104
-72
lines changed

8 files changed

+104
-72
lines changed

node/src/integration-test/kotlin/net/corda/node/driver/DriverTests.kt

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,23 @@ import net.corda.node.services.api.RegulatorService
77
import net.corda.node.services.messaging.ArtemisMessagingComponent
88
import net.corda.node.services.transactions.SimpleNotaryService
99
import org.junit.Test
10+
import java.util.concurrent.Executors
1011

1112

1213
class DriverTests {
1314
companion object {
15+
val executorService = Executors.newScheduledThreadPool(2)
16+
1417
fun nodeMustBeUp(nodeInfo: NodeInfo) {
1518
val hostAndPort = ArtemisMessagingComponent.toHostAndPort(nodeInfo.address)
1619
// Check that the port is bound
17-
addressMustBeBound(hostAndPort)
20+
addressMustBeBound(executorService, hostAndPort)
1821
}
1922

2023
fun nodeMustBeDown(nodeInfo: NodeInfo) {
2124
val hostAndPort = ArtemisMessagingComponent.toHostAndPort(nodeInfo.address)
2225
// Check that the port is bound
23-
addressMustNotBeBound(hostAndPort)
26+
addressMustNotBeBound(executorService, hostAndPort)
2427
}
2528
}
2629

node/src/integration-test/kotlin/net/corda/node/services/RaftValidatingNotaryServiceTests.kt

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -114,16 +114,15 @@ class RaftValidatingNotaryServiceTests : DriverBasedTest() {
114114
waitFor()
115115
}
116116

117-
// Pay ourselves another 10x5 pounds
118-
for (i in 1..10) {
117+
// Pay ourselves another 20x5 pounds
118+
for (i in 1..20) {
119119
val payHandle = aliceProxy.startFlow(::CashFlow, CashCommand.PayCash(5.POUNDS.issuedBy(alice.legalIdentity.ref(0)), alice.legalIdentity))
120120
require(payHandle.returnValue.toBlocking().first() is CashFlowResult.Success)
121121
}
122122

123-
// Artemis still dispatches some requests to the dead notary but all others should go through.
124123
val notarisationsPerNotary = HashMap<Party, Int>()
125124
notaryStateMachines.expectEvents(isStrict = false) {
126-
replicate<Pair<NodeInfo, StateMachineUpdate>>(15) {
125+
replicate<Pair<NodeInfo, StateMachineUpdate>>(30) {
127126
expect(match = { it.second is StateMachineUpdate.Added }) {
128127
val (notary, update) = it
129128
update as StateMachineUpdate.Added

node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@ import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NET
1717
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NOTIFICATIONS_ADDRESS
1818
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.P2P_QUEUE
1919
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.PEERS_PREFIX
20+
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.RPC_QUEUE_REMOVALS_QUEUE
2021
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.RPC_REQUESTS_QUEUE
2122
import net.corda.node.services.messaging.CordaRPCClientImpl
22-
import net.corda.node.services.messaging.NodeMessagingClient.Companion.RPC_QUEUE_REMOVALS_QUEUE
2323
import net.corda.testing.messaging.SimpleMQClient
2424
import net.corda.testing.node.NodeBasedTest
2525
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException

node/src/main/kotlin/net/corda/node/driver/Driver.kt

Lines changed: 57 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.module.SimpleModule
77
import com.google.common.net.HostAndPort
88
import com.google.common.util.concurrent.Futures
99
import com.google.common.util.concurrent.ListenableFuture
10+
import com.google.common.util.concurrent.SettableFuture
1011
import com.typesafe.config.Config
1112
import com.typesafe.config.ConfigRenderOptions
1213
import net.corda.core.*
@@ -33,14 +34,13 @@ import java.time.Instant
3334
import java.time.ZoneOffset.UTC
3435
import java.time.format.DateTimeFormatter
3536
import java.util.*
36-
import java.util.concurrent.CompletableFuture
37-
import java.util.concurrent.CountDownLatch
37+
import java.util.concurrent.Executors
3838
import java.util.concurrent.Future
39+
import java.util.concurrent.ScheduledExecutorService
40+
import java.util.concurrent.TimeUnit.MILLISECONDS
3941
import java.util.concurrent.TimeUnit.SECONDS
4042
import java.util.concurrent.TimeoutException
4143
import java.util.concurrent.atomic.AtomicInteger
42-
import kotlin.concurrent.thread
43-
import kotlin.test.assertEquals
4444

4545
/**
4646
* This file defines a small "Driver" DSL for starting up nodes that is only intended for development, demos and tests.
@@ -67,7 +67,7 @@ interface DriverDSLExposedInterface {
6767
fun startNode(providedName: String? = null,
6868
advertisedServices: Set<ServiceInfo> = emptySet(),
6969
rpcUsers: List<User> = emptyList(),
70-
customOverrides: Map<String, Any?> = emptyMap()): Future<NodeHandle>
70+
customOverrides: Map<String, Any?> = emptyMap()): ListenableFuture<NodeHandle>
7171

7272
/**
7373
* Starts a distributed notary cluster.
@@ -198,8 +198,8 @@ fun getTimestampAsDirectoryName(): String {
198198
return DateTimeFormatter.ofPattern("yyyyMMddHHmmss").withZone(UTC).format(Instant.now())
199199
}
200200

201-
fun addressMustBeBound(hostAndPort: HostAndPort) {
202-
poll("address $hostAndPort to bind") {
201+
fun addressMustBeBound(executorService: ScheduledExecutorService, hostAndPort: HostAndPort): ListenableFuture<Unit> {
202+
return poll(executorService, "address $hostAndPort to bind") {
203203
try {
204204
Socket(hostAndPort.hostText, hostAndPort.port).close()
205205
Unit
@@ -209,8 +209,8 @@ fun addressMustBeBound(hostAndPort: HostAndPort) {
209209
}
210210
}
211211

212-
fun addressMustNotBeBound(hostAndPort: HostAndPort) {
213-
poll("address $hostAndPort to unbind") {
212+
fun addressMustNotBeBound(executorService: ScheduledExecutorService, hostAndPort: HostAndPort): ListenableFuture<Unit> {
213+
return poll(executorService, "address $hostAndPort to unbind") {
214214
try {
215215
Socket(hostAndPort.hostText, hostAndPort.port).close()
216216
null
@@ -220,18 +220,36 @@ fun addressMustNotBeBound(hostAndPort: HostAndPort) {
220220
}
221221
}
222222

223-
fun <A> poll(pollName: String, pollIntervalMs: Long = 500, warnCount: Int = 120, f: () -> A?): A {
223+
private fun <A> poll(
224+
executorService: ScheduledExecutorService,
225+
pollName: String,
226+
pollIntervalMs: Long = 500,
227+
warnCount: Int = 120,
228+
check: () -> A?
229+
): ListenableFuture<A> {
230+
val initialResult = check()
231+
val resultFuture = SettableFuture.create<A>()
232+
if (initialResult != null) {
233+
resultFuture.set(initialResult)
234+
return resultFuture
235+
}
224236
var counter = 0
225-
var result = f()
226-
while (result == null) {
227-
if (counter == warnCount) {
228-
log.warn("Been polling $pollName for ${pollIntervalMs * warnCount / 1000.0} seconds...")
229-
}
230-
counter = (counter % warnCount) + 1
231-
Thread.sleep(pollIntervalMs)
232-
result = f()
237+
fun schedulePoll() {
238+
executorService.schedule({
239+
counter++
240+
if (counter == warnCount) {
241+
log.warn("Been polling $pollName for ${pollIntervalMs * warnCount / 1000.0} seconds...")
242+
}
243+
val result = check()
244+
if (result == null) {
245+
schedulePoll()
246+
} else {
247+
resultFuture.set(result)
248+
}
249+
}, pollIntervalMs, MILLISECONDS)
233250
}
234-
return result
251+
schedulePoll()
252+
return resultFuture
235253
}
236254

237255
open class DriverDSL(
@@ -241,13 +259,13 @@ open class DriverDSL(
241259
val useTestClock: Boolean,
242260
val isDebug: Boolean
243261
) : DriverDSLInternalInterface {
262+
private val executorService: ScheduledExecutorService = Executors.newScheduledThreadPool(2)
244263
private val networkMapName = "NetworkMapService"
245264
private val networkMapAddress = portAllocation.nextHostAndPort()
246265

247266
class State {
248267
val registeredProcesses = LinkedList<Process>()
249268
val clients = LinkedList<NodeMessagingClient>()
250-
var localServer: ArtemisMessagingServer? = null
251269
}
252270

253271
private val state = ThreadBox(State())
@@ -276,11 +294,10 @@ open class DriverDSL(
276294
clients.forEach {
277295
it.stop()
278296
}
279-
localServer?.stop()
280297
registeredProcesses.forEach(Process::destroy)
281298
}
282299
/** Wait 5 seconds, then [Process.destroyForcibly] */
283-
val finishedFuture = future {
300+
val finishedFuture = executorService.submit {
284301
waitForAllNodesToFinish()
285302
}
286303
try {
@@ -295,10 +312,8 @@ open class DriverDSL(
295312
}
296313

297314
// Check that we shut down properly
298-
state.locked {
299-
localServer?.run { addressMustNotBeBound(myHostPort) }
300-
}
301-
addressMustNotBeBound(networkMapAddress)
315+
addressMustNotBeBound(executorService, networkMapAddress).get()
316+
executorService.shutdown()
302317
}
303318

304319
private fun queryNodeInfo(webAddress: HostAndPort): NodeInfo? {
@@ -353,10 +368,9 @@ open class DriverDSL(
353368
configOverrides = configOverrides
354369
)
355370

356-
return future {
357-
val process = DriverDSL.startNode(FullNodeConfiguration(config), quasarJarPath, debugPort)
358-
registerProcess(process)
359-
NodeHandle(queryNodeInfo(apiAddress)!!, config, process)
371+
return startNode(executorService, FullNodeConfiguration(config), quasarJarPath, debugPort).map {
372+
registerProcess(it)
373+
NodeHandle(queryNodeInfo(apiAddress)!!, config, it)
360374
}
361375
}
362376

@@ -395,7 +409,7 @@ open class DriverDSL(
395409
startNetworkMapService()
396410
}
397411

398-
private fun startNetworkMapService() {
412+
private fun startNetworkMapService(): ListenableFuture<Unit> {
399413
val apiAddress = portAllocation.nextHostAndPort()
400414
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
401415

@@ -414,7 +428,9 @@ open class DriverDSL(
414428
)
415429

416430
log.info("Starting network-map-service")
417-
registerProcess(startNode(FullNodeConfiguration(config), quasarJarPath, debugPort))
431+
return startNode(executorService, FullNodeConfiguration(config), quasarJarPath, debugPort).map {
432+
registerProcess(it)
433+
}
418434
}
419435

420436
companion object {
@@ -428,10 +444,11 @@ open class DriverDSL(
428444
fun <A> pickA(array: Array<A>): A = array[Math.abs(Random().nextInt()) % array.size]
429445

430446
private fun startNode(
447+
executorService: ScheduledExecutorService,
431448
nodeConf: FullNodeConfiguration,
432449
quasarJarPath: String,
433450
debugPort: Int?
434-
): Process {
451+
): ListenableFuture<Process> {
435452
// Write node.conf
436453
writeConfig(nodeConf.basedir, "node.conf", nodeConf.config)
437454

@@ -454,13 +471,13 @@ open class DriverDSL(
454471
builder.inheritIO()
455472
builder.directory(nodeConf.basedir.toFile())
456473
val process = builder.start()
457-
addressMustBeBound(nodeConf.artemisAddress)
458-
// TODO There is a race condition here. Even though the messaging address is bound it may be the case that
459-
// the handlers for the advertised services are not yet registered. A hacky workaround is that we wait for
460-
// the web api address to be bound as well, as that starts after the services. Needs rethinking.
461-
addressMustBeBound(nodeConf.webAddress)
462-
463-
return process
474+
return Futures.allAsList(
475+
addressMustBeBound(executorService, nodeConf.artemisAddress),
476+
// TODO There is a race condition here. Even though the messaging address is bound it may be the case that
477+
// the handlers for the advertised services are not yet registered. A hacky workaround is that we wait for
478+
// the web api address to be bound as well, as that starts after the services. Needs rethinking.
479+
addressMustBeBound(executorService, nodeConf.webAddress)
480+
).map { process }
464481
}
465482
}
466483
}
@@ -469,4 +486,3 @@ fun writeConfig(path: Path, filename: String, config: Config) {
469486
path.toFile().mkdirs()
470487
File("$path/$filename").writeText(config.root().render(ConfigRenderOptions.concise()))
471488
}
472-

node/src/main/kotlin/net/corda/node/internal/Node.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress:
142142
runOnStop += Runnable { messageBroker?.stop() }
143143
start()
144144
if (networkMapService is NetworkMapAddress) {
145-
bridgeToNetworkMapService(networkMapService)
145+
deployBridgeIfAbsent(networkMapService.queueName, networkMapService.hostAndPort)
146146
}
147147
}
148148

node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingComponent.kt

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,11 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() {
3939
const val CLIENTS_PREFIX = "clients."
4040
const val P2P_QUEUE = "p2p.inbound"
4141
const val RPC_REQUESTS_QUEUE = "rpc.requests"
42+
const val RPC_QUEUE_REMOVALS_QUEUE = "rpc.qremovals"
4243
const val NOTIFICATIONS_ADDRESS = "${INTERNAL_PREFIX}activemq.notifications"
4344

4445
@JvmStatic
45-
val NETWORK_MAP_ADDRESS = SimpleString("${INTERNAL_PREFIX}networkmap")
46+
val NETWORK_MAP_ADDRESS = "${INTERNAL_PREFIX}networkmap"
4647

4748
/**
4849
* Assuming the passed in target address is actually an ArtemisAddress will extract the host and port of the node. This should
@@ -57,16 +58,16 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() {
5758
}
5859
}
5960

60-
protected interface ArtemisAddress : MessageRecipients {
61+
interface ArtemisAddress : MessageRecipients {
6162
val queueName: SimpleString
6263
}
6364

64-
protected interface ArtemisPeerAddress : ArtemisAddress, SingleMessageRecipient {
65+
interface ArtemisPeerAddress : ArtemisAddress, SingleMessageRecipient {
6566
val hostAndPort: HostAndPort
6667
}
6768

6869
data class NetworkMapAddress(override val hostAndPort: HostAndPort) : SingleMessageRecipient, ArtemisPeerAddress {
69-
override val queueName: SimpleString get() = NETWORK_MAP_ADDRESS
70+
override val queueName = SimpleString(NETWORK_MAP_ADDRESS)
7071
}
7172

7273
/**

node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import net.corda.node.services.messaging.ArtemisMessagingServer.NodeLoginModule.
2525
import org.apache.activemq.artemis.api.core.SimpleString
2626
import org.apache.activemq.artemis.core.config.BridgeConfiguration
2727
import org.apache.activemq.artemis.core.config.Configuration
28+
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration
2829
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl
2930
import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration
3031
import org.apache.activemq.artemis.core.security.Role
@@ -105,14 +106,6 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
105106
running = false
106107
}
107108

108-
fun bridgeToNetworkMapService(networkMapService: NetworkMapAddress) {
109-
val query = activeMQServer.queueQuery(NETWORK_MAP_ADDRESS)
110-
if (!query.isExists) {
111-
activeMQServer.createQueue(NETWORK_MAP_ADDRESS, NETWORK_MAP_ADDRESS, null, true, false)
112-
}
113-
deployBridgeIfAbsent(networkMapService.queueName, networkMapService.hostAndPort)
114-
}
115-
116109
/**
117110
* The bridge will be created automatically when the queues are created, however, this is not the case when the network map restarted.
118111
* The queues are restored from the journal, and because the queues are added before we register the callback handler, this method will never get called for existing queues.
@@ -225,6 +218,36 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
225218
// password is changed from the default (as warned in the docs). Since we don't need this feature we turn it off
226219
// by having its password be an unknown securely random 128-bit value.
227220
clusterPassword = BigInteger(128, newSecureRandom()).toString(16)
221+
222+
queueConfigurations.addAll(listOf(
223+
CoreQueueConfiguration().apply {
224+
address = NETWORK_MAP_ADDRESS
225+
name = NETWORK_MAP_ADDRESS
226+
isDurable = true
227+
},
228+
CoreQueueConfiguration().apply {
229+
address = P2P_QUEUE
230+
name = P2P_QUEUE
231+
isDurable = true
232+
},
233+
// Create an RPC queue: this will service locally connected clients only (not via a bridge) and those
234+
// clients must have authenticated. We could use a single consumer for everything and perhaps we should,
235+
// but these queues are not worth persisting.
236+
CoreQueueConfiguration().apply {
237+
name = RPC_REQUESTS_QUEUE
238+
address = RPC_REQUESTS_QUEUE
239+
isDurable = false
240+
},
241+
// The custom name for the queue is intentional - we may wish other things to subscribe to the
242+
// NOTIFICATIONS_ADDRESS with different filters in future
243+
CoreQueueConfiguration().apply {
244+
name = RPC_QUEUE_REMOVALS_QUEUE
245+
address = NOTIFICATIONS_ADDRESS
246+
isDurable = false
247+
filterString = "_AMQ_NotifType = 1"
248+
}
249+
))
250+
228251
configureAddressSecurity()
229252
}
230253

@@ -284,7 +307,7 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
284307

285308
private fun bridgeExists(name: String) = activeMQServer.clusterManager.bridges.containsKey(name)
286309

287-
private fun deployBridgeIfAbsent(queueName: SimpleString, hostAndPort: HostAndPort) {
310+
fun deployBridgeIfAbsent(queueName: SimpleString, hostAndPort: HostAndPort) {
288311
if (!connectorExists(hostAndPort)) {
289312
addConnector(hostAndPort)
290313
}

0 commit comments

Comments
 (0)