
Conversation

@peterbroadhurst (Contributor) commented Mar 17, 2022

Resolves #601
Resolves #568

Summary of the changes in this PR:

  1. Add workers that download from IPFS asynchronously, then notify with callbacks when the data is ready (a minimal sketch of this pattern follows the summary list)

  2. Add batch_hash column to pins

    • So we can compare the hash of the persisted batch against the pin - common to both private and broadcast
  3. Add an option on the operation manager to set outputs

    • Useful to have the outputs persisted from a download, so you can see the public refs
  4. Add operations for downloading shared storage (IPFS) batches and blobs

  5. Add operation for uploading blobs (per #568: Operations: Add operation for Shared Storage (IPFS) uploads, and move API calls to batch worker)

  6. More consistently name all operation types, with type_verb_noun

    • blockchain_pin_batch - previously blockchain_batch_pin
    • dataexchange_send_batch - previously dataexchange_batch_send
    • dataexchange_send_blob - previously dataexchange_blob_send
    • sharedstorage_upload_batch - previously sharedstorage_batch_broadcast
    • sharedstorage_upload_blob - new
    • sharedstorage_download_batch - new
    • sharedstorage_download_blob - new
  7. Fixes a bug where we might miss rewinds to process pins after batch arrival

    • Fix: rewindPollingOffset needed to return the decision it made on whether to rewind in event_poller.go
    • Cause: The rewindPollingOffset code in the event poller checks for queued rewinds on each pass of the polling loop, and contains a check to make sure it only moves backwards. However, it did not return the outcome of that check to the calling function, which always assumed the rewind had been applied. So in a "rewind forwards" scenario the poller could skip ahead and miss pins, leaving those messages stalled.
  8. Fixes a bug where a node receiving messages, but not sending any, could delay for a long time before confirming messages.

    • Fix: Add a new goroutine to the batch manager, to continually process new-message events
    • Cause: In the recently updated batching logic, there was a situation where new-message events were not consumed while waiting in a polling cycle. Since no events arrive on the second node, database commits were blocked waiting to emit those new-message events.
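
For illustration only, here is a minimal, self-contained sketch of the worker pattern from point 1 - it is not the code in this PR, and every name in it (downloadTask, downloadManager, the fake fetch function) is made up:

package main

import (
	"context"
	"fmt"
	"sync"
)

type downloadTask struct {
	payloadRef string // e.g. an IPFS CID recorded against the pinned batch
}

type downloadManager struct {
	work    chan downloadTask
	fetch   func(ctx context.Context, ref string) ([]byte, error) // stand-in for the shared storage client
	onReady func(ref string, data []byte)                         // callback fired once the payload is downloaded
	wg      sync.WaitGroup
}

// start launches a pool of goroutines that pull tasks off the work channel,
// download the referenced payload, and notify via the callback when it is ready.
func (dm *downloadManager) start(ctx context.Context, workers int) {
	for i := 0; i < workers; i++ {
		dm.wg.Add(1)
		go func() {
			defer dm.wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case task := <-dm.work:
					// A real implementation would retry with backoff and track an operation
					data, err := dm.fetch(ctx, task.payloadRef)
					if err != nil {
						fmt.Printf("download of %s failed: %v\n", task.payloadRef, err)
						continue
					}
					dm.onReady(task.payloadRef, data)
				}
			}
		}()
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	notified := make(chan struct{})
	dm := &downloadManager{
		work:  make(chan downloadTask),
		fetch: func(ctx context.Context, ref string) ([]byte, error) { return []byte(`{"batch":"..."}`), nil },
		onReady: func(ref string, data []byte) {
			fmt.Printf("downloaded %s (%d bytes)\n", ref, len(data))
			close(notified)
		},
	}
	dm.start(ctx, 3)
	dm.work <- downloadTask{payloadRef: "Qm..."}
	<-notified
	cancel()
	dm.wg.Wait()
}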

@peterbroadhurst force-pushed the ipfs-async branch 9 times, most recently from 98a2ed5 to f9a4d59 on March 18, 2022 14:26
rewind, pollingOffset := ep.conf.maybeRewind()
if rewind {
-	ep.rewindPollingOffset(pollingOffset)
+	pollingOffset = ep.rewindPollingOffset(pollingOffset)
Contributor Author:

We had a check in the rewindPollingOffset function to make sure we only go backwards in time.

However, we didn't return the choice we made - so the calling function was rewinding forwards. This caused a really hard-to-track-down bug that's eaten a bunch of my cycles.

The reason we're only seeing this now is that, with the batches for both broadcast and private rewinding, it's much more likely you'll hit a rewind-forwards scenario.
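
To illustrate the shape of the fix with simplified names (this is not the exact event_poller.go code - eventPoller here only has the two fields needed for the example):

package main

import (
	"fmt"
	"sync"
)

type eventPoller struct {
	mux           sync.Mutex
	pollingOffset int64
}

// rewindPollingOffset only accepts a rewind that moves backwards, and now returns
// the offset actually in effect, so the caller can no longer assume a forward
// "rewind" was applied.
func (ep *eventPoller) rewindPollingOffset(requestedOffset int64) int64 {
	ep.mux.Lock()
	defer ep.mux.Unlock()
	if requestedOffset < ep.pollingOffset {
		ep.pollingOffset = requestedOffset
	}
	return ep.pollingOffset
}

func main() {
	ep := &eventPoller{pollingOffset: 100}
	fmt.Println(ep.rewindPollingOffset(50))  // 50 - a genuine rewind, applied
	fmt.Println(ep.rewindPollingOffset(200)) // 50 - "rewind forwards" refused; caller must keep using 50
}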

return om.RunOperationWithFailState(ctx, op, fftypes.OpStatusFailed)
}

func (om *operationsManager) RunOperationWithFailState(ctx context.Context, op *fftypes.PreparedOperation, failState fftypes.OpStatus) error {
Contributor:

This name confuses me a little, and I can foresee maybe needing other options in the future... so I wonder if a RunOperationWithOptions would be more future-proof, and have an options struct something like:

{
    // If the operation fails, keep it in "Pending" state
    RemainPendingOnFailure bool
}

I'm open to spelling suggestions, but having documentation for the option(s) would be a nice plus IMO.

Side note - the batch_processor has a similar retry loop built into the code, but I think failed operations are left in a "Failed" state as they're being retried, rather than staying "Pending". Might be worth a look sometime at whether this functionality should be leveraged there.

Contributor Author:

I've updated RunOperation to take the options as an optional varargs param.

Contributor Author:

I agree with the feedback on the broadcast publish, and I've used the option there too.
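
For illustration, a minimal sketch of the varargs-options pattern with simplified stand-in types (not the exact FireFly signatures):

package main

import (
	"context"
	"errors"
	"fmt"
)

type RunOperationOption int

const (
	// RemainPendingOnFailure leaves a failed operation in "Pending" rather than
	// "Failed", e.g. because a separate retry loop will pick it up again.
	RemainPendingOnFailure RunOperationOption = iota
)

type PreparedOperation struct{ Type string }

// RunOperation defaults to marking the operation "Failed" on error, unless the
// caller passes RemainPendingOnFailure.
func RunOperation(ctx context.Context, op *PreparedOperation, options ...RunOperationOption) error {
	failState := "Failed"
	for _, o := range options {
		if o == RemainPendingOnFailure {
			failState = "Pending"
		}
	}
	if err := runHandler(ctx, op); err != nil {
		fmt.Printf("operation %s moved to %s: %v\n", op.Type, failState, err)
		return err
	}
	return nil
}

func runHandler(ctx context.Context, op *PreparedOperation) error {
	return errors.New("simulated failure") // stand-in for the real operation handler
}

func main() {
	ctx := context.Background()
	_ = RunOperation(ctx, &PreparedOperation{Type: "sharedstorage_upload_blob"})
	_ = RunOperation(ctx, &PreparedOperation{Type: "sharedstorage_upload_batch"}, RemainPendingOnFailure)
}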

// OpTypeBlockchainBatchPin is a blockchain transaction to pin a batch
OpTypeBlockchainBatchPin OpType = ffEnum("optype", "blockchain_batch_pin")
// OpTypeBlockchainPinBatch is a blockchain transaction to pin a batch
OpTypeBlockchainPinBatch OpType = ffEnum("optype", "blockchain_pin_batch")
Contributor:

Should make sure @eberger727 is plugged into these name changes for any filtering in-flight on the firefly-ui

}).Manifest(b.ID)
}

func (b *BatchPersisted) Inflight(messages []*Message, data DataArray) *Batch {
Contributor:

Perhaps GenInflight or GenBatch to match the helper above...

Contributor:

Or maybe the helper above should just be Manifest... not terribly picky here, but we do now have a lot of utilities for converting the various representations of a batch.

Contributor Author:

Went for GenInflight 👍

fftypes.OpTypeSharedStorageDownloadBlob,
}),
fb.Eq("status", fftypes.OpStatusPending),
fb.Lt("created", startupTime), // retry is handled completely separately
Contributor:

Are we worried about querying based on time? If the clock moved backwards would this potentially miss some items?

Definitely an edge case, but want to at least track it further if needed.

Contributor Author:

I don't have a significantly better solution here off the top of my head.
I think the double edge case of clock skew combined with a full runtime restart feels like an acceptable risk for now.

dm.cancelFunc()
for _, w := range dm.workers {
<-w.done
}
Contributor:

Should we wait for recoveryComplete?
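
For reference, a minimal self-contained sketch of the shutdown pattern in the snippet above (illustrative names only; whether to also wait on a recoveryComplete channel is the open question here):

package main

import (
	"context"
	"fmt"
)

type worker struct {
	id   int
	done chan struct{}
}

func (w *worker) run(ctx context.Context) {
	defer close(w.done) // signals the manager that this worker has fully exited
	<-ctx.Done()        // a real worker would also be selecting on its work channel
	fmt.Printf("worker %d exiting\n", w.id)
}

type downloadManager struct {
	cancelFunc context.CancelFunc
	workers    []*worker
}

// waitStop cancels the shared context, then blocks until every worker has closed
// its done channel.
func (dm *downloadManager) waitStop() {
	dm.cancelFunc()
	for _, w := range dm.workers {
		<-w.done
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	dm := &downloadManager{cancelFunc: cancel}
	for i := 0; i < 3; i++ {
		w := &worker{id: i, done: make(chan struct{})}
		dm.workers = append(dm.workers, w)
		go w.run(ctx)
	}
	dm.waitStop()
	fmt.Println("all workers stopped")
}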

// See the License for the specific language governing permissions and
// limitations under the License.

package ssdownload
Contributor:

S.S. Download 🚢

Contributor:

This name looks funny whenever I read it, but I really have nothing better to offer. Could use simply download, but maybe that's too generic.

Contributor Author:

Yeah, fought with this. Started with just download, but it's so coupled to shared storage that I added the ss prefix (and used sd as its short name). Might go for shareddownload, which would put it next to sharedstorage in the components at least.


// We need to work out what pins potentially are unblocked by the arrival of this data

// Find any data associated with this blob
Contributor:

Do we need to consider caching these data/message lookups?

Contributor Author:

Potentially, yes, but I'm going to defer this. The current assumption is that blobs are going to be used where there is heavy data, so the saving from caching these small data-object lookups is less significant.


func (ag *aggregator) rewindForBlobArrival(ctx context.Context, blobHash *fftypes.Bytes32) error {

batchIDs := make(map[fftypes.UUID]bool)
Contributor:

Should this just be an array instead of a map?

Contributor Author:

The map is used to de-dup, as multiple messages might be associated with the same batch.
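
For illustration, a small sketch (simplified types, not the real aggregator code) of why the map helps: several messages can reference the same batch, so collecting batch IDs in a map de-duplicates the set of rewinds:

package main

import "fmt"

type message struct {
	ID      string
	BatchID string
}

// batchesToRewind returns each distinct batch ID exactly once, however many of the
// affected messages point at it.
func batchesToRewind(messages []*message) []string {
	batchIDs := make(map[string]bool) // keying by batch ID de-dups automatically
	for _, m := range messages {
		batchIDs[m.BatchID] = true
	}
	rewinds := make([]string, 0, len(batchIDs))
	for id := range batchIDs {
		rewinds = append(rewinds, id)
	}
	return rewinds
}

func main() {
	msgs := []*message{
		{ID: "msg1", BatchID: "batchA"},
		{ID: "msg2", BatchID: "batchA"}, // same batch as msg1 - de-duplicated
		{ID: "msg3", BatchID: "batchB"},
	}
	fmt.Println(batchesToRewind(msgs)) // two entries; iteration order is not guaranteed
}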

if err := em.persistBatchTransaction(ctx, batchPin); err != nil {
return err
}
private := batchPin.BatchPayloadRef == ""
Contributor:

This felt a little surprising, but I guess it is an accurate way to check for private vs broadcast.

Contributor:

Hmm actually I guess it's how we've always been differentiating 👍

Contributor Author:

Yeah, we try to be compact on the blockchain itself, so this is inferred rather than having a separate "type" field.
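
For illustration, a tiny sketch of this inference with made-up types (not the real event manager code) - an empty payload ref means a private batch arriving over data exchange, while a non-empty ref means a broadcast batch to fetch from shared storage:

package main

import "fmt"

type batchPin struct {
	BatchID         string
	BatchPayloadRef string // e.g. an IPFS payload reference; empty for private batches
}

func describe(bp *batchPin) string {
	private := bp.BatchPayloadRef == ""
	if private {
		return fmt.Sprintf("batch %s is private: wait for it to arrive over data exchange", bp.BatchID)
	}
	return fmt.Sprintf("batch %s is broadcast: download %s from shared storage", bp.BatchID, bp.BatchPayloadRef)
}

func main() {
	fmt.Println(describe(&batchPin{BatchID: "b1"}))
	fmt.Println(describe(&batchPin{BatchID: "b2", BatchPayloadRef: "Qm..."}))
}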

}

-func (ed *eventDispatcher) getEvents(ctx context.Context, filter database.Filter) ([]fftypes.LocallySequenced, error) {
+func (ed *eventDispatcher) getEvents(ctx context.Context, filter database.Filter, offset int64) ([]fftypes.LocallySequenced, error) {
Contributor:

Doesn't look like this param is used?

Contributor Author:

It was added for logging on the common interface with pins - will add similar logging here.

@awrichar (Contributor) left a comment:

Looks good - nice to see the continuing convergence of the broadcast/private paths. I left a few questions and cleanup notes, but nothing major.

@codecov-commenter commented Mar 20, 2022

Codecov Report

Merging #607 (bdf4b4a) into main (224ea30) will not change coverage.
The diff coverage is 100.00%.

@@            Coverage Diff             @@
##              main      #607    +/-   ##
==========================================
  Coverage   100.00%   100.00%            
==========================================
  Files          305       309     +4     
  Lines        18357     18705   +348     
==========================================
+ Hits         18357     18705   +348     
Impacted Files Coverage Δ
internal/assets/manager.go 100.00% <ø> (ø)
internal/contracts/manager.go 100.00% <ø> (ø)
internal/database/sqlcommon/data_sql.go 100.00% <ø> (ø)
pkg/fftypes/operation.go 100.00% <ø> (ø)
pkg/fftypes/pin.go 100.00% <ø> (ø)
internal/assets/operations.go 100.00% <100.00%> (ø)
internal/batch/batch_manager.go 100.00% <100.00%> (ø)
internal/batch/batch_processor.go 100.00% <100.00%> (ø)
internal/batchpin/batchpin.go 100.00% <100.00%> (ø)
internal/batchpin/operations.go 100.00% <100.00%> (ø)
... and 34 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data

Signed-off-by: Peter Broadhurst <[email protected]>
@peterbroadhurst marked this pull request as ready for review on March 20, 2022 16:18
Signed-off-by: Peter Broadhurst <[email protected]>
@peterbroadhurst merged commit 06f0ed3 into hyperledger:main on Mar 20, 2022
@peterbroadhurst deleted the ipfs-async branch on March 20, 2022 16:29

Development

Successfully merging this pull request may close these issues:

  • IPFS Download Workers
  • Operations: Add operation for Shared Storage (IPFS) uploads, and move API calls to batch worker
