Skip to content

Commit 6ae0142

Browse files
shamsasari authored and sollecitom committed
CORDA-1276: Cleaned up creation of node-info object on node start and when using --just-generate-node-info (#2912)
(cherry picked from commit 65ff214)
1 parent 33af80a commit 6ae0142

File tree

4 files changed

+159
-112
lines changed

4 files changed

+159
-112
lines changed

node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt

Lines changed: 65 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,10 @@ import net.corda.node.services.vault.NodeVaultService
6060
import net.corda.node.services.vault.VaultSoftLockManager
6161
import net.corda.node.shell.InteractiveShell
6262
import net.corda.node.utilities.AffinityExecutor
63+
import net.corda.node.utilities.NamedThreadFactory
6364
import net.corda.nodeapi.internal.DevIdentityGenerator
6465
import net.corda.nodeapi.internal.NodeInfoAndSigned
66+
import net.corda.nodeapi.internal.SignedNodeInfo
6567
import net.corda.nodeapi.internal.crypto.X509Utilities
6668
import net.corda.nodeapi.internal.persistence.CordaPersistence
6769
import net.corda.nodeapi.internal.persistence.DatabaseConfig
@@ -84,6 +86,8 @@ import java.time.Duration
8486
import java.util.*
8587
import java.util.concurrent.ConcurrentHashMap
8688
import java.util.concurrent.ExecutorService
89+
import java.util.concurrent.Executors
90+
import java.util.concurrent.TimeUnit
8791
import java.util.concurrent.TimeUnit.SECONDS
8892
import kotlin.collections.set
8993
import kotlin.reflect.KClass
@@ -140,7 +144,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
140144
protected val runOnStop = ArrayList<() -> Any?>()
141145
private val _nodeReadyFuture = openFuture<Unit>()
142146
protected var networkMapClient: NetworkMapClient? = null
143-
protected lateinit var networkMapUpdater: NetworkMapUpdater
147+
private lateinit var networkMapUpdater: NetworkMapUpdater
144148
lateinit var securityManager: RPCSecurityManager
145149

146150
/** Completes once the node has successfully registered with the network map service
@@ -174,18 +178,14 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
174178
initCertificate()
175179
val schemaService = NodeSchemaService(cordappLoader.cordappSchemas)
176180
val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null)
177-
return initialiseDatabasePersistence(schemaService, makeIdentityService(identity.certificate)) { database ->
178-
// TODO The fact that we need to specify an empty list of notaries just to generate our node info looks like
179-
// a code smell.
180-
val persistentNetworkMapCache = PersistentNetworkMapCache(database, notaries = emptyList())
181-
persistentNetworkMapCache.start()
182-
val (keyPairs, nodeInfo) = initNodeInfo(persistentNetworkMapCache, identity, identityKeyPair)
183-
val nodeInfoAndSigned = NodeInfoAndSigned(nodeInfo) { publicKey, serialised ->
184-
val privateKey = keyPairs.single { it.public == publicKey }.private
185-
privateKey.sign(serialised.bytes)
186-
}
187-
NodeInfoWatcher.saveToFile(configuration.baseDirectory, nodeInfoAndSigned)
188-
nodeInfo
181+
return initialiseDatabasePersistence(schemaService, makeIdentityService(identity.certificate)){ database ->
182+
// TODO The fact that we need to specify an empty list of notaries just to generate our node info looks like a design smell.
183+
val persistentNetworkMapCache = PersistentNetworkMapCache(database, notaries = emptyList())
184+
persistentNetworkMapCache.start()
185+
val (_, nodeInfo) = updateNodeInfo(persistentNetworkMapCache, null,identity, identityKeyPair)
186+
187+
nodeInfo
188+
189189
}
190190
}
191191

@@ -196,15 +196,18 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
196196
val schemaService = NodeSchemaService(cordappLoader.cordappSchemas, configuration.notary != null)
197197
val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null)
198198
val identityService = makeIdentityService(identity.certificate)
199+
199200
networkMapClient = configuration.compatibilityZoneURL?.let { NetworkMapClient(it, identityService.trustRoot) }
201+
200202
val networkParameters = NetworkParametersReader(identityService.trustRoot, networkMapClient, configuration.baseDirectory).networkParameters
201203
check(networkParameters.minimumPlatformVersion <= versionInfo.platformVersion) {
202204
"Node's platform version is lower than network's required minimumPlatformVersion"
203205
}
206+
204207
// Do all of this in a database transaction so anything that might need a connection has one.
205208
val (startedImpl, schedulerService) = initialiseDatabasePersistence(schemaService, identityService) { database ->
206209
val networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, networkParameters.notaries).start(), identityService)
207-
val (keyPairs, nodeInfo) = initNodeInfo(networkMapCache, identity, identityKeyPair)
210+
val (keyPairs, nodeInfo) = updateNodeInfo(networkMapCache, networkMapClient, identity, identityKeyPair)
208211
identityService.loadIdentities(nodeInfo.legalIdentitiesAndCerts)
209212
val metrics = MetricRegistry()
210213
val transactionStorage = makeTransactionStorage(database, configuration.transactionCacheSizeBytes)
@@ -239,14 +242,16 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
239242
flowLogicRefFactory = flowLogicRefFactory,
240243
drainingModePollPeriod = configuration.drainingModePollPeriod,
241244
nodeProperties = nodeProperties)
242-
if (serverThread is ExecutorService) {
245+
246+
(serverThread as? ExecutorService)?.let {
243247
runOnStop += {
244248
// We wait here, even though any in-flight messages should have been drained away because the
245249
// server thread can potentially have other non-messaging tasks scheduled onto it. The timeout value is
246250
// arbitrary and might be inappropriate.
247-
MoreExecutors.shutdownAndAwaitTermination(serverThread as ExecutorService, 50, SECONDS)
251+
MoreExecutors.shutdownAndAwaitTermination(it, 50, SECONDS)
248252
}
249253
}
254+
250255
makeVaultObservers(schedulerService, database.hibernateConfig, smm, schemaService, flowLogicRefFactory)
251256
val rpcOps = makeRPCOps(flowStarter, database, smm)
252257
startMessagingService(rpcOps)
@@ -258,22 +263,17 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
258263
startShell(rpcOps)
259264
Pair(StartedNodeImpl(this, _services, nodeInfo, checkpointStorage, smm, attachments, network, database, rpcOps, flowStarter, notaryService), schedulerService)
260265
}
266+
261267
networkMapUpdater = NetworkMapUpdater(services.networkMapCache,
262268
NodeInfoWatcher(configuration.baseDirectory, getRxIoScheduler(), Duration.ofMillis(configuration.additionalNodeInfoPollingFrequencyMsec)),
263269
networkMapClient,
264270
networkParameters.serialize().hash,
265271
configuration.baseDirectory)
266272
runOnStop += networkMapUpdater::close
267273

268-
log.info("Node-info for this node: ${services.myInfo}")
269-
270-
val nodeInfoAndSigned = NodeInfoAndSigned(services.myInfo) { publicKey, serialised ->
271-
services.keyManagementService.sign(serialised.bytes, publicKey).withoutKey()
272-
}
273-
networkMapUpdater.updateNodeInfo(nodeInfoAndSigned)
274274
networkMapUpdater.subscribeToNetworkMap()
275275

276-
// If we successfully loaded network data from database, we set this future to Unit.
276+
// If we successfully loaded network data from database, we set this future to Unit.
277277
_nodeReadyFuture.captureLater(services.networkMapCache.nodeReady.map { Unit })
278278

279279
return startedImpl.apply {
@@ -297,9 +297,10 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
297297
InteractiveShell.startShell(configuration, rpcOps, securityManager, _services.identityService, _services.database)
298298
}
299299

300-
private fun initNodeInfo(networkMapCache: NetworkMapCacheBaseInternal,
301-
identity: PartyAndCertificate,
302-
identityKeyPair: KeyPair): Pair<Set<KeyPair>, NodeInfo> {
300+
private fun updateNodeInfo(networkMapCache: NetworkMapCacheBaseInternal,
301+
networkMapClient: NetworkMapClient?,
302+
identity: PartyAndCertificate,
303+
identityKeyPair: KeyPair): Pair<Set<KeyPair>, NodeInfo> {
303304
val keyPairs = mutableSetOf(identityKeyPair)
304305

305306
myNotaryIdentity = configuration.notary?.let {
@@ -313,7 +314,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
313314
}
314315
}
315316

316-
val nodeInfoWithBlankSerial = NodeInfo(
317+
val potentialNodeInfo = NodeInfo(
317318
myAddresses(),
318319
setOf(identity, myNotaryIdentity).filterNotNull(),
319320
versionInfo.platformVersion,
@@ -322,16 +323,51 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
322323

323324
val nodeInfoFromDb = networkMapCache.getNodeByLegalName(identity.name)
324325

325-
val nodeInfo = if (nodeInfoWithBlankSerial == nodeInfoFromDb?.copy(serial = 0)) {
326+
val nodeInfo = if (potentialNodeInfo == nodeInfoFromDb?.copy(serial = 0)) {
326327
// The node info hasn't changed. We use the one from the database to preserve the serial.
328+
log.debug("Node-info hasn't changed")
327329
nodeInfoFromDb
328330
} else {
329-
nodeInfoWithBlankSerial.copy(serial = platformClock.millis())
331+
log.info("Node-info has changed so submitting update. Old node-info was $nodeInfoFromDb")
332+
val newNodeInfo = potentialNodeInfo.copy(serial = platformClock.millis())
333+
networkMapCache.addNode(newNodeInfo)
334+
log.info("New node-info: $newNodeInfo")
335+
newNodeInfo
336+
}
337+
338+
val nodeInfoAndSigned = NodeInfoAndSigned(nodeInfo) { publicKey, serialised ->
339+
val privateKey = keyPairs.single { it.public == publicKey }.private
340+
privateKey.sign(serialised.bytes)
341+
}
342+
343+
// Write the node-info file even if nothing's changed, just in case the file has been deleted.
344+
NodeInfoWatcher.saveToFile(configuration.baseDirectory, nodeInfoAndSigned)
345+
346+
if (networkMapClient != null) {
347+
tryPublishNodeInfoAsync(nodeInfoAndSigned.signed, networkMapClient)
330348
}
349+
331350
return Pair(keyPairs, nodeInfo)
332351
}
333352

353+
private fun tryPublishNodeInfoAsync(signedNodeInfo: SignedNodeInfo, networkMapClient: NetworkMapClient) {
354+
val executor = Executors.newSingleThreadScheduledExecutor(NamedThreadFactory("Network Map Updater", Executors.defaultThreadFactory()))
355+
356+
executor.submit(object : Runnable {
357+
override fun run() {
358+
try {
359+
networkMapClient.publish(signedNodeInfo)
360+
} catch (t: Throwable) {
361+
log.warn("Error encountered while publishing node info, will retry again", t)
362+
// TODO: Exponential backoff?
363+
executor.schedule(this, 1, TimeUnit.MINUTES)
364+
}
365+
}
366+
})
367+
}
368+
334369
protected abstract fun myAddresses(): List<NetworkHostAndPort>
370+
335371
protected open fun makeStateMachineManager(database: CordaPersistence): StateMachineManager {
336372
return StateMachineManagerImpl(
337373
services,

node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt

Lines changed: 20 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,15 @@ import net.corda.core.utilities.minutes
1515
import net.corda.node.services.api.NetworkMapCacheInternal
1616
import net.corda.node.utilities.NamedThreadFactory
1717
import net.corda.nodeapi.exceptions.OutdatedNetworkParameterHashException
18-
import net.corda.nodeapi.internal.NodeInfoAndSigned
19-
import net.corda.nodeapi.internal.SignedNodeInfo
20-
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_UPDATE_FILE_NAME
21-
import net.corda.nodeapi.internal.network.ParametersUpdate
22-
import net.corda.nodeapi.internal.network.SignedNetworkParameters
23-
import net.corda.nodeapi.internal.network.verifiedNetworkMapCert
18+
import net.corda.nodeapi.internal.network.*
2419
import rx.Subscription
2520
import rx.subjects.PublishSubject
2621
import java.nio.file.Path
2722
import java.nio.file.StandardCopyOption
2823
import java.time.Duration
2924
import java.util.concurrent.Executors
3025
import java.util.concurrent.TimeUnit
26+
import kotlin.system.exitProcess
3127

3228
class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
3329
private val fileWatcher: NodeInfoWatcher,
@@ -57,36 +53,6 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
5753
return DataFeed(currentUpdateInfo, parametersUpdatesTrack)
5854
}
5955

60-
fun updateNodeInfo(nodeInfoAndSigned: NodeInfoAndSigned) {
61-
// TODO We've already done this lookup and check in AbstractNode.initNodeInfo
62-
val oldNodeInfo = networkMapCache.getNodeByLegalIdentity(nodeInfoAndSigned.nodeInfo.legalIdentities[0])
63-
// Compare node info without timestamp.
64-
if (nodeInfoAndSigned.nodeInfo.copy(serial = 0L) == oldNodeInfo?.copy(serial = 0L)) return
65-
66-
logger.info("Node-info has changed so submitting update. Old node-info was $oldNodeInfo")
67-
// Only publish and write to disk if there are changes to the node info.
68-
networkMapCache.addNode(nodeInfoAndSigned.nodeInfo)
69-
fileWatcher.saveToFile(nodeInfoAndSigned)
70-
71-
if (networkMapClient != null) {
72-
tryPublishNodeInfoAsync(nodeInfoAndSigned.signed, networkMapClient)
73-
}
74-
}
75-
76-
private fun tryPublishNodeInfoAsync(signedNodeInfo: SignedNodeInfo, networkMapClient: NetworkMapClient) {
77-
executor.submit(object : Runnable {
78-
override fun run() {
79-
try {
80-
networkMapClient.publish(signedNodeInfo)
81-
} catch (t: Throwable) {
82-
logger.warn("Error encountered while publishing node info, will retry in $defaultRetryInterval", t)
83-
// TODO: Exponential backoff?
84-
executor.schedule(this, defaultRetryInterval.toMillis(), TimeUnit.MILLISECONDS)
85-
}
86-
}
87-
})
88-
}
89-
9056
fun subscribeToNetworkMap() {
9157
require(fileWatcherSubscription == null) { "Should not call this method twice." }
9258
// Subscribe to file based networkMap
@@ -114,17 +80,7 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
11480
networkMap.parametersUpdate?.let { handleUpdateNetworkParameters(networkMapClient, it) }
11581

11682
if (currentParametersHash != networkMap.networkParameterHash) {
117-
val updatesFile = baseDirectory / NETWORK_PARAMS_UPDATE_FILE_NAME
118-
val acceptedHash = if (updatesFile.exists()) updatesFile.readObject<SignedNetworkParameters>().raw.hash else null
119-
if (acceptedHash == networkMap.networkParameterHash) {
120-
logger.info("Flag day occurred. Network map switched to the new network parameters: ${networkMap.networkParameterHash}. Node will shutdown now and needs to be started again.")
121-
} else {
122-
// TODO This needs special handling (node omitted update process or didn't accept new parameters)
123-
logger.error("Node is using parameters with hash: $currentParametersHash but network map is " +
124-
"advertising: ${networkMap.networkParameterHash}.\n" +
125-
"Node will shutdown now. Please update node to use correct network parameters file.")
126-
}
127-
System.exit(1)
83+
exitOnParametersMismatch(networkMap)
12884
}
12985

13086
val currentNodeHashes = networkMapCache.allNodeHashes
@@ -151,6 +107,23 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
151107
return cacheTimeout
152108
}
153109

110+
private fun exitOnParametersMismatch(networkMap: NetworkMap) {
111+
val updatesFile = baseDirectory / NETWORK_PARAMS_UPDATE_FILE_NAME
112+
val acceptedHash = if (updatesFile.exists()) updatesFile.readObject<SignedNetworkParameters>().raw.hash else null
113+
val exitCode = if (acceptedHash == networkMap.networkParameterHash) {
114+
logger.info("Flag day occurred. Network map switched to the new network parameters: " +
115+
"${networkMap.networkParameterHash}. Node will shutdown now and needs to be started again.")
116+
0
117+
} else {
118+
// TODO This needs special handling (node omitted update process or didn't accept new parameters)
119+
logger.error("Node is using parameters with hash: $currentParametersHash but network map is " +
120+
"advertising: ${networkMap.networkParameterHash}.\n" +
121+
"Node will shutdown now. Please update node to use correct network parameters file.")
122+
1
123+
}
124+
exitProcess(exitCode)
125+
}
126+
154127
private fun handleUpdateNetworkParameters(networkMapClient: NetworkMapClient, update: ParametersUpdate) {
155128
if (update.newParametersHash == newNetworkParameters?.first?.newParametersHash) {
156129
// This update was handled already.
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package net.corda.node.internal
2+
3+
import com.nhaarman.mockito_kotlin.doReturn
4+
import com.nhaarman.mockito_kotlin.whenever
5+
import net.corda.core.identity.CordaX500Name
6+
import net.corda.core.internal.readObject
7+
import net.corda.core.node.NodeInfo
8+
import net.corda.core.serialization.deserialize
9+
import net.corda.core.utilities.NetworkHostAndPort
10+
import net.corda.node.VersionInfo
11+
import net.corda.node.services.config.NodeConfiguration
12+
import net.corda.nodeapi.internal.SignedNodeInfo
13+
import net.corda.nodeapi.internal.network.NodeInfoFilesCopier.Companion.NODE_INFO_FILE_NAME_PREFIX
14+
import net.corda.nodeapi.internal.persistence.DatabaseConfig
15+
import net.corda.testing.core.SerializationEnvironmentRule
16+
import net.corda.testing.internal.rigorousMock
17+
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
18+
import org.junit.Rule
19+
import org.junit.Test
20+
import org.junit.rules.TemporaryFolder
21+
import java.nio.file.Files
22+
import kotlin.test.assertEquals
23+
import kotlin.test.assertNull
24+
25+
class NodeTest {
26+
private abstract class AbstractNodeConfiguration : NodeConfiguration
27+
28+
@Rule
29+
@JvmField
30+
val temporaryFolder = TemporaryFolder()
31+
@Rule
32+
@JvmField
33+
val testSerialization = SerializationEnvironmentRule()
34+
35+
private fun nodeInfoFile() = temporaryFolder.root.listFiles().singleOrNull { it.name.startsWith(NODE_INFO_FILE_NAME_PREFIX) }
36+
private fun AbstractNode.generateNodeInfo(): NodeInfo {
37+
assertNull(nodeInfoFile())
38+
generateAndSaveNodeInfo()
39+
val path = nodeInfoFile()!!.toPath()
40+
val nodeInfo = path.readObject<SignedNodeInfo>().raw.deserialize()
41+
Files.delete(path)
42+
return nodeInfo
43+
}
44+
45+
@Test
46+
fun `generateAndSaveNodeInfo works`() {
47+
val nodeAddress = NetworkHostAndPort("0.1.2.3", 456)
48+
val nodeName = CordaX500Name("Manx Blockchain Corp", "Douglas", "IM")
49+
val platformVersion = 789
50+
val dataSourceProperties = makeTestDataSourceProperties()
51+
val databaseConfig = DatabaseConfig()
52+
val configuration = rigorousMock<AbstractNodeConfiguration>().also {
53+
doReturn(nodeAddress).whenever(it).p2pAddress
54+
doReturn(nodeName).whenever(it).myLegalName
55+
doReturn(null).whenever(it).notary // Don't add notary identity.
56+
doReturn(dataSourceProperties).whenever(it).dataSourceProperties
57+
doReturn(databaseConfig).whenever(it).database
58+
doReturn(temporaryFolder.root.toPath()).whenever(it).baseDirectory
59+
doReturn(true).whenever(it).devMode // Needed for identity cert.
60+
doReturn("tsp").whenever(it).trustStorePassword
61+
doReturn("ksp").whenever(it).keyStorePassword
62+
}
63+
configureDatabase(dataSourceProperties, databaseConfig, rigorousMock()).use { database ->
64+
val node = Node(configuration, rigorousMock<VersionInfo>().also {
65+
doReturn(platformVersion).whenever(it).platformVersion
66+
}, initialiseSerialization = false)
67+
assertEquals(node.generateNodeInfo(), node.generateNodeInfo()) // Node info doesn't change (including the serial)
68+
}
69+
}
70+
}

0 commit comments

Comments
 (0)