Skip to content

Commit 7ef6106

Browse files
authored
Algod Importer: Longer Timeout (#133)
1 parent 9be0f37 commit 7ef6106

File tree

3 files changed

+68
-41
lines changed

3 files changed

+68
-41
lines changed

conduit/plugins/importers/algod/algod_importer.go

Lines changed: 32 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -42,7 +42,7 @@ const (
4242
)
4343

4444
var (
45-
waitForRoundTimeout = 5 * time.Second
45+
waitForRoundTimeout = 30 * time.Second
4646
)
4747

4848
const catchpointsURL = "https://algorand-catchpoints.s3.us-east-2.amazonaws.com/consolidated/%s_catchpoints.txt"
@@ -419,12 +419,31 @@ func (algodImp *algodImporter) getDelta(rnd uint64) (sdk.LedgerStateDelta, error
419419

420420
// SyncError is used to indicate algod and conduit are not synchronized.
421421
type SyncError struct {
422-
rnd uint64
423-
expected uint64
422+
// retrievedRound is the round returned from an algod status call.
423+
retrievedRound uint64
424+
425+
// expectedRound is the round conduit expected to have gotten back.
426+
expectedRound uint64
427+
428+
// err is the error that was received from the endpoint caller.
429+
err error
430+
}
431+
432+
// NewSyncError creates a new SyncError.
433+
func NewSyncError(retrievedRound, expectedRound uint64, err error) *SyncError {
434+
return &SyncError{
435+
retrievedRound: retrievedRound,
436+
expectedRound: expectedRound,
437+
err: err,
438+
}
424439
}
425440

426441
func (e *SyncError) Error() string {
427-
return fmt.Sprintf("wrong round returned from status for round: %d != %d", e.rnd, e.expected)
442+
return fmt.Sprintf("wrong round returned from status for round: retrieved(%d) != expected(%d): %v", e.retrievedRound, e.expectedRound, e.err)
443+
}
444+
445+
func (e *SyncError) Unwrap() error {
446+
return e.err
428447
}
429448

430449
func waitForRoundWithTimeout(ctx context.Context, l *logrus.Logger, c *algod.Client, rnd uint64, to time.Duration) (uint64, error) {
@@ -440,10 +459,8 @@ func waitForRoundWithTimeout(ctx context.Context, l *logrus.Logger, c *algod.Cli
440459
if rnd <= status.LastRound {
441460
return status.LastRound, nil
442461
}
443-
return 0, &SyncError{
444-
rnd: status.LastRound,
445-
expected: rnd,
446-
}
462+
// algod's timeout should not be reached because context.WithTimeout is used
463+
return 0, NewSyncError(status.LastRound, rnd, fmt.Errorf("sync error, likely due to status after block timeout"))
447464
}
448465

449466
// If there was a different error and the node is responsive, call status before returning a SyncError.
@@ -454,10 +471,7 @@ func waitForRoundWithTimeout(ctx context.Context, l *logrus.Logger, c *algod.Cli
454471
return 0, fmt.Errorf("unable to get status after block and status: %w", errors.Join(err, err2))
455472
}
456473
if status2.LastRound < rnd {
457-
return 0, &SyncError{
458-
rnd: status.LastRound,
459-
expected: rnd,
460-
}
474+
return 0, NewSyncError(status2.LastRound, rnd, fmt.Errorf("status2.LastRound mismatch: %w", err))
461475
}
462476

463477
// This is probably a connection error, not a SyncError.
@@ -470,10 +484,7 @@ func (algodImp *algodImporter) getBlockInner(rnd uint64) (data.BlockData, error)
470484

471485
nodeRound, err := waitForRoundWithTimeout(algodImp.ctx, algodImp.logger, algodImp.aclient, rnd, waitForRoundTimeout)
472486
if err != nil {
473-
// If context has expired.
474-
if algodImp.ctx.Err() != nil {
475-
return blk, fmt.Errorf("GetBlock ctx error: %w", err)
476-
}
487+
err = fmt.Errorf("called waitForRoundWithTimeout: %w", err)
477488
algodImp.logger.Errorf(err.Error())
478489
return data.BlockData{}, err
479490
}
@@ -483,13 +494,14 @@ func (algodImp *algodImporter) getBlockInner(rnd uint64) (data.BlockData, error)
483494
dt := time.Since(start)
484495
getAlgodRawBlockTimeSeconds.Observe(dt.Seconds())
485496
if err != nil {
486-
algodImp.logger.Errorf("error getting block for round %d: %s", rnd, err.Error())
497+
err = fmt.Errorf("error getting block for round %d: %w", rnd, err)
498+
algodImp.logger.Errorf(err.Error())
487499
return data.BlockData{}, err
488500
}
489501
tmpBlk := new(models.BlockResponse)
490502
err = msgpack.Decode(blockbytes, tmpBlk)
491503
if err != nil {
492-
return blk, err
504+
return blk, fmt.Errorf("error decoding block for round %d: %w", rnd, err)
493505
}
494506

495507
blk.BlockHeader = tmpBlk.Block.BlockHeader
@@ -523,10 +535,10 @@ func (algodImp *algodImporter) GetBlock(rnd uint64) (data.BlockData, error) {
523535
if err != nil {
524536
target := &SyncError{}
525537
if errors.As(err, &target) {
526-
algodImp.logger.Warnf("Sync error detected, attempting to set the sync round to recover the node: %s", err.Error())
538+
algodImp.logger.Warnf("importer algod.GetBlock() sync error detected, attempting to set the sync round to recover the node: %s", err.Error())
527539
_, _ = algodImp.aclient.SetSyncRound(rnd).Do(algodImp.ctx)
528540
} else {
529-
err = fmt.Errorf("error getting block for round %d, check node configuration: %s", rnd, err)
541+
err = fmt.Errorf("importer algod.GetBlock() error getting block for round %d, check node configuration: %s", rnd, err)
530542
algodImp.logger.Errorf(err.Error())
531543
}
532544
return data.BlockData{}, err

conduit/plugins/importers/algod/algod_importer_test.go

Lines changed: 31 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -3,17 +3,18 @@ package algodimporter
33
import (
44
"context"
55
"fmt"
6-
"github.com/sirupsen/logrus"
7-
"github.com/sirupsen/logrus/hooks/test"
8-
"github.com/stretchr/testify/assert"
9-
"github.com/stretchr/testify/require"
10-
"gopkg.in/yaml.v3"
116
"net/http"
127
"net/http/httptest"
138
"strings"
149
"testing"
1510
"time"
1611

12+
"github.com/sirupsen/logrus"
13+
"github.com/sirupsen/logrus/hooks/test"
14+
"github.com/stretchr/testify/assert"
15+
"github.com/stretchr/testify/require"
16+
"gopkg.in/yaml.v3"
17+
1718
"github.com/algorand/go-algorand-sdk/v2/client/v2/algod"
1819
"github.com/algorand/go-algorand-sdk/v2/client/v2/common/models"
1920
sdk "github.com/algorand/go-algorand-sdk/v2/types"
@@ -681,34 +682,39 @@ func TestGetBlockErrors(t *testing.T) {
681682
name: "Cannot wait for block",
682683
rnd: 123,
683684
blockAfterResponder: MakeJsonResponderSeries("/wait-for-block-after", []int{http.StatusOK, http.StatusNotFound}, []interface{}{models.NodeStatus{LastRound: 1}}),
684-
err: fmt.Sprintf("error getting block for round 123"),
685+
blockResponder: nil,
686+
deltaResponder: nil,
687+
err: "error getting block for round 123",
685688
logs: []string{"error getting block for round 123"},
686689
},
687690
{
688691
name: "Cannot get block",
689692
rnd: 123,
690693
blockAfterResponder: BlockAfterResponder,
691-
deltaResponder: MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, sdk.LedgerStateDelta{}),
692694
blockResponder: MakeMsgpStatusResponder("get", "/v2/blocks/", http.StatusNotFound, ""),
693-
err: fmt.Sprintf("error getting block for round 123"),
695+
deltaResponder: MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, sdk.LedgerStateDelta{}),
696+
err: "error getting block for round 123",
694697
logs: []string{"error getting block for round 123"},
695698
},
696699
{
697-
name: "Cannot get delta (node behind, re-send sync)",
700+
name: "Cannot get delta - node behind, re-send sync",
698701
rnd: 200,
699702
blockAfterResponder: MakeBlockAfterResponder(models.NodeStatus{LastRound: 50}),
700703
blockResponder: BlockResponder,
701704
deltaResponder: MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, ""),
702-
err: fmt.Sprintf("wrong round returned from status for round: 50 != 200"),
703-
logs: []string{"wrong round returned from status for round: 50 != 200", "Sync error detected, attempting to set the sync round to recover the node"},
705+
err: "wrong round returned from status for round: retrieved(50) != expected(200)",
706+
logs: []string{
707+
"wrong round returned from status for round: retrieved(50) != expected(200)",
708+
"sync error detected, attempting to set the sync round to recover the node",
709+
},
704710
},
705711
{
706-
name: "Cannot get delta (caught up)",
712+
name: "Cannot get delta - caught up",
707713
rnd: 200,
708714
blockAfterResponder: MakeBlockAfterResponder(models.NodeStatus{LastRound: 200}),
709715
blockResponder: BlockResponder,
710716
deltaResponder: MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, ""),
711-
err: fmt.Sprintf("ledger state delta not found: node round (200), required round (200)"),
717+
err: "ledger state delta not found: node round (200), required round (200)",
712718
logs: []string{"ledger state delta not found: node round (200), required round (200)"},
713719
},
714720
}
@@ -752,21 +758,26 @@ func TestGetBlockErrors(t *testing.T) {
752758
_, err = testImporter.GetBlock(tc.rnd)
753759
noError := assert.ErrorContains(t, err, tc.err)
754760

755-
// Make sure each of the expected log messages are present
761+
// Make sure each of the expected log messages are present in the hookEntries
762+
hookEntries := hook.AllEntries()
756763
for _, log := range tc.logs {
757764
found := false
758-
for _, entry := range hook.AllEntries() {
759-
fmt.Println(strings.Contains(entry.Message, log))
760-
found = found || strings.Contains(entry.Message, log)
765+
for _, entry := range hookEntries {
766+
logIsSubstring := strings.Contains(entry.Message, log)
767+
found = found || logIsSubstring
768+
fmt.Printf("logIsSubstring=%t, found=%t:\n\t%s\n", logIsSubstring, found, entry.Message)
761769
}
762-
noError = noError && assert.True(t, found, "Expected log was not found: '%s'", log)
770+
if !found {
771+
fmt.Printf(">>>>>>WE HAVE A PROBLEM<<<<<<\n")
772+
}
773+
noError = noError && assert.True(t, found, "(%s) Expected log was not found: '%s'", tc.name, log)
763774
}
764775

765776
// Print logs if there was an error.
766777
if !noError {
767-
fmt.Println("An error was detected, printing logs")
778+
fmt.Printf("An error was detected, printing logs (%s)\n", tc.name)
768779
fmt.Println("------------------------------------")
769-
for _, entry := range hook.AllEntries() {
780+
for _, entry := range hookEntries {
770781
fmt.Printf(" %s\n", entry.Message)
771782
}
772783
fmt.Println("------------------------------------")

conduit/plugins/importers/algod/mock_algod_test.go

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -33,7 +33,11 @@ func NewAlgodHandler(responders ...algodCustomHandler) *AlgodHandler {
3333

3434
// ServeHTTP implements the http.Handler interface for AlgodHandler
3535
func (handler *AlgodHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
36-
for _, responder := range handler.responders {
36+
for i, responder := range handler.responders {
37+
_ = i
38+
if responder == nil {
39+
continue
40+
}
3741
if responder(req, w) {
3842
return
3943
}

0 commit comments

Comments
 (0)