Add Shared Storage Download Manager #607
Conversation
```diff
  rewind, pollingOffset := ep.conf.maybeRewind()
  if rewind {
-     ep.rewindPollingOffset(pollingOffset)
+     pollingOffset = ep.rewindPollingOffset(pollingOffset)
```
We had a check in the rewindPollingOffset function to ensure we only go backwards in time.
However, we didn't return the choice we made, so the calling function was rewinding forwards. This caused a really hard-to-track-down bug that has eaten a bunch of my cycles.
The reason we're only seeing this now is that, with the batches for broadcast and private both rewinding, it's much more likely you'll hit a rewind-forwards scenario.
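To make the fix concrete, here is a minimal, self-contained sketch (illustrative names and fields, not the actual event_poller.go code) of what "return the choice we made" means: the rewind helper only ever moves the offset backwards, and returns the offset it actually settled on, so the caller can never apply a forwards rewind by mistake.

```go
package main

import (
	"fmt"
	"sync"
)

// Illustrative stand-in for the event poller state - not the real FireFly type.
type eventPoller struct {
	mux           sync.Mutex
	pollingOffset int64
}

// The fix in a nutshell: only ever move backwards, and return the offset that
// was actually chosen, so the caller cannot accidentally "rewind forwards".
func (ep *eventPoller) rewindPollingOffset(requested int64) int64 {
	ep.mux.Lock()
	defer ep.mux.Unlock()
	if requested < ep.pollingOffset {
		ep.pollingOffset = requested // genuine rewind - go backwards
	}
	return ep.pollingOffset
}

func main() {
	ep := &eventPoller{pollingOffset: 100}
	fmt.Println(ep.rewindPollingOffset(50))  // 50: genuine rewind, applied
	fmt.Println(ep.rewindPollingOffset(200)) // 50: forwards request, ignored
}
```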
internal/operations/manager.go (outdated)
```go
	return om.RunOperationWithFailState(ctx, op, fftypes.OpStatusFailed)
}

func (om *operationsManager) RunOperationWithFailState(ctx context.Context, op *fftypes.PreparedOperation, failState fftypes.OpStatus) error {
```
This name confuses me a little, and I can foresee maybe needing other options in the future... so I wonder if a RunOperationWithOptions would be more future-proof, and have an options struct something like:
```go
{
	// If the operation fails, keep it in "Pending" state
	RemainPendingOnFailure bool
}
```
I'm open to spelling suggestions, but having documentation for the option(s) would be a nice plus IMO.
Side note - the batch_processor has a similar retry loop built into the code, but I think failed operations are left in a "Failed" state as they're being retried, rather than staying "Pending". Might be worth a look sometime at whether this functionality should be leveraged there.
I've updated RunOperation to take the options as an optional varargs param.
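For illustration, here is a minimal, self-contained sketch of what a varargs-options shape can look like, using the RemainPendingOnFailure field suggested above; the actual option type and RunOperation signature in the PR may differ.

```go
package main

import (
	"context"
	"fmt"
)

// Hypothetical option struct, using the field suggested in the review.
type RunOperationOption struct {
	// If the operation fails, keep it in "Pending" state rather than "Failed"
	RemainPendingOnFailure bool
}

// Hypothetical signature: zero or more options as a varargs parameter, so
// existing call sites need no change and new behaviours are opted into per call.
func RunOperation(ctx context.Context, opType string, options ...RunOperationOption) error {
	remainPending := false
	for _, o := range options {
		remainPending = remainPending || o.RemainPendingOnFailure
	}
	fmt.Printf("running %s (remainPendingOnFailure=%v)\n", opType, remainPending)
	return nil
}

func main() {
	_ = RunOperation(context.Background(), "sharedstorage_upload_batch")
	_ = RunOperation(context.Background(), "sharedstorage_download_blob",
		RunOperationOption{RemainPendingOnFailure: true})
}
```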
I agree with the feedback on the broadcast publish, and I've used the option there too.
pkg/fftypes/operation.go (outdated)
```diff
- // OpTypeBlockchainBatchPin is a blockchain transaction to pin a batch
- OpTypeBlockchainBatchPin OpType = ffEnum("optype", "blockchain_batch_pin")
+ // OpTypeBlockchainPinBatch is a blockchain transaction to pin a batch
+ OpTypeBlockchainPinBatch OpType = ffEnum("optype", "blockchain_pin_batch")
```
Should make sure @eberger727 is plugged into these name changes for any filtering in-flight on the firefly-ui
pkg/fftypes/batch.go (outdated)
```go
	}).Manifest(b.ID)
}

func (b *BatchPersisted) Inflight(messages []*Message, data DataArray) *Batch {
```
Perhaps GenInflight or GenBatch to match the helper above...
Or maybe the helper above should just be Manifest... not terribly picky here, but we do now have a lot of utilities for converting the various representations of a batch.
Went for GenInflight 👍
```go
		fftypes.OpTypeSharedStorageDownloadBlob,
	}),
	fb.Eq("status", fftypes.OpStatusPending),
	fb.Lt("created", startupTime), // retry is handled completely separately
```
Are we worried about querying based on time? If the clock moved backwards would this potentially miss some items?
Definitely an edge case, but want to at least track it further if needed.
I don't have a significantly better solution here off the top of my head.
I think the double edge case of clock skew combined with a full runtime restart feels like an acceptable risk for now.
```go
	dm.cancelFunc()
	for _, w := range dm.workers {
		<-w.done
	}
```
Should we wait for recoveryComplete?
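If waiting on recovery is wanted, one possible shape (illustrative names only, not the actual download manager types in this PR) is to close a recoveryComplete channel when startup recovery finishes, and wait on it before draining the worker done channels:

```go
package main

import (
	"fmt"
	"time"
)

// Illustrative worker/manager shapes only.
type worker struct{ done chan struct{} }

type downloadManager struct {
	workers          []*worker
	recoveryComplete chan struct{} // closed once startup recovery has finished
	cancel           func()
}

// WaitStop cancels the workers, waits for recovery to finish (assuming the
// recovery routine always closes the channel, even when cancelled), then
// waits for every worker to drain.
func (dm *downloadManager) WaitStop() {
	dm.cancel()
	<-dm.recoveryComplete
	for _, w := range dm.workers {
		<-w.done
	}
}

func main() {
	dm := &downloadManager{
		workers:          []*worker{{done: make(chan struct{})}},
		recoveryComplete: make(chan struct{}),
		cancel:           func() {},
	}
	go func() {
		close(dm.recoveryComplete)
		time.Sleep(10 * time.Millisecond)
		close(dm.workers[0].done)
	}()
	dm.WaitStop()
	fmt.Println("stopped cleanly")
}
```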
```go
// See the License for the specific language governing permissions and
// limitations under the License.

package ssdownload
```
S.S. Download 🚢
This name looks funny whenever I read it, but I really have nothing better to offer. Could use simply download, but maybe that's too generic.
Yeah, fought with this. Started with just download, but it's so coupled to shared storage that I added the ss prefix (and used sd as its short name). Might go for shareddownload, which would put it next to sharedstorage in the components at least.
```go
	// We need to work out what pins potentially are unblocked by the arrival of this data

	// Find any data associated with this blob
```
Do we need to consider caching these data/message lookups?
Potentially, yes, but I'm going to defer this. The current assumption is that blobs are going to be used when there is heavy data, so the saving from caching these small data-object lookups is less significant by comparison.
```go
func (ag *aggregator) rewindForBlobArrival(ctx context.Context, blobHash *fftypes.Bytes32) error {

	batchIDs := make(map[fftypes.UUID]bool)
```
Should this just be an array instead of a map?
The map is used to de-dup, as multiple messages might be associated with the same batch.
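For illustration, a tiny self-contained sketch of the map-as-set de-dup pattern being described (plain string IDs stand in for fftypes.UUID to keep it runnable):

```go
package main

import "fmt"

func main() {
	// Several messages can reference the same batch, so the raw list has duplicates.
	messageBatches := []string{"batch-1", "batch-2", "batch-1", "batch-2", "batch-3"}

	// Using a map as a set de-duplicates the batch IDs.
	batchIDs := make(map[string]bool)
	for _, id := range messageBatches {
		batchIDs[id] = true
	}

	// Only the unique batches get a rewind queued.
	for id := range batchIDs {
		fmt.Println("queueing rewind for", id)
	}
}
```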
```go
	if err := em.persistBatchTransaction(ctx, batchPin); err != nil {
		return err
	}
	private := batchPin.BatchPayloadRef == ""
```
This felt a little surprising, but I guess it is an accurate way to check for private vs broadcast.
Hmm actually I guess it's how we've always been differentiating 👍
Yeah, we try to be compact on the blockchain itself. So this is inferred, rather than having a separate "type" field
```diff
  }

- func (ed *eventDispatcher) getEvents(ctx context.Context, filter database.Filter) ([]fftypes.LocallySequenced, error) {
+ func (ed *eventDispatcher) getEvents(ctx context.Context, filter database.Filter, offset int64) ([]fftypes.LocallySequenced, error) {
```
Doesn't look like this param is used?
It was added for logging on the common interface with pins - will add similar logging here.
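Purely as an illustration of the intent (not the actual dispatcher code or log format), the offset parameter enables a log line at the start of each poll, for example:

```go
package main

import (
	"context"
	"log"
)

// Illustrative shape only: the offset is threaded through so the dispatcher
// can log where its page of events starts, mirroring the logging for pins.
func getEvents(ctx context.Context, offset int64) ([]string, error) {
	log.Printf("Reading page of events > %d", offset)
	// ...build the database filter from the offset and run the query here...
	return nil, nil
}

func main() {
	_, _ = getEvents(context.Background(), 12345)
}
```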
awrichar left a comment:
Looks good - nice to see the continuing convergence of the broadcast/private paths. I left a few questions and cleanup notes, but nothing major.
Codecov Report

```
@@            Coverage Diff            @@
##              main      #607   +/-   ##
==========================================
  Coverage   100.00%   100.00%
==========================================
  Files          305       309     +4
  Lines        18357     18705   +348
==========================================
+ Hits         18357     18705   +348
```

Continue to review full report at Codecov.
Resolves #601
Resolves #568
Summary of the changes in this PR:

- Add workers that download from IPFS asynchronously, then notify with callbacks when they are ready (see the sketch after this list)
- Add `batch_hash` column to `pins`
- Add option to set `outputs` to the operation manager
- Add operations for downloading shared storage (IPFS) batches and blobs
- Add operation for uploading blobs (per Operations: Add operation for Shared Storage (IPFS) uploads, and move API calls to batch worker #568)
- More consistently name all operation types, with `type_verb_noun`:
  - `blockchain_pin_batch` - previously `blockchain_batch_pin`
  - `dataexchange_send_batch` - previously `dataexchange_batch_send`
  - `dataexchange_send_blob` - previously `dataexchange_blob_send`
  - `sharedstorage_upload_batch` - previously `sharedstorage_batch_broadcast`
  - `sharedstorage_upload_blob` - new
  - `sharedstorage_download_batch` - new
  - `sharedstorage_download_blob` - new
- Fixes a bug where we might miss rewinds to process pins after batch arrival
  - `rewindPollingOffset` in `event_poller.go` needed to return the decision it made on whether to rewind
  - The `rewindPollingOffset` code in the event poller checks for queued rewinds each time round the polling loop, and contains a check to make sure it only goes backwards. However, it didn't return the outcome of that check to the calling function, which always assumed the rewind had been applied. So in a "rewind forwards" scenario pins could be missed, leaving those messages stalled.
- Fixes a bug where a node receiving messages, but not sending any, could delay for a long time before confirming messages
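As a rough illustration of the first bullet (asynchronous download workers that notify via callbacks), here is a minimal, self-contained Go sketch. All names here (downloadManager, downloadTask, SharedStorageBatchDownloaded, the fetch function) are stand-ins for illustration, not the actual types or callback interface added in this PR:

```go
package main

import (
	"fmt"
	"sync"
)

// A unit of work: one shared storage payload reference to download.
type downloadTask struct {
	PayloadRef string
}

// Whoever owns the downloaded data gets notified through a callback interface.
type callbacks interface {
	SharedStorageBatchDownloaded(payloadRef string, data []byte)
}

type downloadManager struct {
	work      chan downloadTask
	wg        sync.WaitGroup
	callbacks callbacks
	fetch     func(payloadRef string) ([]byte, error) // e.g. a shared storage (IPFS) read
}

func newDownloadManager(workers int, cb callbacks, fetch func(string) ([]byte, error)) *downloadManager {
	dm := &downloadManager{work: make(chan downloadTask), callbacks: cb, fetch: fetch}
	for i := 0; i < workers; i++ {
		dm.wg.Add(1)
		go dm.worker()
	}
	return dm
}

// Each worker pulls tasks off the channel, downloads asynchronously, and
// notifies via the callback when the payload is ready.
func (dm *downloadManager) worker() {
	defer dm.wg.Done()
	for task := range dm.work {
		data, err := dm.fetch(task.PayloadRef)
		if err != nil {
			continue // real code would retry and/or fail the operation
		}
		dm.callbacks.SharedStorageBatchDownloaded(task.PayloadRef, data)
	}
}

type printCallbacks struct{}

func (printCallbacks) SharedStorageBatchDownloaded(ref string, data []byte) {
	fmt.Printf("downloaded %s (%d bytes)\n", ref, len(data))
}

func main() {
	dm := newDownloadManager(2, printCallbacks{}, func(ref string) ([]byte, error) {
		return []byte("batch payload for " + ref), nil
	})
	dm.work <- downloadTask{PayloadRef: "Qm...batch1"}
	dm.work <- downloadTask{PayloadRef: "Qm...batch2"}
	close(dm.work)
	dm.wg.Wait()
}
```

The PR's real download manager additionally records each download as an operation so it can be recovered and retried after a restart, which this sketch omits.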
rewindPollingOffsetneeded to return the decision it made on whether to rewind inevent_poller.gorewindPollingOffsetcode in the event poller checks for queued rewinds each time round the polling loop, and contained a check to make sure it only went backwards. However, it didn't return the outcome of that check to the calling function, which then always assumed the rewind had been applied. Meaning if there was a "rewind forwards" scenario, it could do it and mean it missed pins. This left those messages stalled.Fixes a bug where a node receiving messages, but not sending any, could delay for a long time before confirming messages.