@@ -60,8 +60,10 @@ import net.corda.node.services.vault.NodeVaultService
6060import net.corda.node.services.vault.VaultSoftLockManager
6161import net.corda.node.shell.InteractiveShell
6262import net.corda.node.utilities.AffinityExecutor
63+ import net.corda.node.utilities.NamedThreadFactory
6364import net.corda.nodeapi.internal.DevIdentityGenerator
6465import net.corda.nodeapi.internal.NodeInfoAndSigned
66+ import net.corda.nodeapi.internal.SignedNodeInfo
6567import net.corda.nodeapi.internal.crypto.X509Utilities
6668import net.corda.nodeapi.internal.persistence.CordaPersistence
6769import net.corda.nodeapi.internal.persistence.DatabaseConfig
@@ -84,6 +86,8 @@ import java.time.Duration
8486import java.util.*
8587import java.util.concurrent.ConcurrentHashMap
8688import java.util.concurrent.ExecutorService
89+ import java.util.concurrent.Executors
90+ import java.util.concurrent.TimeUnit
8791import java.util.concurrent.TimeUnit.SECONDS
8892import kotlin.collections.set
8993import kotlin.reflect.KClass
@@ -140,7 +144,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
140144 protected val runOnStop = ArrayList < () -> Any? > ()
141145 private val _nodeReadyFuture = openFuture<Unit >()
142146 protected var networkMapClient: NetworkMapClient ? = null
143- protected lateinit var networkMapUpdater: NetworkMapUpdater
147+ private lateinit var networkMapUpdater: NetworkMapUpdater
144148 lateinit var securityManager: RPCSecurityManager
145149
146150 /* * Completes once the node has successfully registered with the network map service
@@ -174,18 +178,14 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
174178 initCertificate()
175179 val schemaService = NodeSchemaService (cordappLoader.cordappSchemas)
176180 val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null )
177- return initialiseDatabasePersistence(schemaService, makeIdentityService(identity.certificate)) { database ->
178- // TODO The fact that we need to specify an empty list of notaries just to generate our node info looks like
179- // a code smell.
180- val persistentNetworkMapCache = PersistentNetworkMapCache (database, notaries = emptyList())
181- persistentNetworkMapCache.start()
182- val (keyPairs, nodeInfo) = initNodeInfo(persistentNetworkMapCache, identity, identityKeyPair)
183- val nodeInfoAndSigned = NodeInfoAndSigned (nodeInfo) { publicKey, serialised ->
184- val privateKey = keyPairs.single { it.public == publicKey }.private
185- privateKey.sign(serialised.bytes)
186- }
187- NodeInfoWatcher .saveToFile(configuration.baseDirectory, nodeInfoAndSigned)
188- nodeInfo
181+ return initialiseDatabasePersistence(schemaService, makeIdentityService(identity.certificate)){ database ->
182+ // TODO The fact that we need to specify an empty list of notaries just to generate our node info looks like a design smell.
183+ val persistentNetworkMapCache = PersistentNetworkMapCache (database, notaries = emptyList())
184+ persistentNetworkMapCache.start()
185+ val (_, nodeInfo) = updateNodeInfo(persistentNetworkMapCache, null ,identity, identityKeyPair)
186+
187+ nodeInfo
188+
189189 }
190190 }
191191
@@ -196,15 +196,18 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
196196 val schemaService = NodeSchemaService (cordappLoader.cordappSchemas, configuration.notary != null )
197197 val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null )
198198 val identityService = makeIdentityService(identity.certificate)
199+
199200 networkMapClient = configuration.compatibilityZoneURL?.let { NetworkMapClient (it, identityService.trustRoot) }
201+
200202 val networkParameters = NetworkParametersReader (identityService.trustRoot, networkMapClient, configuration.baseDirectory).networkParameters
201203 check(networkParameters.minimumPlatformVersion <= versionInfo.platformVersion) {
202204 " Node's platform version is lower than network's required minimumPlatformVersion"
203205 }
206+
204207 // Do all of this in a database transaction so anything that might need a connection has one.
205208 val (startedImpl, schedulerService) = initialiseDatabasePersistence(schemaService, identityService) { database ->
206209 val networkMapCache = NetworkMapCacheImpl (PersistentNetworkMapCache (database, networkParameters.notaries).start(), identityService)
207- val (keyPairs, nodeInfo) = initNodeInfo (networkMapCache, identity, identityKeyPair)
210+ val (keyPairs, nodeInfo) = updateNodeInfo (networkMapCache, networkMapClient , identity, identityKeyPair)
208211 identityService.loadIdentities(nodeInfo.legalIdentitiesAndCerts)
209212 val metrics = MetricRegistry ()
210213 val transactionStorage = makeTransactionStorage(database, configuration.transactionCacheSizeBytes)
@@ -239,14 +242,16 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
239242 flowLogicRefFactory = flowLogicRefFactory,
240243 drainingModePollPeriod = configuration.drainingModePollPeriod,
241244 nodeProperties = nodeProperties)
242- if (serverThread is ExecutorService ) {
245+
246+ (serverThread as ? ExecutorService )?.let {
243247 runOnStop + = {
244248 // We wait here, even though any in-flight messages should have been drained away because the
245249 // server thread can potentially have other non-messaging tasks scheduled onto it. The timeout value is
246250 // arbitrary and might be inappropriate.
247- MoreExecutors .shutdownAndAwaitTermination(serverThread as ExecutorService , 50 , SECONDS )
251+ MoreExecutors .shutdownAndAwaitTermination(it , 50 , SECONDS )
248252 }
249253 }
254+
250255 makeVaultObservers(schedulerService, database.hibernateConfig, smm, schemaService, flowLogicRefFactory)
251256 val rpcOps = makeRPCOps(flowStarter, database, smm)
252257 startMessagingService(rpcOps)
@@ -258,22 +263,17 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
258263 startShell(rpcOps)
259264 Pair (StartedNodeImpl (this , _services , nodeInfo, checkpointStorage, smm, attachments, network, database, rpcOps, flowStarter, notaryService), schedulerService)
260265 }
266+
261267 networkMapUpdater = NetworkMapUpdater (services.networkMapCache,
262268 NodeInfoWatcher (configuration.baseDirectory, getRxIoScheduler(), Duration .ofMillis(configuration.additionalNodeInfoPollingFrequencyMsec)),
263269 networkMapClient,
264270 networkParameters.serialize().hash,
265271 configuration.baseDirectory)
266272 runOnStop + = networkMapUpdater::close
267273
268- log.info(" Node-info for this node: ${services.myInfo} " )
269-
270- val nodeInfoAndSigned = NodeInfoAndSigned (services.myInfo) { publicKey, serialised ->
271- services.keyManagementService.sign(serialised.bytes, publicKey).withoutKey()
272- }
273- networkMapUpdater.updateNodeInfo(nodeInfoAndSigned)
274274 networkMapUpdater.subscribeToNetworkMap()
275275
276- // If we successfully loaded network data from database, we set this future to Unit.
276+ // If we successfully loaded network data from database, we set this future to Unit.
277277 _nodeReadyFuture .captureLater(services.networkMapCache.nodeReady.map { Unit })
278278
279279 return startedImpl.apply {
@@ -297,9 +297,10 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
297297 InteractiveShell .startShell(configuration, rpcOps, securityManager, _services .identityService, _services .database)
298298 }
299299
300- private fun initNodeInfo (networkMapCache : NetworkMapCacheBaseInternal ,
301- identity : PartyAndCertificate ,
302- identityKeyPair : KeyPair ): Pair <Set <KeyPair >, NodeInfo> {
300+ private fun updateNodeInfo (networkMapCache : NetworkMapCacheBaseInternal ,
301+ networkMapClient : NetworkMapClient ? ,
302+ identity : PartyAndCertificate ,
303+ identityKeyPair : KeyPair ): Pair <Set <KeyPair >, NodeInfo> {
303304 val keyPairs = mutableSetOf (identityKeyPair)
304305
305306 myNotaryIdentity = configuration.notary?.let {
@@ -313,7 +314,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
313314 }
314315 }
315316
316- val nodeInfoWithBlankSerial = NodeInfo (
317+ val potentialNodeInfo = NodeInfo (
317318 myAddresses(),
318319 setOf (identity, myNotaryIdentity).filterNotNull(),
319320 versionInfo.platformVersion,
@@ -322,16 +323,51 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
322323
323324 val nodeInfoFromDb = networkMapCache.getNodeByLegalName(identity.name)
324325
325- val nodeInfo = if (nodeInfoWithBlankSerial == nodeInfoFromDb?.copy(serial = 0 )) {
326+ val nodeInfo = if (potentialNodeInfo == nodeInfoFromDb?.copy(serial = 0 )) {
326327 // The node info hasn't changed. We use the one from the database to preserve the serial.
328+ log.debug(" Node-info hasn't changed" )
327329 nodeInfoFromDb
328330 } else {
329- nodeInfoWithBlankSerial.copy(serial = platformClock.millis())
331+ log.info(" Node-info has changed so submitting update. Old node-info was $nodeInfoFromDb " )
332+ val newNodeInfo = potentialNodeInfo.copy(serial = platformClock.millis())
333+ networkMapCache.addNode(newNodeInfo)
334+ log.info(" New node-info: $newNodeInfo " )
335+ newNodeInfo
336+ }
337+
338+ val nodeInfoAndSigned = NodeInfoAndSigned (nodeInfo) { publicKey, serialised ->
339+ val privateKey = keyPairs.single { it.public == publicKey }.private
340+ privateKey.sign(serialised.bytes)
341+ }
342+
343+ // Write the node-info file even if nothing's changed, just in case the file has been deleted.
344+ NodeInfoWatcher .saveToFile(configuration.baseDirectory, nodeInfoAndSigned)
345+
346+ if (networkMapClient != null ) {
347+ tryPublishNodeInfoAsync(nodeInfoAndSigned.signed, networkMapClient)
330348 }
349+
331350 return Pair (keyPairs, nodeInfo)
332351 }
333352
/**
 * Publishes [signedNodeInfo] to the network map on a dedicated background thread, retrying once a minute until the
 * publish succeeds.
 *
 * This call returns immediately: failures are logged and retried instead of being propagated to the caller.
 * The background executor is released as soon as the publish succeeds, and a [runOnStop] hook cancels any
 * outstanding retry so the worker thread does not outlive the node.
 *
 * @param signedNodeInfo the signed node-info to submit.
 * @param networkMapClient the client used to submit the node-info.
 */
private fun tryPublishNodeInfoAsync(signedNodeInfo: SignedNodeInfo, networkMapClient: NetworkMapClient) {
    val executor = Executors.newSingleThreadScheduledExecutor(NamedThreadFactory("Network Map Updater", Executors.defaultThreadFactory()))

    // Without this hook a failing publish would keep rescheduling itself forever, even after the node has stopped.
    runOnStop += { executor.shutdownNow() }

    executor.submit(object : Runnable {
        override fun run() {
            try {
                networkMapClient.publish(signedNodeInfo)
                // Published successfully: there is nothing left to schedule, so release the worker thread.
                executor.shutdown()
            } catch (t: Throwable) {
                log.warn("Error encountered while publishing node info, will retry again", t)
                // TODO: Exponential backoff?
                // Do not reschedule once the executor has been shut down (e.g. by the stop hook above).
                if (!executor.isShutdown) {
                    executor.schedule(this, 1, TimeUnit.MINUTES)
                }
            }
        }
    })
}
368+
334369 protected abstract fun myAddresses (): List <NetworkHostAndPort >
370+
335371 protected open fun makeStateMachineManager (database : CordaPersistence ): StateMachineManager {
336372 return StateMachineManagerImpl (
337373 services,
0 commit comments