@@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.module.SimpleModule
77import com.google.common.net.HostAndPort
88import com.google.common.util.concurrent.Futures
99import com.google.common.util.concurrent.ListenableFuture
10+ import com.google.common.util.concurrent.SettableFuture
1011import com.typesafe.config.Config
1112import com.typesafe.config.ConfigRenderOptions
1213import net.corda.core.*
@@ -33,14 +34,13 @@ import java.time.Instant
3334import java.time.ZoneOffset.UTC
3435import java.time.format.DateTimeFormatter
3536import java.util.*
36- import java.util.concurrent.CompletableFuture
37- import java.util.concurrent.CountDownLatch
37+ import java.util.concurrent.Executors
3838import java.util.concurrent.Future
39+ import java.util.concurrent.ScheduledExecutorService
40+ import java.util.concurrent.TimeUnit.MILLISECONDS
3941import java.util.concurrent.TimeUnit.SECONDS
4042import java.util.concurrent.TimeoutException
4143import java.util.concurrent.atomic.AtomicInteger
42- import kotlin.concurrent.thread
43- import kotlin.test.assertEquals
4444
4545/* *
4646 * This file defines a small "Driver" DSL for starting up nodes that is only intended for development, demos and tests.
@@ -67,7 +67,7 @@ interface DriverDSLExposedInterface {
6767 fun startNode (providedName : String? = null,
6868 advertisedServices : Set <ServiceInfo > = emptySet(),
6969 rpcUsers : List <User > = emptyList(),
70- customOverrides : Map <String , Any ?> = emptyMap()): Future <NodeHandle >
70+ customOverrides : Map <String , Any ?> = emptyMap()): ListenableFuture <NodeHandle >
7171
7272 /* *
7373 * Starts a distributed notary cluster.
@@ -198,8 +198,8 @@ fun getTimestampAsDirectoryName(): String {
198198 return DateTimeFormatter .ofPattern(" yyyyMMddHHmmss" ).withZone(UTC ).format(Instant .now())
199199}
200200
201- fun addressMustBeBound (hostAndPort : HostAndPort ) {
202- poll(" address $hostAndPort to bind" ) {
201+ fun addressMustBeBound (executorService : ScheduledExecutorService , hostAndPort : HostAndPort ): ListenableFuture < Unit > {
202+ return poll(executorService, " address $hostAndPort to bind" ) {
203203 try {
204204 Socket (hostAndPort.hostText, hostAndPort.port).close()
205205 Unit
@@ -209,8 +209,8 @@ fun addressMustBeBound(hostAndPort: HostAndPort) {
209209 }
210210}
211211
212- fun addressMustNotBeBound (hostAndPort : HostAndPort ) {
213- poll(" address $hostAndPort to unbind" ) {
212+ fun addressMustNotBeBound (executorService : ScheduledExecutorService , hostAndPort : HostAndPort ): ListenableFuture < Unit > {
213+ return poll(executorService, " address $hostAndPort to unbind" ) {
214214 try {
215215 Socket (hostAndPort.hostText, hostAndPort.port).close()
216216 null
@@ -220,18 +220,36 @@ fun addressMustNotBeBound(hostAndPort: HostAndPort) {
220220 }
221221}
222222
223- fun <A > poll (pollName : String , pollIntervalMs : Long = 500, warnCount : Int = 120, f : () -> A ? ): A {
223+ private fun <A > poll (
224+ executorService : ScheduledExecutorService ,
225+ pollName : String ,
226+ pollIntervalMs : Long = 500,
227+ warnCount : Int = 120,
228+ check : () -> A ?
229+ ): ListenableFuture <A > {
230+ val initialResult = check()
231+ val resultFuture = SettableFuture .create<A >()
232+ if (initialResult != null ) {
233+ resultFuture.set(initialResult)
234+ return resultFuture
235+ }
224236 var counter = 0
225- var result = f()
226- while (result == null ) {
227- if (counter == warnCount) {
228- log.warn(" Been polling $pollName for ${pollIntervalMs * warnCount / 1000.0 } seconds..." )
229- }
230- counter = (counter % warnCount) + 1
231- Thread .sleep(pollIntervalMs)
232- result = f()
237+ fun schedulePoll () {
238+ executorService.schedule({
239+ counter++
240+ if (counter == warnCount) {
241+ log.warn(" Been polling $pollName for ${pollIntervalMs * warnCount / 1000.0 } seconds..." )
242+ }
243+ val result = check()
244+ if (result == null ) {
245+ schedulePoll()
246+ } else {
247+ resultFuture.set(result)
248+ }
249+ }, pollIntervalMs, MILLISECONDS )
233250 }
234- return result
251+ schedulePoll()
252+ return resultFuture
235253}
236254
237255open class DriverDSL (
@@ -241,13 +259,13 @@ open class DriverDSL(
241259 val useTestClock : Boolean ,
242260 val isDebug : Boolean
243261) : DriverDSLInternalInterface {
262+ private val executorService: ScheduledExecutorService = Executors .newScheduledThreadPool(2 )
244263 private val networkMapName = " NetworkMapService"
245264 private val networkMapAddress = portAllocation.nextHostAndPort()
246265
247266 class State {
248267 val registeredProcesses = LinkedList <Process >()
249268 val clients = LinkedList <NodeMessagingClient >()
250- var localServer: ArtemisMessagingServer ? = null
251269 }
252270
253271 private val state = ThreadBox (State ())
@@ -276,11 +294,10 @@ open class DriverDSL(
276294 clients.forEach {
277295 it.stop()
278296 }
279- localServer?.stop()
280297 registeredProcesses.forEach(Process ::destroy)
281298 }
282299 /* * Wait 5 seconds, then [Process.destroyForcibly] */
283- val finishedFuture = future {
300+ val finishedFuture = executorService.submit {
284301 waitForAllNodesToFinish()
285302 }
286303 try {
@@ -295,10 +312,8 @@ open class DriverDSL(
295312 }
296313
297314 // Check that we shut down properly
298- state.locked {
299- localServer?.run { addressMustNotBeBound(myHostPort) }
300- }
301- addressMustNotBeBound(networkMapAddress)
315+ addressMustNotBeBound(executorService, networkMapAddress).get()
316+ executorService.shutdown()
302317 }
303318
304319 private fun queryNodeInfo (webAddress : HostAndPort ): NodeInfo ? {
@@ -353,10 +368,9 @@ open class DriverDSL(
353368 configOverrides = configOverrides
354369 )
355370
356- return future {
357- val process = DriverDSL .startNode(FullNodeConfiguration (config), quasarJarPath, debugPort)
358- registerProcess(process)
359- NodeHandle (queryNodeInfo(apiAddress)!! , config, process)
371+ return startNode(executorService, FullNodeConfiguration (config), quasarJarPath, debugPort).map {
372+ registerProcess(it)
373+ NodeHandle (queryNodeInfo(apiAddress)!! , config, it)
360374 }
361375 }
362376
@@ -395,7 +409,7 @@ open class DriverDSL(
395409 startNetworkMapService()
396410 }
397411
398- private fun startNetworkMapService () {
412+ private fun startNetworkMapService (): ListenableFuture < Unit > {
399413 val apiAddress = portAllocation.nextHostAndPort()
400414 val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
401415
@@ -414,7 +428,9 @@ open class DriverDSL(
414428 )
415429
416430 log.info(" Starting network-map-service" )
417- registerProcess(startNode(FullNodeConfiguration (config), quasarJarPath, debugPort))
431+ return startNode(executorService, FullNodeConfiguration (config), quasarJarPath, debugPort).map {
432+ registerProcess(it)
433+ }
418434 }
419435
420436 companion object {
@@ -428,10 +444,11 @@ open class DriverDSL(
428444 fun <A > pickA (array : Array <A >): A = array[Math .abs(Random ().nextInt()) % array.size]
429445
430446 private fun startNode (
447+ executorService : ScheduledExecutorService ,
431448 nodeConf : FullNodeConfiguration ,
432449 quasarJarPath : String ,
433450 debugPort : Int?
434- ): Process {
451+ ): ListenableFuture < Process > {
435452 // Write node.conf
436453 writeConfig(nodeConf.basedir, " node.conf" , nodeConf.config)
437454
@@ -454,13 +471,13 @@ open class DriverDSL(
454471 builder.inheritIO()
455472 builder.directory(nodeConf.basedir.toFile())
456473 val process = builder.start()
457- addressMustBeBound(nodeConf.artemisAddress)
458- // TODO There is a race condition here. Even though the messaging address is bound it may be the case that
459- // the handlers for the advertised services are not yet registered. A hacky workaround is that we wait for
460- // the web api address to be bound as well, as that starts after the services. Needs rethinking.
461- addressMustBeBound(nodeConf.webAddress)
462-
463- return process
474+ return Futures .allAsList(
475+ addressMustBeBound(executorService, nodeConf.artemisAddress),
476+ // TODO There is a race condition here. Even though the messaging address is bound it may be the case that
477+ // the handlers for the advertised services are not yet registered. A hacky workaround is that we wait for
478+ // the web api address to be bound as well, as that starts after the services. Needs rethinking.
479+ addressMustBeBound(executorService, nodeConf.webAddress)
480+ ).map { process }
464481 }
465482 }
466483}
@@ -469,4 +486,3 @@ fun writeConfig(path: Path, filename: String, config: Config) {
469486 path.toFile().mkdirs()
470487 File (" $path /$filename " ).writeText(config.root().render(ConfigRenderOptions .concise()))
471488}
472-
0 commit comments