Skip to content

Commit 07e9b7e

Browse files
authored
Merge pull request corda#1722 from corda/aslemmer-flow-multithreading-interfaces
StateMachine interface
2 parents 11db199 + c66a84b commit 07e9b7e

File tree

26 files changed

+941
-718
lines changed

26 files changed

+941
-718
lines changed

.ci/api-current.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1074,9 +1074,13 @@ public abstract class net.corda.core.flows.FlowLogic extends java.lang.Object
10741074
public <init>()
10751075
@org.jetbrains.annotations.NotNull public abstract net.corda.core.identity.Party getCounterparty()
10761076
@co.paralleluniverse.fibers.Suspendable @org.jetbrains.annotations.NotNull public abstract net.corda.core.flows.FlowInfo getCounterpartyFlowInfo()
1077+
@co.paralleluniverse.fibers.Suspendable @org.jetbrains.annotations.NotNull public abstract net.corda.core.flows.FlowInfo getCounterpartyFlowInfo(boolean)
10771078
@co.paralleluniverse.fibers.Suspendable @org.jetbrains.annotations.NotNull public abstract net.corda.core.utilities.UntrustworthyData receive(Class)
1079+
@co.paralleluniverse.fibers.Suspendable @org.jetbrains.annotations.NotNull public abstract net.corda.core.utilities.UntrustworthyData receive(Class, boolean)
10781080
@co.paralleluniverse.fibers.Suspendable public abstract void send(Object)
1081+
@co.paralleluniverse.fibers.Suspendable public abstract void send(Object, boolean)
10791082
@co.paralleluniverse.fibers.Suspendable @org.jetbrains.annotations.NotNull public abstract net.corda.core.utilities.UntrustworthyData sendAndReceive(Class, Object)
1083+
@co.paralleluniverse.fibers.Suspendable @org.jetbrains.annotations.NotNull public abstract net.corda.core.utilities.UntrustworthyData sendAndReceive(Class, Object, boolean)
10801084
##
10811085
public final class net.corda.core.flows.FlowStackSnapshot extends java.lang.Object
10821086
public <init>(java.time.Instant, String, List)

client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package net.corda.client.rpc
22

33
import net.corda.core.crypto.random63BitValue
44
import net.corda.core.flows.FlowInitiator
5+
import net.corda.core.internal.concurrent.flatMap
56
import net.corda.core.internal.packageName
67
import net.corda.core.messaging.FlowProgressHandle
78
import net.corda.core.messaging.StateMachineUpdate
@@ -143,7 +144,7 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance.contracts", C
143144
}
144145
}
145146
val nodeIdentity = node.info.chooseIdentity()
146-
node.services.startFlow(CashIssueFlow(2000.DOLLARS, OpaqueBytes.of(0), nodeIdentity), FlowInitiator.Shell).resultFuture.getOrThrow()
147+
node.services.startFlow(CashIssueFlow(2000.DOLLARS, OpaqueBytes.of(0), nodeIdentity), FlowInitiator.Shell).flatMap { it.resultFuture }.getOrThrow()
147148
proxy.startFlow(::CashIssueFlow,
148149
123.DOLLARS,
149150
OpaqueBytes.of(0),

core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,16 @@ import java.time.Instant
4141
* and request they start their counterpart flow, then make sure it's annotated with [InitiatingFlow]. This annotation
4242
* also has a version property to allow you to version your flow and enables a node to restrict support for the flow to
4343
* that particular version.
44+
*
45+
* Functions that suspend the flow (including all functions on [FlowSession]) accept a [maySkipCheckpoint] parameter
46+
* defaulting to false, false meaning a checkpoint should always be created on suspend. This parameter may be set to
47+
* true which allows the implementation to potentially optimise away the checkpoint, saving a roundtrip to the database.
48+
*
49+
* This option however comes with a big warning sign: Setting the parameter to true requires the flow's code to be
50+
* replayable from the previous checkpoint (or start of flow) up until the next checkpoint (or end of flow) in order to
51+
* prepare for hard failures. As suspending functions always commit the flow's database transaction regardless of this
52+
* parameter the flow must be prepared for scenarios where a previous running of the flow *already committed its
53+
* relevant database transactions*. Only set this option to true if you know what you're doing.
4454
*/
4555
abstract class FlowLogic<out T> {
4656
/** This is where you should log things to. */
@@ -123,7 +133,7 @@ abstract class FlowLogic<out T> {
123133
*/
124134
@Deprecated("Use FlowSession.getFlowInfo()", level = DeprecationLevel.WARNING)
125135
@Suspendable
126-
fun getFlowInfo(otherParty: Party): FlowInfo = stateMachine.getFlowInfo(otherParty, flowUsedForSessions)
136+
fun getFlowInfo(otherParty: Party): FlowInfo = stateMachine.getFlowInfo(otherParty, flowUsedForSessions, maySkipCheckpoint = false)
127137

128138
/**
129139
* Serializes and queues the given [payload] object for sending to the [otherParty]. Suspends until a response
@@ -157,7 +167,7 @@ abstract class FlowLogic<out T> {
157167
@Deprecated("Use FlowSession.sendAndReceive()", level = DeprecationLevel.WARNING)
158168
@Suspendable
159169
open fun <R : Any> sendAndReceive(receiveType: Class<R>, otherParty: Party, payload: Any): UntrustworthyData<R> {
160-
return stateMachine.sendAndReceive(receiveType, otherParty, payload, flowUsedForSessions)
170+
return stateMachine.sendAndReceive(receiveType, otherParty, payload, flowUsedForSessions, retrySend = false, maySkipCheckpoint = false)
161171
}
162172

163173
/**
@@ -171,17 +181,17 @@ abstract class FlowLogic<out T> {
171181
*/
172182
@Deprecated("Use FlowSession.sendAndReceiveWithRetry()", level = DeprecationLevel.WARNING)
173183
internal inline fun <reified R : Any> sendAndReceiveWithRetry(otherParty: Party, payload: Any): UntrustworthyData<R> {
174-
return stateMachine.sendAndReceive(R::class.java, otherParty, payload, flowUsedForSessions, retrySend = true)
184+
return stateMachine.sendAndReceive(R::class.java, otherParty, payload, flowUsedForSessions, retrySend = true, maySkipCheckpoint = false)
175185
}
176186

177187
@Suspendable
178188
internal fun <R : Any> FlowSession.sendAndReceiveWithRetry(receiveType: Class<R>, payload: Any): UntrustworthyData<R> {
179-
return stateMachine.sendAndReceive(receiveType, counterparty, payload, flowUsedForSessions, retrySend = true)
189+
return stateMachine.sendAndReceive(receiveType, counterparty, payload, flowUsedForSessions, retrySend = true, maySkipCheckpoint = false)
180190
}
181191

182192
@Suspendable
183193
internal inline fun <reified R : Any> FlowSession.sendAndReceiveWithRetry(payload: Any): UntrustworthyData<R> {
184-
return stateMachine.sendAndReceive(R::class.java, counterparty, payload, flowUsedForSessions, retrySend = true)
194+
return stateMachine.sendAndReceive(R::class.java, counterparty, payload, flowUsedForSessions, retrySend = true, maySkipCheckpoint = false)
185195
}
186196

187197
/**
@@ -206,7 +216,7 @@ abstract class FlowLogic<out T> {
206216
@Deprecated("Use FlowSession.receive()", level = DeprecationLevel.WARNING)
207217
@Suspendable
208218
open fun <R : Any> receive(receiveType: Class<R>, otherParty: Party): UntrustworthyData<R> {
209-
return stateMachine.receive(receiveType, otherParty, flowUsedForSessions)
219+
return stateMachine.receive(receiveType, otherParty, flowUsedForSessions, maySkipCheckpoint = false)
210220
}
211221

212222
/** Suspends until a message has been received for each session in the specified [sessions].
@@ -250,7 +260,9 @@ abstract class FlowLogic<out T> {
250260
*/
251261
@Deprecated("Use FlowSession.send()", level = DeprecationLevel.WARNING)
252262
@Suspendable
253-
open fun send(otherParty: Party, payload: Any) = stateMachine.send(otherParty, payload, flowUsedForSessions)
263+
open fun send(otherParty: Party, payload: Any) {
264+
stateMachine.send(otherParty, payload, flowUsedForSessions, maySkipCheckpoint = false)
265+
}
254266

255267
/**
256268
* Invokes the given subflow. This function returns once the subflow completes successfully with the result
@@ -342,7 +354,10 @@ abstract class FlowLogic<out T> {
342354
* valid by the local node, but that doesn't imply the vault will consider it relevant.
343355
*/
344356
@Suspendable
345-
fun waitForLedgerCommit(hash: SecureHash): SignedTransaction = stateMachine.waitForLedgerCommit(hash, this)
357+
@JvmOverloads
358+
fun waitForLedgerCommit(hash: SecureHash, maySkipCheckpoint: Boolean = false): SignedTransaction {
359+
return stateMachine.waitForLedgerCommit(hash, this, maySkipCheckpoint = maySkipCheckpoint)
360+
}
346361

347362
/**
348363
* Returns a shallow copy of the Quasar stack frames at the time of call to [flowStackSnapshot]. Use this to inspect

core/src/main/kotlin/net/corda/core/flows/FlowSession.kt

Lines changed: 59 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,20 @@ abstract class FlowSession {
5454
* Returns a [FlowInfo] object describing the flow [counterparty] is using. With [FlowInfo.flowVersion] it
5555
* provides the necessary information needed for the evolution of flows and enabling backwards compatibility.
5656
*
57-
* This method can be called before any send or receive has been done with [counterparty]. In such a case this will force
58-
* them to start their flow.
57+
* This method can be called before any send or receive has been done with [counterparty]. In such a case this will
58+
* force them to start their flow.
59+
*
60+
* @param maySkipCheckpoint setting it to true indicates to the platform that it may optimise away the checkpoint.
61+
*/
62+
@Suspendable
63+
abstract fun getCounterpartyFlowInfo(maySkipCheckpoint: Boolean): FlowInfo
64+
65+
/**
66+
* Returns a [FlowInfo] object describing the flow [counterparty] is using. With [FlowInfo.flowVersion] it
67+
* provides the necessary information needed for the evolution of flows and enabling backwards compatibility.
68+
*
69+
* This method can be called before any send or receive has been done with [counterparty]. In such a case this will
70+
* force them to start their flow.
5971
*/
6072
@Suspendable
6173
abstract fun getCounterpartyFlowInfo(): FlowInfo
@@ -80,8 +92,26 @@ abstract class FlowSession {
8092

8193
/**
8294
* Serializes and queues the given [payload] object for sending to the [counterparty]. Suspends until a response
83-
* is received, which must be of the given [receiveType]. Remember that when receiving data from other parties the data
84-
* should not be trusted until it's been thoroughly verified for consistency and that all expectations are
95+
* is received, which must be of the given [receiveType]. Remember that when receiving data from other parties the
96+
* data should not be trusted until it's been thoroughly verified for consistency and that all expectations are
97+
* satisfied, as a malicious peer may send you subtly corrupted data in order to exploit your code.
98+
*
99+
* Note that this function is not just a simple send+receive pair: it is more efficient and more correct to
100+
* use this when you expect to do a message swap than do use [send] and then [receive] in turn.
101+
*
102+
* @param maySkipCheckpoint setting it to true indicates to the platform that it may optimise away the checkpoint.
103+
* @return an [UntrustworthyData] wrapper around the received object.
104+
*/
105+
@Suspendable
106+
abstract fun <R : Any> sendAndReceive(
107+
receiveType: Class<R>,
108+
payload: Any, maySkipCheckpoint: Boolean
109+
): UntrustworthyData<R>
110+
111+
/**
112+
* Serializes and queues the given [payload] object for sending to the [counterparty]. Suspends until a response
113+
* is received, which must be of the given [receiveType]. Remember that when receiving data from other parties the
114+
* data should not be trusted until it's been thoroughly verified for consistency and that all expectations are
85115
* satisfied, as a malicious peer may send you subtly corrupted data in order to exploit your code.
86116
*
87117
* Note that this function is not just a simple send+receive pair: it is more efficient and more correct to
@@ -104,6 +134,19 @@ abstract class FlowSession {
104134
return receive(R::class.java)
105135
}
106136

137+
/**
138+
* Suspends until [counterparty] sends us a message of type [receiveType].
139+
*
140+
* Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly
141+
* verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly
142+
* corrupted data in order to exploit your code.
143+
*
144+
* @param maySkipCheckpoint setting it to true indicates to the platform that it may optimise away the checkpoint.
145+
* @return an [UntrustworthyData] wrapper around the received object.
146+
*/
147+
@Suspendable
148+
abstract fun <R : Any> receive(receiveType: Class<R>, maySkipCheckpoint: Boolean): UntrustworthyData<R>
149+
107150
/**
108151
* Suspends until [counterparty] sends us a message of type [receiveType].
109152
*
@@ -116,6 +159,18 @@ abstract class FlowSession {
116159
@Suspendable
117160
abstract fun <R : Any> receive(receiveType: Class<R>): UntrustworthyData<R>
118161

162+
/**
163+
* Queues the given [payload] for sending to the [counterparty] and continues without suspending.
164+
*
165+
* Note that the other party may receive the message at some arbitrary later point or not at all: if [counterparty]
166+
* is offline then message delivery will be retried until it comes back or until the message is older than the
167+
* network's event horizon time.
168+
*
169+
* @param maySkipCheckpoint setting it to true indicates to the platform that it may optimise away the checkpoint.
170+
*/
171+
@Suspendable
172+
abstract fun send(payload: Any, maySkipCheckpoint: Boolean)
173+
119174
/**
120175
* Queues the given [payload] for sending to the [counterparty] and continues without suspending.
121176
*

core/src/main/kotlin/net/corda/core/internal/FlowStateMachine.kt

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import java.time.Instant
1515
/** This is an internal interface that is implemented by code in the node module. You should look at [FlowLogic]. */
1616
interface FlowStateMachine<R> {
1717
@Suspendable
18-
fun getFlowInfo(otherParty: Party, sessionFlow: FlowLogic<*>): FlowInfo
18+
fun getFlowInfo(otherParty: Party, sessionFlow: FlowLogic<*>, maySkipCheckpoint: Boolean): FlowInfo
1919

2020
@Suspendable
2121
fun initiateFlow(otherParty: Party, sessionFlow: FlowLogic<*>): FlowSession
@@ -25,16 +25,17 @@ interface FlowStateMachine<R> {
2525
otherParty: Party,
2626
payload: Any,
2727
sessionFlow: FlowLogic<*>,
28-
retrySend: Boolean = false): UntrustworthyData<T>
28+
retrySend: Boolean,
29+
maySkipCheckpoint: Boolean): UntrustworthyData<T>
2930

3031
@Suspendable
31-
fun <T : Any> receive(receiveType: Class<T>, otherParty: Party, sessionFlow: FlowLogic<*>): UntrustworthyData<T>
32+
fun <T : Any> receive(receiveType: Class<T>, otherParty: Party, sessionFlow: FlowLogic<*>, maySkipCheckpoint: Boolean): UntrustworthyData<T>
3233

3334
@Suspendable
34-
fun send(otherParty: Party, payload: Any, sessionFlow: FlowLogic<*>)
35+
fun send(otherParty: Party, payload: Any, sessionFlow: FlowLogic<*>, maySkipCheckpoint: Boolean)
3536

3637
@Suspendable
37-
fun waitForLedgerCommit(hash: SecureHash, sessionFlow: FlowLogic<*>): SignedTransaction
38+
fun waitForLedgerCommit(hash: SecureHash, sessionFlow: FlowLogic<*>, maySkipCheckpoint: Boolean): SignedTransaction
3839

3940
@Suspendable
4041
fun sleepUntil(until: Instant)

docs/source/example-code/src/test/kotlin/net/corda/docs/CustomVaultQueryTest.kt

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,18 @@ class CustomVaultQueryTest {
2727

2828
@Before
2929
fun setup() {
30-
mockNet = MockNetwork(threadPerNode = true, cordappPackages = listOf("net.corda.finance.contracts.asset", CashSchemaV1::class.packageName))
30+
mockNet = MockNetwork(
31+
threadPerNode = true,
32+
cordappPackages = listOf(
33+
"net.corda.finance.contracts.asset",
34+
CashSchemaV1::class.packageName,
35+
"net.corda.docs"
36+
)
37+
)
3138
mockNet.createNotaryNode(legalName = DUMMY_NOTARY.name)
3239
nodeA = mockNet.createPartyNode()
3340
nodeB = mockNet.createPartyNode()
3441
nodeA.internals.registerInitiatedFlow(TopupIssuerFlow.TopupIssuer::class.java)
35-
nodeA.installCordaService(CustomVaultQuery.Service::class.java)
3642
notary = nodeA.services.getDefaultNotary()
3743
}
3844

0 commit comments

Comments
 (0)