diff --git a/conduit/pipeline/pipeline.go b/conduit/pipeline/pipeline.go
index 2f89f233..d23979d0 100644
--- a/conduit/pipeline/pipeline.go
+++ b/conduit/pipeline/pipeline.go
@@ -383,12 +383,13 @@ func (p *pipelineImpl) Start() {
 		for {
 		pipelineRun:
 			metrics.PipelineRetryCount.Observe(float64(retry))
-			if retry > p.cfg.RetryCount {
+			if retry > p.cfg.RetryCount && p.cfg.RetryCount != 0 {
 				p.logger.Errorf("Pipeline has exceeded maximum retry count (%d) - stopping...", p.cfg.RetryCount)
 				return
 			}
 
 			if retry > 0 {
+				p.logger.Infof("Retry number %d resuming after a %s retry delay.", retry, p.cfg.RetryDelay)
 				time.Sleep(p.cfg.RetryDelay)
 			}
 
diff --git a/conduit/pipeline/pipeline_test.go b/conduit/pipeline/pipeline_test.go
index 97dcd436..73692333 100644
--- a/conduit/pipeline/pipeline_test.go
+++ b/conduit/pipeline/pipeline_test.go
@@ -10,6 +10,7 @@ import (
 	"os"
 	"path"
 	"path/filepath"
+	"strings"
 	"sync"
 	"testing"
 	"time"
@@ -20,6 +21,7 @@ import (
 	"github.com/sirupsen/logrus/hooks/test"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
+	"github.com/stretchr/testify/require"
 
 	"github.com/algorand/conduit/conduit"
 	"github.com/algorand/conduit/conduit/data"
@@ -684,17 +686,37 @@ func (e *errorImporter) GetBlock(_ uint64) (data.BlockData, error) {
 
 // TestPipelineRetryVariables tests that modifying the retry variables results in longer time taken for a pipeline to run
 func TestPipelineRetryVariables(t *testing.T) {
+	maxDuration := 5 * time.Second
+	epsilon := 250 * time.Millisecond // allow for some error in timing
 	tests := []struct {
 		name          string
 		retryDelay    time.Duration
 		retryCount    uint64
 		totalDuration time.Duration
-		epsilon       time.Duration
 	}{
-		{"0 seconds", 2 * time.Second, 0, 0 * time.Second, 1 * time.Second},
-		{"2 seconds", 2 * time.Second, 1, 2 * time.Second, 1 * time.Second},
-		{"4 seconds", 2 * time.Second, 2, 4 * time.Second, 1 * time.Second},
-		{"10 seconds", 2 * time.Second, 5, 10 * time.Second, 1 * time.Second},
+		{
+			name:          "retryCount=0 (unlimited)",
+			retryDelay:    500 * time.Millisecond,
+			totalDuration: maxDuration,
+		},
+		{
+			name:          "retryCount=1",
+			retryDelay:    500 * time.Millisecond,
+			retryCount:    1,
+			totalDuration: 500 * time.Millisecond,
+		},
+		{
+			name:          "retryCount=2",
+			retryDelay:    500 * time.Millisecond,
+			retryCount:    2,
+			totalDuration: 1 * time.Second,
+		},
+		{
+			name:          "retryCount=5",
+			retryDelay:    500 * time.Millisecond,
+			retryCount:    5,
+			totalDuration: 2500 * time.Millisecond,
+		},
 	}
 	for _, testCase := range tests {
 		t.Run(testCase.name, func(t *testing.T) {
@@ -703,9 +725,10 @@ func TestPipelineRetryVariables(t *testing.T) {
 			var pImporter importers.Importer = errImporter
 			var pProcessor processors.Processor = &mockProcessor{}
 			var pExporter exporters.Exporter = &mockExporter{}
-			l, _ := test.NewNullLogger()
+			l, hook := test.NewNullLogger()
+			ctx, cf := context.WithCancel(context.Background())
 			pImpl := pipelineImpl{
-				ctx: context.Background(),
+				ctx: ctx,
 				cfg: &data.Config{
 					RetryCount: testCase.retryCount,
 					RetryDelay: testCase.retryDelay,
@@ -744,15 +767,36 @@ func TestPipelineRetryVariables(t *testing.T) {
 			err := pImpl.Init()
 			assert.Nil(t, err)
 			before := time.Now()
+			done := false
+			// test for "unlimited" timeout
+			go func() {
+				time.Sleep(maxDuration)
+				if !done {
+					cf()
+					assert.Equal(t, testCase.totalDuration, maxDuration)
+				}
+			}()
 			pImpl.Start()
 			pImpl.wg.Wait()
 			after := time.Now()
 			timeTaken := after.Sub(before)
-			msg := fmt.Sprintf("seconds taken: %s, expected duration seconds: %s, epsilon: %s", timeTaken.String(), testCase.totalDuration.String(), testCase.epsilon.String())
-			assert.WithinDurationf(t, before.Add(testCase.totalDuration), after, testCase.epsilon, msg)
-			assert.Equal(t, errImporter.GetBlockCount, testCase.retryCount+1)
-
+			msg := fmt.Sprintf("seconds taken: %s, expected duration seconds: %s, epsilon: %s", timeTaken.String(), testCase.totalDuration.String(), epsilon)
+			assert.WithinDurationf(t, before.Add(testCase.totalDuration), after, epsilon, msg)
+			if testCase.retryCount == 0 {
+				assert.GreaterOrEqual(t, errImporter.GetBlockCount, uint64(1))
+			} else {
+				assert.Equal(t, errImporter.GetBlockCount, testCase.retryCount+1)
+			}
+			done = true
+			fmt.Println(hook.AllEntries())
+			for _, entry := range hook.AllEntries() {
+				str, err := entry.String()
+				require.NoError(t, err)
+				if strings.HasPrefix(str, "Retry number 1") {
+					assert.Equal(t, "Retry number 1 resuming after a 500ms retry delay.", str)
+				}
+			}
 		})
 	}
 }
diff --git a/conduit/plugins/importers/algod/algod_importer.go b/conduit/plugins/importers/algod/algod_importer.go
index ff289e07..43f6e425 100644
--- a/conduit/plugins/importers/algod/algod_importer.go
+++ b/conduit/plugins/importers/algod/algod_importer.go
@@ -229,11 +229,7 @@ func checkRounds(logger *logrus.Logger, catchpointRound, nodeRound, targetRound
 }
 
 func (algodImp *algodImporter) needsCatchup(targetRound uint64) bool {
-	if algodImp.mode == followerMode {
-		if targetRound == 0 {
-			algodImp.logger.Info("No state deltas are ever available for round 0")
-			return true
-		}
+	if algodImp.mode == followerMode && targetRound != 0 {
 		// If we are in follower mode, check if the round delta is available.
 		_, err := algodImp.getDelta(targetRound)
 		if err != nil {
@@ -272,7 +268,9 @@ func (algodImp *algodImporter) catchupNode(network string, targetRound uint64) e
 			var err error
 			catchpoint, err = getMissingCatchpointLabel(URL, targetRound)
 			if err != nil {
-				return fmt.Errorf("unable to lookup catchpoint: %w", err)
+				// catchpoints are only available for past 6 months.
+				// This case handles the scenario where the catchpoint is not available.
+				algodImp.logger.Warnf("unable to lookup catchpoint: %s", err)
 			}
 		}
 	}
diff --git a/conduit/plugins/importers/algod/algod_importer_test.go b/conduit/plugins/importers/algod/algod_importer_test.go
index a8dba1e3..b0df4622 100644
--- a/conduit/plugins/importers/algod/algod_importer_test.go
+++ b/conduit/plugins/importers/algod/algod_importer_test.go
@@ -228,8 +228,10 @@ func TestInitCatchup(t *testing.T) {
 			catchpoint: "",
 			algodServer: NewAlgodServer(
 				GenesisResponder,
+				MakePostSyncRoundResponder(http.StatusOK),
+				MakeJsonResponderSeries("/v2/status", []int{http.StatusOK, http.StatusBadRequest}, []interface{}{models.NodeStatus{LastRound: 1235}}),
 			),
-			err: "failed to lookup catchpoint label list",
+			logs: []string{"failed to lookup catchpoint label list"},
 		}, {
 			name:       "wait for node to catchup error",
 			adminToken: "admin",
@@ -712,20 +714,20 @@ func TestNeedsCatchup(t *testing.T) {
 			result: false,
 		},
 		{
-			name:       "Follower mode round 0, no delta",
+			name:       "Follower mode round 0, no block",
 			mode:       followerMode,
 			round:      0,
 			responders: []algodCustomHandler{},
-			logMsg:     "No state deltas are ever available for round 0",
+			logMsg:     "Unable to fetch block for round 0",
 			result:     true,
 		},
 		{
 			name:       "Follower mode round 0, delta",
 			mode:       followerMode,
 			round:      0,
-			responders: []algodCustomHandler{LedgerStateDeltaResponder},
-			logMsg:     "No state deltas are ever available for round 0",
-			result:     true,
+			responders: []algodCustomHandler{BlockResponder},
+			logMsg:     "",
+			result:     false,
 		},
 		{
 			name: "Archival mode, no block",
diff --git a/pkg/cli/internal/initialize/conduit.yml.example b/pkg/cli/internal/initialize/conduit.yml.example
index 5ab2072f..22556265 100644
--- a/pkg/cli/internal/initialize/conduit.yml.example
+++ b/pkg/cli/internal/initialize/conduit.yml.example
@@ -5,6 +5,7 @@ log-level: INFO
 #log-file:
 
 # Number of retries to perform after a pipeline plugin error.
+# Set to 0 to retry forever.
 retry-count: 10
 
 # Time duration to wait between retry attempts.
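For reference, the standalone Go sketch below illustrates the retry semantics the pipeline.go hunk introduces: a retry count of 0 now means retry forever, any non-zero count still bounds the work to the initial attempt plus that many retries, and every retry logs a "Retry number ... resuming after a ... retry delay." style message before sleeping. This is a minimal sketch, not Conduit's actual pipeline code; runWithRetries, attempt, and the printed messages are hypothetical stand-ins used only for illustration.

// retry_sketch.go - hedged illustration of the changed retry loop condition.
package main

import (
	"errors"
	"fmt"
	"time"
)

// runWithRetries is a hypothetical helper that mirrors the new loop condition:
// only stop once a non-zero retry limit has been exceeded.
func runWithRetries(retryCount uint64, retryDelay time.Duration, attempt func() error) {
	for retry := uint64(0); ; retry++ {
		if retry > retryCount && retryCount != 0 {
			fmt.Printf("exceeded maximum retry count (%d) - stopping\n", retryCount)
			return
		}
		if retry > 0 {
			// Matches the new Infof message logged before each retry delay.
			fmt.Printf("Retry number %d resuming after a %s retry delay.\n", retry, retryDelay)
			time.Sleep(retryDelay)
		}
		if err := attempt(); err == nil {
			return
		}
	}
}

func main() {
	// With retryCount=2 a persistently failing attempt runs three times in
	// total (initial attempt plus two retries), which is the same
	// GetBlockCount == retryCount+1 relationship the updated test asserts.
	attempts := 0
	runWithRetries(2, 100*time.Millisecond, func() error {
		attempts++
		return errors.New("simulated plugin error")
	})
	fmt.Println("attempts:", attempts)
}

Treating zero as "retry forever" keeps the change to a single condition and lines up with the new conduit.yml.example comment ("Set to 0 to retry forever.") instead of introducing a separate configuration flag.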