Refactor PersistentStream. #1041
This breaks out a number of changes I made as prep for b/80402781 (Continue retrying streams for 1 minute (idle delay)).

PersistentStream changes:
* Rather than providing a stream event listener to every call of start(), the stream listener is now provided once to the constructor and cannot be changed.
* Streams can now be restarted indefinitely, even after a call to stop().
* PersistentStreamState.Stopped was removed and we just return to 'Initial' after a stop() call.
* Added `currentAuthAttempt` member to PersistentStream in order to avoid bleedthrough issues if stop() / start() are called while an auth attempt is in progress (i.e. so we correctly abandon the old auth attempt).
* Calling stop() now triggers the onClose() event listener, which simplifies stream cleanup.
* PersistentStreamState.Auth renamed to 'Starting' to better reflect that it encompasses both authentication and opening the stream.

RemoteStore changes:
* Creates streams once and just stop() / start()s them as necessary, never recreating them completely.
* Added networkEnabled flag to track whether the network is enabled or not, since we no longer null out the streams.
* Refactored disableNetwork() / enableNetwork() to remove stream re-creation. They're now simple enough that I stopped re-using them from shutdown() and handleUserChange().

Misc:
* Comment improvements including a state diagram on PersistentStream.
* Fixed spec test shutdown to schedule via the AsyncQueue to fix a sequencing-order issue I ran into.
This basically looks good. You might be able to simplify this a tiny bit more if you can re-use the logic in `dispatchIfStillActive`.
 * [any state] --------------------------> INITIAL
 *               stop() called or
 *               idle timer expired
 */
Nice! You raised the bar for comments here quite a bit.
Needs more shading. Like this:
****:***:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::.::::::::::::::.:.::.::::::::::::::.:::::::::::::::::::::::::
****:::::::::::::::::::::::::::::::::::::::::::::::::::::..:::::::..:....::.::......:.::::...:::::..:..:..:..::.:::::::..:.:::::
***::::::::::::::::::::::::::::::...:::::.:::..:.:..........::::::.::...:.:.:..:.....::.:.........................:::::::...::::
**:::::I***I:::::::::::::::::::::::::::...:.::...:::....:.:.:::::...:.....:............::......:::......:...:::...:::::::.::..::
*:::::::*I:*:::::::::::::::::::::::::::::**IV......I*:::::::::::::.::..::...:::.::::..:.........::::....::.....::.::::::.:::::::
**::::::IF:*:::::::::::::::::::::::I:............:::*:..*...:.:::...::..:.......:............:..:.....:.::.....:....:::..:::..::
***:*::::V.::::::::::::.........I...............:::...::I..I:::::.::..:................................:.:..:...:::....:::::.:::
*::::*:*:*:*:::::::::.......I...............II:I:II:...:.*...:.*..:..::..................................:.:::::::.:::::::::::::
*:::I:::.:.*::::.:::::...::........*:.:...I*.:I:*IIV:.::***I:I:::.::.::::................................:::::::::::::::::::::::
********::*::*::::::...I.........:**:*III:IIIFVFNF:*I..**::.:*.I*:::::...........................::::..:::::::::::::::.:::::::::
***VF*:****:*::::::::V.........**:*:.::NFFFIIFNNNNF..::*I:*.:.::::::.............:..............:::.::::::::::::::::::::::::::::
***IV***I:::*:::::V..........::.::*NF*FNNFFNNVFFFNNNV:::.I.*..:.....I:.**...................:....:::::::::::::::::::::::::::::::
*******I::****::::...........INFFN*FNNNVNNNNVFFNNVFIFV:II*::*:*..**I:*:.::......................::.:::::::::::::::::::::::::::::
****I***********.............INFFNFNNNVNNFNVNNNNVFNNNNN:..::.:*.::..I....*:........................:::::::::::::::::::::::::::::
*************V:...............:FNNVFNFVFNNINNFVFNNVNNFF*...I*.*II..::..:.I.I:............................:.:...:.....:::::::::::
II********VI:.:....................:I::NNFFFVFFNNFNIFVF:*.:*::::.::..*:II:..:::.:.:....................................:.:.:.:.:
FFFFFFVFV:::.:::....::I*......I...:II:VFNVNIFV:NFNFFF:......:.:*:I:..I:**I..:..................................................:
*IVV*::.:.:...::I:::I*:...:..::::V:..FNI.V*INFIFFFFF:.........:I..::*.*.::*:::..:..............................................:
:::::::::*:...:*::.:I::*I.:*.*I::II.:.:.:VFF*FFNNVFF...:...........*I****...:.....................................:..........*..
:::*I***I*V:*FFV*::.***:I.*:.****I:::*::.:.*I*:NNVFI:................:.:..:::::..::.........:......................:.:.....*....
:::::*V**I*:.NFNIFNV:::*IV*.::FNVNVFIN**:I*FVFNFNFV::...:..:.::.::.::..:::.::...:...............:.....:.................:*.....:
::I***::I*:*:IVFVNFVV.:I**:*...VF*:FVFIVVVVF:NFINNFI........::.....::...............................::........:......:........:*
:V****INF*:*V*V*FINNF:**I*::I:*.IIIFNFNFNVVIFNFFNNFV........:.........................::...:.............:.:.:::......I........:
::*:IV::FNVVNNIFFFF:*IV::*VFN*II..*INNFNFFFNVVFVNNNVI::::::.:::::..:::...:....:.:...::.:........:::..:::::::::::.I..I......I*:*:
I:*NVNV:*V*VNFFFFFVV*:FFFIFFFI*:*N**VVNFNNN**NFIINNF.V:::::::::::::.:.:.:::.::::.::.:::::::::::::::::::::::::::*:.*I.......:.VV*
I::::::I:I**VFFI*NIFF*FVNVV:FNFFVIFFFFFVN*......*VFFNFF:::*::::::::::::::::::::::::::::::::::*:::***********IIFFI*......II*::VVN
**::*II:::..*:IINNFFFFFFVFFNFFFNFVVFF*FF:...........N*NVI*I************.************************IIIIIIIIII*FVVFI.........I..IFFF
****I*:I*:*I**I:*IVINFFFFNVFFNFVFVFIIF...............IVNFIII:IIII**IIIIIIIIIIIIIIIIIIIIIIIIIIIIIIVVVIIIIIFV*FV.......**:F:VFVFFF
*I****I:::I*V*:F:I:**IIVIVFN*IFNVVIF........I:.......***FFFVIVIIVVVIVIVVIIIVVVIII:*IIIIIIIVIVVVVVVVVVVVFFVVV....*.:.:VIFFFFFNNN*
***NNNVNF*NFIII*I**I:I::I*I*::*VF*.........***I:.......:..*NFIIVVVVVVVVVVVVVVVV.....IIIVVVVVVVVVVVVIVVII:*...*:.:..VFVFFFFF::::.
:::FNVFVNIVFFFI*II*I**IIFFF*F:..........:**I:*..........::.....VVVVVVVVIVVVVI......:..*VVVVVVVVVFF*::*I........VVFINFFF:.:VF....
:::FFNFNFFFFNIFI*:I*V*::V:.........:::.I:.*:I*.**..:....:*.V:.:.VVVVVVVV**:*IFFN.IFFVI.:VFF*:*::*V...**.I*.VI*IVV:.:.:VVV..:.::
:::::::*FFNFVI:..........:..........I:..:**.I...::*..:::....II*.....IFNNNNNNNN*VVVVVFVVFVVFFVV:.....**VVVVVV*..:....*VV:.:::::::
:::.:::::..:....::......:V::::..II.I*II::::.I:I..*.*:VV.::..IIVF.:VI:.*...*:.:*II***::.......*I.FIF***..:::::.:VFFVII.::::::IVFF
::::*****:......I::.....:*I::::..::::*:......:::..FVVF*F*:*F..IVFV:V**:FF*::.........:............:::..FFFFFFFFFFF..:::::FFVVFFV
::::::I***I*.I.*:::.I..*I*:.*.I.......*.I*:..*::..*:F*:*I:*:V...*FFIFFFV*FFV::.....:..::...*.:..::**:.VVVFVVVVVI....:IFNNNNNNFI*
:::::::::.**::::::.*NV.I:.I*:*:..:V.:::::..::I.**..FNFVNI**FFV:V...*FNNFFIINFF::::.:I*::.:::::VFVVVVI*:*VNNFF*..:::::::::*VVFFFN
::::::*V:I*VF*IFV*NVFFI:**::.VI::::*.::I.:*.:.I:*:..FNFFFFV*IIV:*FVVIV.INFFFIFFNFI::..FFFFI::VNNFFI:*IIVFFV..:::::::VNNNNFVFNV**
::::FVVFF*NNFFNNNNFFV::I::.I*I::.....:*::VF.FF*::*:..:VNFFNFF***III:II:*V*::IFNFFNNFFF*:.IFNFFFFFFFFFFVI......:.:::*I::..:VFFFFF
:::NNVFVFNNFNFNFVV::*V*F.FNVN:I:::*IV:::.IFFNVIVI.:::..NNFFFNVN*II**IIVIVIV***********IIIV:FVIF*VFIV.....:*VFFFFFFFFFNNNFFFFFFFF
::NFVNFVIN*VFN*NNIIFINVFNNINVNN:.::..........FNN:VVF.....:NFFFNNVF*I:::::*IIIIIV***********:**I*...::::**I:IFNNFNNNNNNF*::IIVFNN
::NVFFNVNFNVNNVNNNVNNIFNFVNFNN**:::IVNN.I.FN*IIINIVNNI:V:...VNNFNFNFNV**::::::::::::::*II*....:FNFFNNNFFFNNNNNF:.......:::::::..
.:NINNVFFFNIFNVNFNVNFVNNNVNVNNF:::IN:*V**NI:V::VFIFNNNFV***I:...:VFFFNNFFFNNNNNNNV:.......:::......::::....:::::::::YRREBRAP*NAI
@@ -157,16 +169,18 @@ export abstract class PersistentStream<
  private inactivityTimerPromise: CancelablePromise<void> | null = null;
  private stream: Stream<SendType, ReceiveType> | null = null;

  protected backoff: ExponentialBackoff;
  /** A sentinel object used to ignore auth callbacks for prior attempts. */
  private currentAuthAttempt: object = null;
This sounds like it should have been a counter, and I think it might make the flow easier to debug if it actually was a counter. What do you think of making it one? (Alternatively: Can we use the current stream instance as suggested below?)
I think this actually points to a larger issue with this class. We now have three different ways of handling the same underlying issue: most callbacks are not tolerant of an unexpected state transition but we have to deal with those because users can request network restarts/stops.
As of this PR, the three different mechanisms I'm aware of are these:
- `this.stream`, which handles late received messages from the server
- `this.currentAuthAttempt`, which handles late auth responses
- `this.state`, which handles unexpected transitions while waiting for timers (often indirectly via `this.isOpen`)
I think these can all be handled via the same mechanism because fundamentally we have some delayed action that's expecting the stream to have been in some preceding state. If the state changes while we're waiting we should disregard the next callback.
I propose a stateSequenceNumber, which we bump on every state transition. Any scheduled callback needs to capture the stateSequenceNumber that was current at the time and then disregard the callback if the stateSequenceNumber does not match the one that was current at the time of submitting the delayed action.
This handles all known interruptions as far as I can see and would make our callback handling uniform.
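For illustration, the proposal above could look roughly like the following. This is a minimal sketch, not code from the PR: `SequencedStream`, `guard()`, and the demo values are hypothetical names, and the real class would also dispatch through the AsyncQueue.

```typescript
// Sketch only: SequencedStream and guard() are hypothetical names,
// not part of the actual PersistentStream class.
type StreamState = 'Initial' | 'Starting' | 'Open' | 'Backoff';

class SequencedStream {
  private state: StreamState = 'Initial';
  // Bumped on every state transition.
  private stateSequenceNumber = 0;

  getState(): StreamState {
    return this.state;
  }

  updateState(next: StreamState): void {
    this.state = next;
    this.stateSequenceNumber++;
  }

  /**
   * Wraps a callback so that it only runs if no state transition happened
   * between scheduling and invocation.
   */
  guard<T>(fn: (arg: T) => void): (arg: T) => void {
    const scheduledAt = this.stateSequenceNumber;
    return (arg: T): void => {
      if (this.stateSequenceNumber !== scheduledAt) {
        return; // Stale: the state changed while we were waiting.
      }
      fn(arg);
    };
  }
}

const seqStream = new SequencedStream();
const deliveredTokens: string[] = [];
const recordToken = (token: string): void => {
  deliveredTokens.push(token);
};

seqStream.updateState('Starting');
const freshCallback = seqStream.guard(recordToken);
freshCallback('token-1'); // Delivered: no transition since it was scheduled.

const staleCallback = seqStream.guard(recordToken);
seqStream.updateState('Initial'); // e.g. the user stopped the network.
staleCallback('token-2'); // Dropped: the sequence number no longer matches.
```

Note that in this form every transition invalidates pending callbacks, including ones on the normal startup path.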
Interesting idea! I liked it initially and went ahead and implemented it... I'm a bit unsure of the final result...
- There's a bit of a wart in that we intentionally change the state from Starting to Open after we've attached our stream handlers, so we have to allow for that.
- That wart makes me question whether tying callback validity to `state` is really the right thing to do. In a magical ideal world we could just cancel the auth request and/or the underlying stream and rely on no more callbacks coming from it, which I think is what the original implementation implemented. Using `state` to implicitly cancel callbacks is perhaps a little less direct.
- That said, I like the unification and integrating with the AsyncQueue since it makes it harder to mess up.
I'm also not sure how this will look when ported to the other platforms. It'll be more jarring in any case since iOS / Android didn't have the dispatchIfStillActive() mechanism.
WDYT? I'm fine going either way.
(FWIW, only your first two mechanisms are actually issues because our timers are already AsyncQueue-aware and explicitly canceled on state transitions--in particular, we call backoff.cancel() in our close() implementation).
I've addressed (1) elsewhere. I think what I was really asking for was a stream generation number, but read on.
Re (2) I think you're right: this is a mechanism for implementing centralized cancellation. We could instead force everything to be cancellation aware. We've partially done this on iOS by way of `FSTCallbackFilter`.
`dispatchIfStillActive` was always kind of a hack. It works because JavaScript is so functional, but it's fundamentally a non-portable pattern. This came about back when we were seriously under the gun for time and I didn't push back on Jonny to stop and think about how to express this portably.
Functionally I think we have a few concerns to address in a centralized way:
- we want to dispatch back onto the async queue
- we need to be able to cancel the response if we don't need it anymore
- cancelation needs to be checked as we're executing on the async queue
This is basically what `DelayedOperation` implements, but it does so in a non-reusable way. If we could share that between DelayedOperations and the auth callback we'd be golden on that front.
The stream callbacks could be treated the same way with something like `FSTCallbackFilter` here.
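The three concerns listed above (dispatch onto the queue, allow cancellation, check cancellation at execution time) can be sketched as one reusable wrapper. This is an assumption-laden illustration: `SimpleQueue` and `CancelableCallback` are hypothetical stand-ins, not the SDK's AsyncQueue, `DelayedOperation`, or `FSTCallbackFilter`.

```typescript
// Sketch only: SimpleQueue and CancelableCallback are hypothetical stand-ins,
// not the SDK's AsyncQueue, DelayedOperation, or FSTCallbackFilter.
class SimpleQueue {
  private tasks: Array<() => void> = [];

  enqueue(task: () => void): void {
    this.tasks.push(task);
  }

  /** Runs every queued task in FIFO order. */
  drain(): void {
    let task = this.tasks.shift();
    while (task !== undefined) {
      task();
      task = this.tasks.shift();
    }
  }
}

class CancelableCallback<T> {
  private canceled = false;
  private readonly queue: SimpleQueue;
  private readonly fn: (value: T) => void;

  constructor(queue: SimpleQueue, fn: (value: T) => void) {
    this.queue = queue;
    this.fn = fn;
  }

  /** Invoked by the outside world (auth provider, network stream, timer). */
  deliver(value: T): void {
    // Dispatch back onto the queue rather than running inline.
    this.queue.enqueue(() => {
      // Check cancellation at execution time, while on the queue.
      if (!this.canceled) {
        this.fn(value);
      }
    });
  }

  /** Marks the response as no longer needed. */
  cancel(): void {
    this.canceled = true;
  }
}

const demoQueue = new SimpleQueue();
const callLog: string[] = [];

const authCallback = new CancelableCallback<string>(demoQueue, token => {
  callLog.push('auth:' + token);
});
const messageCallback = new CancelableCallback<string>(demoQueue, msg => {
  callLog.push('msg:' + msg);
});

authCallback.deliver('abc');
messageCallback.deliver('hello');
messageCallback.cancel(); // Canceled after delivery was queued, before it ran.
demoQueue.drain();
```

Because the check happens when the task runs rather than when it is enqueued, a cancel that lands between the two still suppresses the callback.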
// Create sentinel object used to identify this auth attempt in case it
// is invalidated by a call to stop().
const authAttempt = (this.currentAuthAttempt = new Object());
Could you use the same logic that `dispatchIfStillActive` uses and compare the current stream object?
The stream can't be created until we have the auth token, and so `this.stream` is going to be `null` during the auth request.
@@ -43,21 +43,38 @@ interface ListenRequest extends api.ListenRequest {
export interface WriteRequest extends api.WriteRequest {
  database?: string;
}

/**
General comment: The hardest client for the idle timeout change was iOS. I sporadically got stream callbacks from GRPC even after I closed the stream. With that said, I would suggest that you port this to iOS before you do the Android port. It might save you one backport if there are any other changes.
SGTM.
const networkEnabled = this.networkEnabled;
this.networkEnabled = false;
this.writeStream.stop();
this.watchStream.stop();
Do we still need to restart the stream if we did not receive an error? It seems like that might be enough to decide whether we should restart or not.
So that is a potential option, though I would be nervous about relying on that... And right now I actually have an assert that explicitly makes sure that shouldStartWatchStream() returns false if we did not receive an error (see my discussion with Gil about that), so if I remove the "this.networkEnabled = false;" line, we hit that assert.
private canUseNetwork(): boolean {
  // TODO(mikelehen): This could take into account isPrimary when we merge
  // with multitab.
  return this.networkEnabled;
Thanks! :)
await this.remoteStore.shutdown();
await this.persistence.shutdown(/* deleteData= */ true);
await this.destroyPersistence();
await this.queue.enqueue(async () => {
I hope this is going to solve all the shutdown troubles I see in the spec tests when I hit assertions. Fingers crossed!
I hope so, but I'm not sure it will. Yesterday I fell into a quagmire of confusing errors that I think were due to us inconsistently queuing spec step stuff on the asyncQueue. :-/
I'm generally in favor of where this is going.
  // closed, but just in case we make this a no-op.
  return;
}
assert(
I don't think this assertion is valid to make.
If the user stops the network as the timer fires it's possible for both the disableNetwork and the resume-from-backoff block to end up enqueued on the async queue. Therefore performBackoff has to tolerate the possibility that the state has changed underneath it.
It should be safe because we call `this.backoff.cancel()` in close() and close() is the only way to get out of the Backoff state before the timer fires. (and timers guarantee that if you cancel them while on the AsyncQueue then the callback will not be run.)
So even though I've added your stateSequenceNumber suggestion, I haven't used it here because the code is designed to be safe without it.
Interesting. I see what you mean re inherent safety.
On the other hand, I'd like to get us out of the business of having to make that kind of argument when reasoning about this code. I would prefer to just handle all the callbacks in a uniform manner so that we don't have to think about it.
I'm leaving this as-is for now. I'd rather not use redundant mechanisms that accomplish the same thing and I think backoff.cancel() is the clearer mechanism. And now that callback cancellation (via closeCount) is no longer tied to this.state but instead just incremented in close(), we deal with that and cancelling the backoff (and idle timers) in the same place, so it should hopefully be more apparent that it's safe. I think this is a reasonable place to end up in until or unless we add generic cancellation support to the AsyncQueue.
If you're not a fan, let me know.
     objUtils.forEachNumber(this.listenTargets, (targetId, queryData) => {
       this.sendWatchRequest(queryData);
     });
   }

   private async onWatchStreamClose(error?: FirestoreError): Promise<void> {
     assert(
-      this.isNetworkEnabled(),
-      'onWatchStreamClose() should only be called when the network is enabled'
+      error !== undefined || !this.shouldStartWatchStream(),
I'm not a fan of this assertion here because the condition is hard to reason about. It's better to make the simple assertion that `error !== undefined` inside the if block for `this.shouldStartWatchStream()`.
However, I'm not sure this is a good use of our time. In the caller we're mostly channeling gRPC's response. If it happens to close a stream without supplying an error do we really care? Is it really worth crashing our customers' apps over this?
I'm not arguing against assertions generally: the assertions in the remote store have saved our bacon repeatedly, but the good ones catch our bugs where we failed to anticipate a transition that was actually happening in the wild.
By contrast here it seems like this case of error being null is possible but harmless. If this assertion were to fire in the wild because gRPC was giving us a null error, would we do anything but remove this assertion?
Feel free to push back (especially if I'm not considering all the angles).
The intention was to guard against calling stop() or the stream idle timeout firing while the stream is actually still needed, but I agree that intention wasn't very clear. I tweaked the code / assert to hopefully make it clearer.
I've verified that any actual error callback on the stream will have a defined error (without needing to make any assumptions about grpc or webchannel internals), so I'm inclined to keep the assert. But I could be persuaded otherwise.
-    return (
-      this.isNetworkEnabled() && this.writePipeline.length < MAX_PENDING_WRITES
-    );
+    return this.writePipeline.length < MAX_PENDING_WRITES;
`fillWritePipeline()` above is called by the syncEngine after a local write. Why would we want batches to be in the pipeline while the network is disabled?
TBH it just makes more sense to me. I don't see what we gain by throwing away the write pipeline when the network is disabled and then re-filling it (with the exact same writes) once it's enabled again. We may as well just always be filling the write pipeline and then it will just consistently contain the next 10 writes to send to the write stream.
This is similar to how `listenTargets` persists across the network being disabled / re-enabled.
The only time we actually want to clear the write pipeline is on a user change, and so I reworked the handleUserChange code to do that explicitly.
Does this explanation sway you? We may just have different conceptual models of what the write pipeline represents... I'm willing to be convinced to reconsider.
We need to make sure that the multi-tab code continues to throw away writes that were executed by other clients. This is simplified if we throw away the in-memory state and then re-fill the write pipeline.
Hrm... I guess that's a fair point. With multi-tab we'll have 2 places (handleUserChange and applyPrimaryState) where we really do need to re-sync with the LocalStore.
Okay... I've reverted the parts of my PR related to this (we now clear the write pipeline whenever the network is disabled, disallow adding while it's disabled, and re-fetch the lastStreamToken whenever the network is re-enabled). And so I also brought back the disableNetworkInternal() / enableNetwork() methods from before as well and use them in start, shutdown, enable/disableNetwork, and handleUserChange again. And now you can use them in applyPrimaryStateChange when multi-tab merges.
Sorry for the churn and un-churn.
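To make the outcome of this thread concrete, here is a rough model of the behavior that was restored: clear the write pipeline when the network is disabled, disallow adding while it is disabled, and refill from the local store on re-enable. It is a sketch under assumptions: `FakeLocalStore` and `WritePipelineModel` are hypothetical, batches are reduced to numeric IDs, and `MAX_PENDING_WRITES = 10` is inferred from the "next 10 writes" remark above.

```typescript
// Sketch only: FakeLocalStore and WritePipelineModel are hypothetical, and
// batches are reduced to numeric IDs. MAX_PENDING_WRITES = 10 is assumed.
const MAX_PENDING_WRITES = 10;

class FakeLocalStore {
  pendingBatchIds: number[] = [];
}

class WritePipelineModel {
  writePipeline: number[] = [];
  private networkEnabled = false;
  private readonly localStore: FakeLocalStore;

  constructor(localStore: FakeLocalStore) {
    this.localStore = localStore;
  }

  private canAddToWritePipeline(): boolean {
    return (
      this.networkEnabled && this.writePipeline.length < MAX_PENDING_WRITES
    );
  }

  /** Copies pending batches from the local store until the pipeline is full. */
  fillWritePipeline(): void {
    for (const batchId of this.localStore.pendingBatchIds) {
      if (!this.canAddToWritePipeline()) {
        break;
      }
      if (this.writePipeline.indexOf(batchId) === -1) {
        this.writePipeline.push(batchId);
      }
    }
  }

  disableNetwork(): void {
    this.networkEnabled = false;
    this.writePipeline = []; // Throw away the in-memory state.
  }

  enableNetwork(): void {
    this.networkEnabled = true;
    this.fillWritePipeline(); // Re-sync with the local store.
  }
}

const localStore = new FakeLocalStore();
localStore.pendingBatchIds = [1, 2, 3];
const model = new WritePipelineModel(localStore);

model.enableNetwork();
const whileEnabled = model.writePipeline.slice(); // [1, 2, 3]

model.disableNetwork();
// While offline, another tab acknowledges batch 1 and a new write arrives.
localStore.pendingBatchIds = [2, 3, 4];
model.fillWritePipeline(); // No-op: adding is disallowed while disabled.
const whileDisabled = model.writePipeline.slice(); // []

model.enableNetwork();
const afterReenable = model.writePipeline.slice(); // [2, 3, 4]
```

The refill on re-enable is what drops batch 1 here, which is the multi-tab concern raised above: writes executed by another client never linger in stale in-memory state.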
PTAL
  'Expected stream to be in state Starting, but was ' + this.state
);
this.updateState(PersistentStreamState.Open);
// Need to recreate our dispatcher since we changed the state.
This is the wart I mentioned...
I think we could rename the "state sequence number" to a stream generation number to make this clearer. As long as we're progressing down the happy path of Initial -> Starting -> Open, state transitions don't matter as far as versioning goes. It's any transition off that happy path that should cause us to disregard a prior callback.
Ahh, yeah. That is better. I implemented this but called it closeCount (used with the now renamed closeGuardedDispatcher) because I think that's more precise (we increment it in close()) and understandable (what's a "stream generation"?)... but if you don't like the naming, let me know.
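For readers following along, here is a minimal sketch of the closeCount idea as I understand it from this thread. It is not the code in the PR: GuardedStream, Callback, and start() are hypothetical names for illustration, and the dispatcher here calls the callback synchronously, whereas the real one would presumably schedule it on the AsyncQueue.

```typescript
// Hypothetical, simplified stand-in for the stream class discussed above.
type Callback = () => void;

class GuardedStream {
  // Incremented on every close(); dispatchers created earlier become stale.
  private closeCount = 0;

  // Returns a dispatcher that runs `fn` only if close() has not been called
  // since the dispatcher was created.
  getCloseGuardedDispatcher(startCloseCount: number): (fn: Callback) => void {
    return (fn: Callback): void => {
      if (this.closeCount === startCloseCount) {
        fn();
      }
      // Otherwise the callback belongs to an earlier start() and is dropped.
    };
  }

  // Assumption: start() captures the current closeCount for its callbacks.
  start(): (fn: Callback) => void {
    return this.getCloseGuardedDispatcher(this.closeCount);
  }

  close(): void {
    this.closeCount++;
  }
}

// Usage: callbacks from before a close() turn into no-ops.
const stream = new GuardedStream();
const events: string[] = [];

const dispatchIfNotClosed = stream.start();
dispatchIfNotClosed(() => events.push('onOpen')); // runs
stream.close();
dispatchIfNotClosed(() => events.push('onMessage')); // dropped, stream was closed

const dispatchAfterRestart = stream.start();
dispatchAfterRestart(() => events.push('onOpen again')); // runs
```

Because the counter only moves in close(), the happy-path transitions (Initial -> Starting -> Open) do not invalidate a dispatcher, which is the property asked for above.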
-   * waiting is complete, the stream will try to open. While in this
-   * state isStarted() will return YES but isOpen will return false.
+   * re-starting. After waiting is complete, the stream will try to open.
+   * While in this state isStarted() will return true but isOpen will return
Nit: While in this state, isStarted() will return true but isOpen() will return..
I had to do a double take as I was reading this.
There are actually a bunch of these. I started to fix them but decided it added too much noise to the PR. I'll send a subsequent PR.
const dispatchIfStateUnchanged = this.stateGuardedDispatcher();

// TODO(mikelehen): Just use dispatchIfStillActive, but see TODO below.
Method name is outdated in this comment.
done
   * This allows us to turn auth / stream callbacks into no-ops if the stream
   * is closed / re-opened, etc.
   */
  private stateGuardedDispatcher(): (fn: () => Promise<void>) => void {
I think this would be cleaner if:
- it was called something along the lines of dispatchIfStateUnchanged
- you passed in the state sequence number
I implemented the second suggestion but am pushing back on the first.
The result:
    const dispatchIfNotClosed =
        this.closeGuardedDispatcher(this.closeCount);
I tried renaming but I don't like that the method and its return value have the same name:
    const dispatchIfNotClosed =
        this.dispatchIfNotClosed(this.closeCount);
And it really is a function that creates/returns a dispatcher. So I think the current name is pretty accurate albeit a bit awkward.
closeGuardedDispatcher still seems a little off to me. The name is very tied to its implementation, but at first glance it doesn't seem to convey what it is used for (it might also just be the choice of words).
I don't have a good suggestion for you, but even something like getCloseGuardedDispatcher() might clear up some of my confusion.
getCloseGuardedDispatcher() SGTM
-    return (
-      this.isNetworkEnabled() && this.writePipeline.length < MAX_PENDING_WRITES
-    );
+    return this.writePipeline.length < MAX_PENDING_WRITES;
We need to make sure that the multi-tab code continues to throw away writes that were executed by other clients. This is simplified if we throw away the in-memory state and then re-fill the write pipeline.
Feedback addressed-ish. PTAL
  // closed, but just in case we make this a no-op.
  return;
}
assert(
I'm leaving this as-is for now. I'd rather not use redundant mechanisms that accomplish the same thing and I think backoff.cancel() is the clearer mechanism. And now that callback cancellation (via closeCount) is no longer tied to this.state but instead just incremented in close(), we deal with that and cancelling the backoff (and idle timers) in the same place, so it should hopefully be more apparent that it's safe. I think this is a reasonable place to end up in until or unless we add generic cancellation support to the AsyncQueue.
If you're not a fan, let me know.
-    return (
-      this.isNetworkEnabled() && this.writePipeline.length < MAX_PENDING_WRITES
-    );
+    return this.writePipeline.length < MAX_PENDING_WRITES;
Hrm... I guess that's a fair point. With multi-tab we'll have 2 places (handleUserChange and applyPrimaryState) where we really do need to re-sync with the LocalStore.
Okay... I've reverted the parts of my PR related to this (we now clear the write pipeline whenever the network is disabled, disallow adding while it's disabled, and re-fetch the lastStreamToken whenever the network is re-enabled). And so I also brought back the disableNetworkInternal() / enableNetwork() methods from before as well and use them in start, shutdown, enable/disableNetwork, and handleUserChange again. And now you can use them in applyPrimaryStateChange when multi-tab merges.
Sorry for the churn and un-churn.
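To make the reverted behavior concrete, here is a rough sketch of the three rules described above: clear the pipeline when the network is disabled, refuse additions while it is disabled, and re-fetch the stream token when it is re-enabled. WritePipelineSketch, loadStreamToken, addToWritePipeline, and pendingWrites are hypothetical names, writes are plain strings, and everything is synchronous, so treat it as an illustration rather than the RemoteStore code. The limit of 10 comes from the "next 10 writes" remark earlier in the thread.

```typescript
const MAX_PENDING_WRITES = 10;

class WritePipelineSketch {
  private networkEnabled = false;
  private writePipeline: string[] = [];
  lastStreamToken = '';

  // `loadStreamToken` is a hypothetical stand-in for the LocalStore lookup.
  constructor(private readonly loadStreamToken: () => string) {}

  canAddToWritePipeline(): boolean {
    return (
      this.networkEnabled && this.writePipeline.length < MAX_PENDING_WRITES
    );
  }

  // Returns false (and drops nothing into the pipeline) when adding is not allowed.
  addToWritePipeline(write: string): boolean {
    if (!this.canAddToWritePipeline()) {
      return false;
    }
    this.writePipeline.push(write);
    return true;
  }

  enableNetwork(): void {
    this.networkEnabled = true;
    // Re-sync the token each time the network comes back.
    this.lastStreamToken = this.loadStreamToken();
  }

  disableNetwork(): void {
    this.networkEnabled = false;
    // Throw away in-memory writes; they get re-filled after re-enabling.
    this.writePipeline = [];
  }

  pendingWrites(): number {
    return this.writePipeline.length;
  }
}

// Usage
const store = new WritePipelineSketch(() => 'token-from-local-store');
const addedWhileOffline = store.addToWritePipeline('w0');
store.enableNetwork();
const addedWhileOnline = store.addToWritePipeline('w1');
const pendingBeforeDisable = store.pendingWrites();
store.disableNetwork();
const pendingAfterDisable = store.pendingWrites();
```

Since the in-memory pipeline is always rebuilt after a disable, a caller such as the multi-tab code only needs to go through disable / enable to re-sync with persisted state.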
  'Expected stream to be in state Starting, but was ' + this.state
);
this.updateState(PersistentStreamState.Open);
// Need to recreate our dispatcher since we changed the state.
LGTM, minus a request to spend a minute or so on the name of closeGuardedDispatcher.
}
async start(): Promise<void> {
  // Load any saved stream token from persistent storage
  this.writeStream.lastStreamToken = await this.localStore.getLastStreamToken();
We are doing this twice now: once in start() and then right below in enableNetwork().
Oops, thanks! I meant to remove this one.
if (this.writePipeline.length > 0) {
  log.debug(
    LOG_TAG,
    'Stopping write stream with ' +
Nit: I'm guesstimating that this might fit on one line if you change it to use backticks:
`Stopping write stream with ${this.writePipeline.length} pending writes`
Nope! Nice try though.
[FYI- I'll probably have @wilhuff take a look at this too since he had opinions about the stream listener stuff, but in the interest of saving him time, can you do the first pass?]