mikelehen
Contributor

[FYI- I'll probably have @wilhuff take a look at this too since he had opinions about the stream listener stuff, but in the interest of saving him time, can you do the first pass?]

This breaks out a number of changes I made as prep for b/80402781 (Continue
retrying streams for 1 minute (idle delay)).

PersistentStream changes:

  • Rather than providing a stream event listener to every call of start(),
    the stream listener is now provided once to the constructor and cannot
    be changed.
  • Streams can now be restarted indefinitely, even after a call to stop().
    • PersistentStreamState.Stopped was removed and we just return to
      'Initial' after a stop() call.
    • Added currentAuthAttempt member to PersistentStream in order to avoid
      bleedthrough issues if stop() / start() are called while an auth attempt
      is in progress (i.e. so we correctly abandon the old auth attempt).
    • Calling stop() now triggers the onClose() event listener, which
      simplifies stream cleanup.
  • PersistentStreamState.Auth renamed to 'Starting' to better reflect that
    it encompasses both authentication and opening the stream.

RemoteStore changes:

  • Creates streams once and just stop() / start()s them as necessary,
    never recreating them completely.
    • Added networkEnabled flag to track whether the network is
      enabled or not, since we no longer null out the streams.
    • Refactored disableNetwork() / enableNetwork() to remove stream
      re-creation. They're now simple enough that I stopped re-using
      them from shutdown() and handleUserChange().

Misc:

  • Comment improvements including a state diagram on PersistentStream.
  • Fixed spec test shutdown to schedule via the AsyncQueue, which fixes a
    sequencing-order problem I ran into.
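
As a rough illustration of the PersistentStream lifecycle described above, here is a minimal sketch. `RestartableStream`, `StreamListener`, and `StreamState` are hypothetical names for this example only, and authentication and stream setup are collapsed into one synchronous step. It only shows the listener being supplied once, stop() returning to the initial state, and stop() triggering onClose().

```typescript
// Hypothetical, simplified model of the lifecycle; not the SDK's actual class.
enum StreamState {
  Initial,
  Starting,
  Open
}

interface StreamListener {
  onOpen(): void;
  onClose(error?: Error): void;
}

class RestartableStream {
  private state: StreamState = StreamState.Initial;

  // The listener is supplied once here and is never replaced.
  constructor(private readonly listener: StreamListener) {}

  start(): void {
    if (this.state !== StreamState.Initial) {
      throw new Error('start() called on a stream that is already started');
    }
    this.state = StreamState.Starting;
    // The real class authenticates and opens the stream asynchronously;
    // this sketch collapses both into a single step.
    this.state = StreamState.Open;
    this.listener.onOpen();
  }

  stop(): void {
    if (this.state === StreamState.Initial) {
      return; // Nothing to stop.
    }
    // There is no terminal 'Stopped' state: we return to Initial so the
    // stream can be started again.
    this.state = StreamState.Initial;
    this.listener.onClose();
  }

  isStarted(): boolean {
    return this.state !== StreamState.Initial;
  }
}

const lifecycleEvents: string[] = [];
const restartable = new RestartableStream({
  onOpen: () => lifecycleEvents.push('open'),
  onClose: () => lifecycleEvents.push('close')
});
restartable.start();
restartable.stop();
restartable.start(); // Restarting after stop() is allowed.
```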

Contributor

@schmidt-sebastian schmidt-sebastian left a comment

This basically looks good. You might be able to simplify this a tiny bit more if you can re-use the logic in dispatchIfStillActive.

* [any state] --------------------------> INITIAL
* stop() called or
* idle timer expired
*/

Contributor

Nice! You raised the bar for comments here quite a bit.

Contributor

Needs more shading. Like this:

****:***:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::.::::::::::::::.:.::.::::::::::::::.:::::::::::::::::::::::::
****:::::::::::::::::::::::::::::::::::::::::::::::::::::..:::::::..:....::.::......:.::::...:::::..:..:..:..::.:::::::..:.:::::
***::::::::::::::::::::::::::::::...:::::.:::..:.:..........::::::.::...:.:.:..:.....::.:.........................:::::::...::::
**:::::I***I:::::::::::::::::::::::::::...:.::...:::....:.:.:::::...:.....:............::......:::......:...:::...:::::::.::..::
*:::::::*I:*:::::::::::::::::::::::::::::**IV......I*:::::::::::::.::..::...:::.::::..:.........::::....::.....::.::::::.:::::::
**::::::IF:*:::::::::::::::::::::::I:............:::*:..*...:.:::...::..:.......:............:..:.....:.::.....:....:::..:::..::
***:*::::V.::::::::::::.........I...............:::...::I..I:::::.::..:................................:.:..:...:::....:::::.:::
*::::*:*:*:*:::::::::.......I...............II:I:II:...:.*...:.*..:..::..................................:.:::::::.:::::::::::::
*:::I:::.:.*::::.:::::...::........*:.:...I*.:I:*IIV:.::***I:I:::.::.::::................................:::::::::::::::::::::::
********::*::*::::::...I.........:**:*III:IIIFVFNF:*I..**::.:*.I*:::::...........................::::..:::::::::::::::.:::::::::
***VF*:****:*::::::::V.........**:*:.::NFFFIIFNNNNF..::*I:*.:.::::::.............:..............:::.::::::::::::::::::::::::::::
***IV***I:::*:::::V..........::.::*NF*FNNFFNNVFFFNNNV:::.I.*..:.....I:.**...................:....:::::::::::::::::::::::::::::::
*******I::****::::...........INFFN*FNNNVNNNNVFFNNVFIFV:II*::*:*..**I:*:.::......................::.:::::::::::::::::::::::::::::
****I***********.............INFFNFNNNVNNFNVNNNNVFNNNNN:..::.:*.::..I....*:........................:::::::::::::::::::::::::::::
*************V:...............:FNNVFNFVFNNINNFVFNNVNNFF*...I*.*II..::..:.I.I:............................:.:...:.....:::::::::::
II********VI:.:....................:I::NNFFFVFFNNFNIFVF:*.:*::::.::..*:II:..:::.:.:....................................:.:.:.:.:
FFFFFFVFV:::.:::....::I*......I...:II:VFNVNIFV:NFNFFF:......:.:*:I:..I:**I..:..................................................:
*IVV*::.:.:...::I:::I*:...:..::::V:..FNI.V*INFIFFFFF:.........:I..::*.*.::*:::..:..............................................:
:::::::::*:...:*::.:I::*I.:*.*I::II.:.:.:VFF*FFNNVFF...:...........*I****...:.....................................:..........*..
:::*I***I*V:*FFV*::.***:I.*:.****I:::*::.:.*I*:NNVFI:................:.:..:::::..::.........:......................:.:.....*....
:::::*V**I*:.NFNIFNV:::*IV*.::FNVNVFIN**:I*FVFNFNFV::...:..:.::.::.::..:::.::...:...............:.....:.................:*.....:
::I***::I*:*:IVFVNFVV.:I**:*...VF*:FVFIVVVVF:NFINNFI........::.....::...............................::........:......:........:*
:V****INF*:*V*V*FINNF:**I*::I:*.IIIFNFNFNVVIFNFFNNFV........:.........................::...:.............:.:.:::......I........:
::*:IV::FNVVNNIFFFF:*IV::*VFN*II..*INNFNFFFNVVFVNNNVI::::::.:::::..:::...:....:.:...::.:........:::..:::::::::::.I..I......I*:*:
I:*NVNV:*V*VNFFFFFVV*:FFFIFFFI*:*N**VVNFNNN**NFIINNF.V:::::::::::::.:.:.:::.::::.::.:::::::::::::::::::::::::::*:.*I.......:.VV*
I::::::I:I**VFFI*NIFF*FVNVV:FNFFVIFFFFFVN*......*VFFNFF:::*::::::::::::::::::::::::::::::::::*:::***********IIFFI*......II*::VVN
**::*II:::..*:IINNFFFFFFVFFNFFFNFVVFF*FF:...........N*NVI*I************.************************IIIIIIIIII*FVVFI.........I..IFFF
****I*:I*:*I**I:*IVINFFFFNVFFNFVFVFIIF...............IVNFIII:IIII**IIIIIIIIIIIIIIIIIIIIIIIIIIIIIIVVVIIIIIFV*FV.......**:F:VFVFFF
*I****I:::I*V*:F:I:**IIVIVFN*IFNVVIF........I:.......***FFFVIVIIVVVIVIVVIIIVVVIII:*IIIIIIIVIVVVVVVVVVVVFFVVV....*.:.:VIFFFFFNNN*
***NNNVNF*NFIII*I**I:I::I*I*::*VF*.........***I:.......:..*NFIIVVVVVVVVVVVVVVVV.....IIIVVVVVVVVVVVVIVVII:*...*:.:..VFVFFFFF::::.
:::FNVFVNIVFFFI*II*I**IIFFF*F:..........:**I:*..........::.....VVVVVVVVIVVVVI......:..*VVVVVVVVVFF*::*I........VVFINFFF:.:VF....
:::FFNFNFFFFNIFI*:I*V*::V:.........:::.I:.*:I*.**..:....:*.V:.:.VVVVVVVV**:*IFFN.IFFVI.:VFF*:*::*V...**.I*.VI*IVV:.:.:VVV..:.::
:::::::*FFNFVI:..........:..........I:..:**.I...::*..:::....II*.....IFNNNNNNNN*VVVVVFVVFVVFFVV:.....**VVVVVV*..:....*VV:.:::::::
:::.:::::..:....::......:V::::..II.I*II::::.I:I..*.*:VV.::..IIVF.:VI:.*...*:.:*II***::.......*I.FIF***..:::::.:VFFVII.::::::IVFF
::::*****:......I::.....:*I::::..::::*:......:::..FVVF*F*:*F..IVFV:V**:FF*::.........:............:::..FFFFFFFFFFF..:::::FFVVFFV
::::::I***I*.I.*:::.I..*I*:.*.I.......*.I*:..*::..*:F*:*I:*:V...*FFIFFFV*FFV::.....:..::...*.:..::**:.VVVFVVVVVI....:IFNNNNNNFI*
:::::::::.**::::::.*NV.I:.I*:*:..:V.:::::..::I.**..FNFVNI**FFV:V...*FNNFFIINFF::::.:I*::.:::::VFVVVVI*:*VNNFF*..:::::::::*VVFFFN
::::::*V:I*VF*IFV*NVFFI:**::.VI::::*.::I.:*.:.I:*:..FNFFFFV*IIV:*FVVIV.INFFFIFFNFI::..FFFFI::VNNFFI:*IIVFFV..:::::::VNNNNFVFNV**
::::FVVFF*NNFFNNNNFFV::I::.I*I::.....:*::VF.FF*::*:..:VNFFNFF***III:II:*V*::IFNFFNNFFF*:.IFNFFFFFFFFFFVI......:.:::*I::..:VFFFFF
:::NNVFVFNNFNFNFVV::*V*F.FNVN:I:::*IV:::.IFFNVIVI.:::..NNFFFNVN*II**IIVIVIV***********IIIV:FVIF*VFIV.....:*VFFFFFFFFFNNNFFFFFFFF
::NFVNFVIN*VFN*NNIIFINVFNNINVNN:.::..........FNN:VVF.....:NFFFNNVF*I:::::*IIIIIV***********:**I*...::::**I:IFNNFNNNNNNF*::IIVFNN
::NVFFNVNFNVNNVNNNVNNIFNFVNFNN**:::IVNN.I.FN*IIINIVNNI:V:...VNNFNFNFNV**::::::::::::::*II*....:FNFFNNNFFFNNNNNF:.......:::::::..
.:NINNVFFFNIFNVNFNVNFVNNNVNVNNF:::IN:*V**NI:V::VFIFNNNFV***I:...:VFFFNNFFFNNNNNNNV:.......:::......::::....:::::::::YRREBRAP*NAI

@@ -157,16 +169,18 @@ export abstract class PersistentStream<
private inactivityTimerPromise: CancelablePromise<void> | null = null;
private stream: Stream<SendType, ReceiveType> | null = null;

protected backoff: ExponentialBackoff;
/** A sentinel object used to ignore auth callbacks for prior attempts. */
private currentAuthAttempt: object = null;

Contributor

This sounds like it should have been a counter, and I think it might make the flow easier to debug if it actually was a counter. What do you think of making it one? (Alternatively: Can we use the current stream instance as suggested below?)

Contributor

I think this actually points to a larger issue with this class. We now have three different ways of handling the same underlying issue: most callbacks are not tolerant of an unexpected state transition but we have to deal with those because users can request network restarts/stops.

As of this PR, the three different mechanisms I'm aware of are these:

  • this.stream which handles late received messages from the server
  • this.currentAuthAttempt, which handles late auth responses
  • this.state, which handles unexpected transitions while waiting for timers (often indirectly via this.isOpen)

I think these can all be handled via the same mechanism because fundamentally we have some delayed action that's expecting the stream to have been in some preceding state. If the state changes while we're waiting we should disregard the next callback.

I propose a stateSequenceNumber, which we bump on every state transition. Any scheduled callback captures the stateSequenceNumber that is current when the delayed action is submitted, and we disregard the callback if that number no longer matches when it runs.

This handles all known interruptions as far as I can see and would make our callback handling uniform.
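
A minimal sketch of that proposal, assuming a hypothetical `SequencedStream` class with a `guard()` helper (neither name comes from the PR): every state transition bumps the counter, and a guarded callback runs only if the counter is unchanged since it was wrapped.

```typescript
// Hypothetical illustration of the stateSequenceNumber idea; not the PR's code.
class SequencedStream {
  private stateSequenceNumber = 0;
  state = 'Initial';

  setState(next: string): void {
    this.state = next;
    this.stateSequenceNumber++; // Every transition invalidates older callbacks.
  }

  // Wraps fn so that it runs only if no state transition has happened
  // between the call to guard() and the eventual invocation.
  guard<T>(fn: (arg: T) => void): (arg: T) => void {
    const captured = this.stateSequenceNumber;
    return (arg: T) => {
      if (captured === this.stateSequenceNumber) {
        fn(arg);
      }
    };
  }
}

const seqStream = new SequencedStream();
const deliveredTokens: string[] = [];

seqStream.setState('Starting');
const staleAuthCallback = seqStream.guard((token: string) => {
  deliveredTokens.push(token);
});
seqStream.setState('Initial'); // e.g. stop() while auth is still in flight
staleAuthCallback('stale-token'); // Disregarded: the state changed meanwhile.

seqStream.setState('Starting');
const freshAuthCallback = seqStream.guard((token: string) => {
  deliveredTokens.push(token);
});
freshAuthCallback('fresh-token'); // Delivered: no transition since guard().
```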

Contributor Author

Interesting idea! I liked it initially and went ahead and implemented it... I'm a bit unsure of the final result...

  1. There's a bit of a wart in that we intentionally change the state from Starting to Open after we've attached our stream handlers, so we have to allow for that.
  2. That wart makes me question whether tying callback validity to state is really the right thing to do. In a magical ideal world we could just cancel the auth request and/or the underlying stream and rely on no more callbacks coming from it, which I think is what the original implementation did. Using state to implicitly cancel callbacks is perhaps a little less direct.
  3. That said, I like the unification and integrating with the AsyncQueue since it makes it harder to mess up.

I'm also not sure how this will look when ported to the other platforms. It'll be more jarring in any case since iOS / Android didn't have the dispatchIfStillActive() mechanism.

WDYT? I'm fine going either way.

(FWIW, only your first two mechanisms are actually issues because our timers are already AsyncQueue-aware and explicitly canceled on state transitions--in particular, we call backoff.cancel() in our close() implementation).

Contributor

I've addressed (1) elsewhere. I think what I was really asking for was a stream generation number, but read on.

Re (2) I think you're right: this is a mechanism for implementing centralized cancellation. We could instead force everything to be cancellation aware. We've partially done this on iOS by way of FSTCallbackFilter.

dispatchIfStillActive was always kind of a hack. It works because JavaScript is so functional, but it's fundamentally a non-portable pattern. This came about back when we were seriously under the gun for time and I didn't push back on Jonny to stop and think about how to express this portably.

Functionally I think we have a few concerns to address in a centralized way:

  • we want to dispatch back onto the async queue
  • we need to be able to cancel the response if we don't need it anymore
  • cancellation needs to be checked as we're executing on the async queue

This is basically what DelayedOperation implements, but it does so in a non-reusable way. If we could share that between DelayedOperations and the auth callback we'd be golden on that front.

The stream callbacks could be treated the same way with something like FSTCallbackFilter here.
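
The three concerns listed above could be sketched roughly as follows. `TinyQueue` and `CancelableCallback` are hypothetical stand-ins invented for this example; they are not the SDK's AsyncQueue, DelayedOperation, or FSTCallbackFilter. The point is only that the cancellation flag is checked when the task runs on the queue, not when the response arrives.

```typescript
// Hypothetical stand-in for an async queue: tasks run in FIFO order on drain().
class TinyQueue {
  private tasks: Array<() => void> = [];

  enqueue(task: () => void): void {
    this.tasks.push(task);
  }

  drain(): void {
    while (this.tasks.length > 0) {
      const task = this.tasks.shift()!;
      task();
    }
  }
}

// Hypothetical reusable wrapper covering the three concerns.
class CancelableCallback<T> {
  private canceled = false;

  constructor(
    private readonly queue: TinyQueue,
    private readonly callback: (value: T) => void
  ) {}

  cancel(): void {
    this.canceled = true;
  }

  // Called from outside the queue, e.g. by an auth or network layer.
  fire(value: T): void {
    // 1. Dispatch back onto the queue.
    this.queue.enqueue(() => {
      // 2./3. Cancellation is checked at execution time, on the queue.
      if (!this.canceled) {
        this.callback(value);
      }
    });
  }
}

const tinyQueue = new TinyQueue();
const receivedValues: string[] = [];
const authCallback = new CancelableCallback<string>(tinyQueue, token => {
  receivedValues.push(token);
});
const messageCallback = new CancelableCallback<string>(tinyQueue, msg => {
  receivedValues.push(msg);
});

authCallback.fire('token');
authCallback.cancel(); // Canceled after firing but before the queue runs it.
messageCallback.fire('message');
tinyQueue.drain();
```

Because the check happens inside the enqueued task, a cancel() that lands between fire() and execution still suppresses the callback.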


// Create sentinel object used to identify this auth attempt in case it
// is invalidated by a call to stop().
const authAttempt = (this.currentAuthAttempt = new Object());

Contributor

Could you use the same logic that dispatchIfStillActive uses and compare the current stream object?

Contributor Author

The stream can't be created until we have the auth token, and so this.stream is going to be null during the auth request.
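
To make the sentinel approach concrete, here is a small sketch under simplifying assumptions. `AuthGate` and `openedWith` are hypothetical names, and "opening the stream" is reduced to recording the token. The auth callback compares the sentinel it captured against the current one, which works even though no stream object exists yet.

```typescript
// Hypothetical illustration of the sentinel-object check; not the PR's code.
class AuthGate {
  private currentAuthAttempt: object | null = null;
  readonly openedWith: string[] = [];

  // Begins an auth attempt and returns the function the (assumed) token
  // provider would call once the token is available.
  start(): (token: string) => void {
    // A fresh object identifies this attempt; identity is all that matters.
    const authAttempt: object = {};
    this.currentAuthAttempt = authAttempt;
    return (token: string) => {
      if (this.currentAuthAttempt !== authAttempt) {
        return; // Stale attempt: stop() or a newer start() superseded it.
      }
      this.openedWith.push(token);
    };
  }

  stop(): void {
    this.currentAuthAttempt = null; // Invalidates any in-flight attempt.
  }
}

const gate = new AuthGate();
const firstAttempt = gate.start();
gate.stop();
const secondAttempt = gate.start();
firstAttempt('old-token'); // Ignored: belongs to the abandoned attempt.
secondAttempt('new-token'); // Accepted: matches the current sentinel.
```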

@@ -43,21 +43,38 @@ interface ListenRequest extends api.ListenRequest {
export interface WriteRequest extends api.WriteRequest {
database?: string;
}

/**

Contributor

General comment: The hardest client for the idle timeout change was iOS. I sporadically got stream callbacks from gRPC even after I closed the stream. With that said, I would suggest that you port this to iOS before you do the Android port. It might save you one backport if there are any other changes.

Contributor Author

SGTM.

const networkEnabled = this.networkEnabled;
this.networkEnabled = false;
this.writeStream.stop();
this.watchStream.stop();

Contributor

Do we still need to restart the stream if we did not receive an error? It seems like that might be enough to decide whether we should restart or not.

Contributor Author

So that is a potential option, though I would be nervous about relying on that... And right now I actually have an assert that explicitly makes sure that shouldStartWatchStream() returns false if we did not receive an error (see my discussion with Gil about that), so if I remove the "this.networkEnabled = false;" line, we hit that assert.

private canUseNetwork(): boolean {
// TODO(mikelehen): This could take into account isPrimary when we merge
// with multitab.
return this.networkEnabled;

Contributor

Thanks! :)

await this.remoteStore.shutdown();
await this.persistence.shutdown(/* deleteData= */ true);
await this.destroyPersistence();
await this.queue.enqueue(async () => {

Contributor

I hope this is going to solve all the shutdown troubles I see in the spec tests when I hit assertions. Fingers crossed!

Contributor Author

I hope so, but I'm not sure it will. Yesterday I fell into a quagmire of confusing errors that I think were due to us inconsistently queuing spec step stuff on the asyncQueue. :-/

Contributor

@wilhuff wilhuff left a comment

I'm generally in favor of where this is going.

// closed, but just in case we make this a no-op.
return;
}
assert(

Contributor

I don't think this assertion is valid to make.

If the user stops the network as the timer fires it's possible for both the disableNetwork and the resume-from-backoff block to end up enqueued on the async queue. Therefore performBackoff has to tolerate the possibility that the state has changed underneath it.

Contributor Author

It should be safe because we call this.backoff.cancel() in close() and close() is the only way to get out of the Backoff state before the timer fires. (and timers guarantee that if you cancel them while on the AsyncQueue then the callback will not be run.)

So even though I've added your stateSequenceNumber suggestion, I haven't used it here because the code is designed to be safe without it.

Contributor

Interesting. I see what you mean re inherent safety.

On the other hand, I'd like to get us out of the business of having to make that kind of argument when reasoning about this code. I would prefer to just handle all the callbacks in a uniform manner so that we don't have to think about it.

Contributor Author

I'm leaving this as-is for now. I'd rather not use redundant mechanisms that accomplish the same thing and I think backoff.cancel() is the clearer mechanism. And now that callback cancellation (via closeCount) is no longer tied to this.state but instead just incremented in close(), we deal with that and cancelling the backoff (and idle timers) in the same place, so it should hopefully be more apparent that it's safe. I think this is a reasonable place to end up in until or unless we add generic cancellation support to the AsyncQueue.

If you're not a fan, let me know.

objUtils.forEachNumber(this.listenTargets, (targetId, queryData) => {
this.sendWatchRequest(queryData);
});
}

private async onWatchStreamClose(error?: FirestoreError): Promise<void> {
assert(
- this.isNetworkEnabled(),
- 'onWatchStreamClose() should only be called when the network is enabled'
+ error !== undefined || !this.shouldStartWatchStream(),

Contributor

I'm not a fan of this assertion here because the condition is hard to reason about. It's better to make the simple assertion that error !== undefined inside the if block for this.shouldStartWatchStream().

However, I'm not sure this is a good use of our time. In the caller we're mostly channeling gRPC's response. If it happens to close a stream without supplying an error do we really care? Is it really worth crashing our customers' apps over this?

I'm not arguing against assertions generally: the assertions in the remote store have saved our bacon repeatedly, but the good ones catch our bugs where we failed to anticipate a transition that was actually happening in the wild.

By contrast here it seems like this case of error being null is possible but harmless. If this assertion were to fire in the wild because gRPC was giving us a null error, would we do anything but remove this assertion?

Feel free to push back (especially if I'm not considering all the angles).

Contributor Author

The intention was to guard against calling stop() or the stream idle timeout firing while the stream is actually still needed, but I agree that intention wasn't very clear. I tweaked the code / assert to hopefully make it clearer.

I've verified that any actual error callback on the stream will have a defined error (without needing to make any assumptions about grpc or webchannel internals), so I'm inclined to keep the assert. But I could be persuaded otherwise.

- return (
- this.isNetworkEnabled() && this.writePipeline.length < MAX_PENDING_WRITES
- );
+ return this.writePipeline.length < MAX_PENDING_WRITES;

Contributor

fillWritePipeline() above is called by the syncEngine after a local write. Why would we want batches to be in the pipeline while the network is disabled?

Contributor Author

TBH it just makes more sense to me. I don't see what we gain by throwing away the write pipeline when the network is disabled and then re-filling it (with the exact same writes) once it's enabled again. We may as well just always be filling the write pipeline and then it will just consistently contain the next 10 writes to send to the write stream.

This is similar to how listenTargets persists across the network being disabled / re-enabled.

The only time we actually want to clear the write pipeline is on a user change, and so I reworked the handleUserChange code to do that explicitly.

Does this explanation sway you? We may just have different conceptual models of what the write pipeline represents... I'm willing to be convinced to reconsider.

Contributor

We need to make sure that the multi-tab code continues to throw away writes that were executed by other clients. This is simplified if we throw away the in-memory state and then re-fill the write pipeline.

Contributor Author

Hrm... I guess that's a fair point. With multi-tab we'll have 2 places (handleUserChange and applyPrimaryState) where we really do need to re-sync with the LocalStore.

Okay... I've reverted the parts of my PR related to this (we now clear the write pipeline whenever the network is disabled, disallow adding while it's disabled, and re-fetch the lastStreamToken whenever the network is re-enabled). And so I also brought back the disableNetworkInternal() / enableNetwork() methods from before as well and use them in start, shutdown, enable/disableNetwork, and handleUserChange again. And now you can use them in applyPrimaryStateChange when multi-tab merges.
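
The behaviour this thread settled on can be sketched as below. `WritePipelineModel` is a hypothetical stand-in for the relevant slice of RemoteStore, batches are plain strings, and the limit of 10 is taken from the "next 10 writes" remark earlier in the thread. The pipeline is cleared when the network is disabled, rejects additions while disabled, and is re-filled from the (assumed) local store on re-enable.

```typescript
// Assumed limit, matching the "next 10 writes" mentioned in the discussion.
const MAX_PENDING_WRITES = 10;

// Hypothetical model of the write-pipeline rules; not the SDK's RemoteStore.
class WritePipelineModel {
  private networkEnabled = true;
  private writePipeline: string[] = [];

  canAddToWritePipeline(): boolean {
    return (
      this.networkEnabled && this.writePipeline.length < MAX_PENDING_WRITES
    );
  }

  // Returns true if the batch was accepted into the pipeline.
  addToWritePipeline(batch: string): boolean {
    if (!this.canAddToWritePipeline()) {
      return false;
    }
    this.writePipeline.push(batch);
    return true;
  }

  disableNetwork(): void {
    this.networkEnabled = false;
    // Throw away in-memory state; it is re-synced on enableNetwork().
    this.writePipeline = [];
  }

  // pendingBatches stands in for whatever the local store reports as
  // still outstanding at the time the network comes back.
  enableNetwork(pendingBatches: string[]): void {
    this.networkEnabled = true;
    for (const batch of pendingBatches) {
      if (!this.addToWritePipeline(batch)) {
        break; // Pipeline is full; the rest is added later.
      }
    }
  }

  pendingWrites(): string[] {
    return this.writePipeline.slice();
  }
}

const pipelineModel = new WritePipelineModel();
const acceptedWhileEnabled = pipelineModel.addToWritePipeline('batch-1');
pipelineModel.disableNetwork();
const acceptedWhileDisabled = pipelineModel.addToWritePipeline('batch-2');
pipelineModel.enableNetwork(['batch-1', 'batch-2']);
```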

Sorry for the churn and un-churn.

Contributor Author

@mikelehen mikelehen left a comment

PTAL

@@ -43,21 +43,38 @@ interface ListenRequest extends api.ListenRequest {
export interface WriteRequest extends api.WriteRequest {
database?: string;
}

/**
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM.

@@ -157,16 +169,18 @@ export abstract class PersistentStream<
private inactivityTimerPromise: CancelablePromise<void> | null = null;
private stream: Stream<SendType, ReceiveType> | null = null;

protected backoff: ExponentialBackoff;
/** A sentinel object used to ignore auth callbacks for prior attempts. */
private currentAuthAttempt: object = null;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting idea! I liked it initially and went ahead and implemented it... I'm a bit unsure of the final result...

  1. There's a bit of a wart in that we intentionally change the state from Starting to Open after we've attached our stream handlers, so we have to allow for that.
  2. That wart makes me question whether tying callback validity to state is really the right thing to do. In a magical ideal world we could just cancel the auth request and/or the underlying stream and rely on no more callbacks coming from it, which I think is what the original implementation implemented. Using state to implicitly cancel callbacks is perhaps a little less direct.
  3. That said, I like the unification and integrating with the AsyncQueue since it makes it harder to mess up.

I'm also not sure how this will look when ported to the other platforms. It'll be more jarring in any case since iOS / Android didn't have the dispatchIfStillActive() mechanism.

WDYT? I'm fine going either way.

(FWIW, only your first two mechanisms are actually issues because our timers are already AsyncQueue-aware and explicitly canceled on state transitions--in particular, we call backoff.cancel() in our close() implementation).


// Create sentinel object used to identify this auth attempt in case it
// is invalidated by a call to stop().
const authAttempt = (this.currentAuthAttempt = new Object());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The stream can't be created until we have the auth token, and so this.stream is going to be null during the auth request.

// closed, but just in case we make this a no-op.
return;
}
assert(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be safe because we call this.backoff.cancel() in close() and close() is the only way to get out of the Backoff state before the timer fires. (and timers guarantee that if you cancel them while on the AsyncQueue then the callback will not be run.)

So even though I've added your stateSequenceNumber suggestion, I haven't used it here because the code is designed to be safe without it.

objUtils.forEachNumber(this.listenTargets, (targetId, queryData) => {
this.sendWatchRequest(queryData);
});
}

private async onWatchStreamClose(error?: FirestoreError): Promise<void> {
assert(
this.isNetworkEnabled(),
'onWatchStreamClose() should only be called when the network is enabled'
error !== undefined || !this.shouldStartWatchStream(),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The intention was to guard against calling stop() or the stream idle timeout firing while the stream is actually still needed, but I agree that intention wasn't very clear. I tweaked the code / assert to hopefully make it clearer.

I've verified that any actual error callback on the stream will have a defined error (without needing to make any assumptions about grpc or webchannel internals), so I'm inclined to keep the assert. But I could be persuaded otherwise.

return (
this.isNetworkEnabled() && this.writePipeline.length < MAX_PENDING_WRITES
);
return this.writePipeline.length < MAX_PENDING_WRITES;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TBH it just makes more sense to me. I don't see what we gain by throwing away the write pipeline when the network is disabled and then re-filling it (with the exact same writes) once it's enabled again. We may as well just always be filling the write pipeline and then it will just consistently contain the next 10 writes to send to the write stream.

This is similar to how listenTargets persists across the network being disabled / re-enabled.

The only time we actually want to clear the write pipeline is on a user change, and so I reworked the handleUserChange code to do that explicitly.

Does this explanation sway you? We may just have different conceptual models of what the write pipeline represents... I'm willing to be convinced to reconsider.

await this.remoteStore.shutdown();
await this.persistence.shutdown(/* deleteData= */ true);
await this.destroyPersistence();
await this.queue.enqueue(async () => {
Contributor Author

I hope so, but I'm not sure it will. Yesterday I fell into a quagmire of confusing errors that I think were due to us inconsistently queuing spec step stuff on the asyncQueue. :-/
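
The sequencing concern above can be illustrated with a toy queue. This is a sketch under assumptions: `TinyAsyncQueue` is a hypothetical stand-in and does not reproduce the SDK's AsyncQueue. It only shows why routing every step (including shutdown) through one queue keeps them in submission order.

```typescript
// Hypothetical minimal queue: each operation starts only after the
// previous one has settled.
class TinyAsyncQueue {
  private tail: Promise<void> = Promise.resolve();

  enqueue(op: () => Promise<void>): Promise<void> {
    const result = this.tail.then(op);
    // Swallow failures on the chain so later operations still run;
    // the caller still observes the failure through `result`.
    this.tail = result.catch(() => {});
    return result;
  }
}

const queue = new TinyAsyncQueue();
const order: string[] = [];

const done = Promise.all([
  // A slow spec step enqueued first...
  queue.enqueue(async () => {
    await new Promise<void>(resolve => setTimeout(resolve, 10));
    order.push('spec step');
  }),
  // ...still completes before a fast shutdown enqueued second.
  queue.enqueue(async () => {
    order.push('shutdown');
  })
]);
```

Had the shutdown been invoked directly instead of being enqueued, it would have run before the slow step finished, which is the kind of inconsistency described in the comment.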

const networkEnabled = this.networkEnabled;
this.networkEnabled = false;
this.writeStream.stop();
this.watchStream.stop();
Contributor Author

So that is a potential option, though I would be nervous about relying on that... And right now I actually have an assert that explicitly makes sure that shouldStartWatchStream() returns false if we did not receive an error (see my discussion with Gil about that), so if I remove the "this.networkEnabled = false;" line, we hit that assert.

'Expected stream to be in state Starting, but was ' + this.state
);
this.updateState(PersistentStreamState.Open);
// Need to recreate our dispatcher since we changed the state.
Contributor Author

This is the wart I mentioned...

Contributor

I think we could rename the "state sequence number" to be a stream generation number to make this clearer. As long as we're progressing down the happy path of Initial -> Starting -> Open, state transitions don't matter as far as versioning goes. It's any transition off that happy path that should cause us to disregard a prior callback.

Contributor Author

Ahh, yeah. That is better. I implemented this but called it closeCount (used with the now renamed closeGuardedDispatcher) because I think that's more precise (we increment it in close()) and understandable (what's a "stream generation"?)... but if you don't like the naming, let me know.

-  * waiting is complete, the stream will try to open. While in this
-  * state isStarted() will return YES but isOpen will return false.
+  * re-starting. After waiting is complete, the stream will try to open.
+  * While in this state isStarted() will return true but isOpen will return
Contributor

Nit: While in this state, isStarted() will return true but isOpen() will return..

I had to do a double take as I was reading this.

Contributor Author

There are actually a bunch of these. I started to fix them but decided it added too much noise to the PR. I'll send a subsequent PR.


const dispatchIfStateUnchanged = this.stateGuardedDispatcher();

// TODO(mikelehen): Just use dispatchIfStillActive, but see TODO below.
Contributor

Method name is outdated in this comment.

Contributor Author

done

* This allows us to turn auth / stream callbacks into no-ops if the stream
* is closed / re-opened, etc.
*/
private stateGuardedDispatcher(): (fn: () => Promise<void>) => void {
Contributor

I think this would be cleaner if it:

  • was called something along the lines of dispatchIfStateUnchanged
  • you passed in the state sequence number

Contributor Author

I implemented the second suggestion but am pushing back on the first.

The result:

    const dispatchIfNotClosed =
        this.closeGuardedDispatcher(this.closeCount);

I tried renaming but I don't like that the method and its return value have the same name:

    const dispatchIfNotClosed =
        this.dispatchIfNotClosed(this.closeCount);

And it really is a function that creates/returns a dispatcher. So I think the current name is pretty accurate albeit a bit awkward.

Contributor

closeGuardedDispatcher still seems a little off to me. The name is very tied to its implementation, but at first glance it doesn't seem to convey what it is used for (it might also just be the choice of words).

I don't have a good suggestion for you, but even something like getCloseGuardedDispatcher() might clear up some of my confusion.

Contributor Author

getCloseGuardedDispatcher() SGTM
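
To make the naming discussion concrete, here is a rough sketch of the pattern under the name the thread settled on. `GuardedStreamSketch` is a hypothetical class, and the dispatcher here invokes the callback synchronously with a `() => void` signature for brevity; the method in the diff takes `() => Promise<void>`, and how it schedules the callback is not shown in this thread. Only `closeCount`, `getCloseGuardedDispatcher`, and `dispatchIfNotClosed` are names taken from the discussion.

```typescript
// Hypothetical, simplified stand-in for PersistentStream.
class GuardedStreamSketch {
  // Incremented in close(); any dispatcher created before that is stale.
  private closeCount = 0;

  /**
   * Returns a dispatcher that runs `fn` only if close() has not been
   * called since `startCloseCount` was captured.
   */
  getCloseGuardedDispatcher(
    startCloseCount: number
  ): (fn: () => void) => void {
    return (fn: () => void) => {
      if (this.closeCount === startCloseCount) {
        fn();
      }
    };
  }

  start(): (fn: () => void) => void {
    return this.getCloseGuardedDispatcher(this.closeCount);
  }

  close(): void {
    this.closeCount++;
  }
}

const stream = new GuardedStreamSketch();
const events: string[] = [];

const dispatchIfNotClosed = stream.start();
dispatchIfNotClosed(() => events.push('open')); // runs: stream not closed
stream.close();
dispatchIfNotClosed(() => events.push('stale')); // skipped: closed since start()
```

Because the guard compares close counts rather than stream states, the Initial -> Starting -> Open transitions do not invalidate a dispatcher; only a close does.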

- return (
-   this.isNetworkEnabled() && this.writePipeline.length < MAX_PENDING_WRITES
- );
+ return this.writePipeline.length < MAX_PENDING_WRITES;
Contributor

We need to make sure that the multi-tab code continues to throw away writes that were executed by other clients. This is simplified if we throw away the in-memory state and then re-fill the write pipeline.

@schmidt-sebastian removed their assignment on Jul 27, 2018
Contributor Author

@mikelehen left a comment

Feedback addressed-ish. PTAL

// closed, but just in case we make this a no-op.
return;
}
assert(
Contributor Author

I'm leaving this as-is for now. I'd rather not use redundant mechanisms that accomplish the same thing and I think backoff.cancel() is the clearer mechanism. And now that callback cancellation (via closeCount) is no longer tied to this.state but instead just incremented in close(), we deal with that and cancelling the backoff (and idle timers) in the same place, so it should hopefully be more apparent that it's safe. I think this is a reasonable place to end up in until or unless we add generic cancellation support to the AsyncQueue.

If you're not a fan, let me know.
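
The "same place" argument above can be sketched roughly as follows. `CancellableTimer` and `StreamCloseSketch` are hypothetical names, and the timer is a plain `setTimeout` wrapper rather than the SDK's backoff implementation. The point is only that `close()` cancels pending timers and bumps `closeCount` together.

```typescript
// Hypothetical stand-in for a cancellable backoff or idle timer.
class CancellableTimer {
  private handle: ReturnType<typeof setTimeout> | null = null;

  schedule(fn: () => void, delayMs: number): void {
    this.cancel();
    this.handle = setTimeout(() => {
      this.handle = null;
      fn();
    }, delayMs);
  }

  cancel(): void {
    if (this.handle !== null) {
      clearTimeout(this.handle);
      this.handle = null;
    }
  }

  get pending(): boolean {
    return this.handle !== null;
  }
}

// Hypothetical, simplified stream that centralizes cleanup in close().
class StreamCloseSketch {
  closeCount = 0;
  readonly backoff = new CancellableTimer();
  readonly idleTimer = new CancellableTimer();

  close(): void {
    // Timers are cancelled outright; in-flight callbacks are invalidated
    // by the closeCount bump. Both happen here and nowhere else.
    this.backoff.cancel();
    this.idleTimer.cancel();
    this.closeCount++;
  }
}

const closing = new StreamCloseSketch();
let backoffFired = false;
closing.backoff.schedule(() => {
  backoffFired = true;
}, 60000);
closing.close();
```

After `close()` returns, no timer is pending, so the delayed callback needs no separate state guard.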

- return (
-   this.isNetworkEnabled() && this.writePipeline.length < MAX_PENDING_WRITES
- );
+ return this.writePipeline.length < MAX_PENDING_WRITES;
Contributor Author

Hrm... I guess that's a fair point. With multi-tab we'll have 2 places (handleUserChange and applyPrimaryState) where we really do need to re-sync with the LocalStore.

Okay... I've reverted the parts of my PR related to this (we now clear the write pipeline whenever the network is disabled, disallow adding while it's disabled, and re-fetch the lastStreamToken whenever the network is re-enabled). And so I also brought back the disableNetworkInternal() / enableNetwork() methods from before as well and use them in start, shutdown, enable/disableNetwork, and handleUserChange again. And now you can use them in applyPrimaryStateChange when multi-tab merges.

Sorry for the churn and un-churn.
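
The behavior described after the revert can be sketched as below. This is an illustration under assumptions: `RemoteStoreSketch`, `pendingBatches`, and `pipelineLength()` are hypothetical, the value 10 for `MAX_PENDING_WRITES` is taken from the "next 10 writes" remark earlier in the thread, and re-fetching `lastStreamToken` is left out.

```typescript
const MAX_PENDING_WRITES = 10;

// Hypothetical, simplified model of the write-pipeline handling.
class RemoteStoreSketch {
  private networkEnabled = false;
  private writePipeline: string[] = [];

  // `pendingBatches` stands in for the mutation queue held by the LocalStore.
  constructor(private readonly pendingBatches: string[]) {}

  canWriteMutations(): boolean {
    return (
      this.networkEnabled && this.writePipeline.length < MAX_PENDING_WRITES
    );
  }

  fillWritePipeline(): void {
    while (
      this.canWriteMutations() &&
      this.writePipeline.length < this.pendingBatches.length
    ) {
      this.writePipeline.push(this.pendingBatches[this.writePipeline.length]);
    }
  }

  enableNetwork(): void {
    this.networkEnabled = true;
    // Re-sync from the (stand-in) LocalStore each time the network returns.
    this.fillWritePipeline();
  }

  disableNetwork(): void {
    this.networkEnabled = false;
    // Throw away in-memory state; it is rebuilt on the next enableNetwork().
    this.writePipeline = [];
  }

  pipelineLength(): number {
    return this.writePipeline.length;
  }
}

const batches: string[] = [];
for (let i = 0; i < 12; i++) {
  batches.push('batch-' + i);
}

const store = new RemoteStoreSketch(batches);
store.fillWritePipeline();
const whileDisabled = store.pipelineLength(); // nothing added while disabled
store.enableNetwork();
const afterEnable = store.pipelineLength(); // capped at MAX_PENDING_WRITES
store.disableNetwork();
const afterDisable = store.pipelineLength(); // cleared again
```

Rebuilding the pipeline on every enable is what lets other code paths, such as the multi-tab case raised in the review, force a re-sync simply by disabling and re-enabling the network.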

Contributor

@schmidt-sebastian left a comment

LGTM, minus a request to spend a minute or so on the name of closeGuardedDispatcher.

}
async start(): Promise<void> {
// Load any saved stream token from persistent storage
this.writeStream.lastStreamToken = await this.localStore.getLastStreamToken();
Contributor

We are doing this twice now. Once in start() and then right below in enableNetwork().

Contributor Author

Oops, thanks! I meant to remove this one.

if (this.writePipeline.length > 0) {
log.debug(
LOG_TAG,
'Stopping write stream with ' +
Contributor

Nit: I'm guesstimating that this might fit on one line if you change it to use backticks:

`Stopping write stream with ${this.writePipeline.length} pending writes`

Contributor Author

Nope! Nice try though.
