Skip to content

Commit 2a0537c

Browse files
authored
algod importer: check if node needs catchup. (#70)
1 parent dd2865a commit 2a0537c

File tree

3 files changed

+287
-150
lines changed

3 files changed

+287
-150
lines changed

conduit/plugins/importers/algod/algod_importer.go

Lines changed: 48 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -228,12 +228,58 @@ func checkRounds(logger *logrus.Logger, catchpointRound, nodeRound, targetRound
228228
return false, nil
229229
}
230230

231-
func (algodImp *algodImporter) catchupNode(catchpoint string, targetRound uint64) error {
231+
func (algodImp *algodImporter) needsCatchup(targetRound uint64) bool {
232+
if algodImp.mode == followerMode {
233+
// If we are in follower mode, check if the round delta is available.
234+
_, err := algodImp.getDelta(targetRound)
235+
if err != nil {
236+
algodImp.logger.Infof("Unable to fetch state delta for round %d: %s", targetRound, err)
237+
}
238+
return err != nil
239+
}
240+
241+
// Otherwise just check if the block is available.
242+
_, err := algodImp.aclient.Block(targetRound).Do(algodImp.ctx)
243+
if err != nil {
244+
algodImp.logger.Infof("Unable to fetch block for round %d: %s", targetRound, err)
245+
}
246+
// If the block is not available, we must catchup.
247+
return err != nil
248+
}
249+
250+
// catchupNode facilitates catching up via fast catchup, or waiting for the
251+
// node to slow catchup.
252+
func (algodImp *algodImporter) catchupNode(network string, targetRound uint64) error {
253+
if !algodImp.needsCatchup(targetRound) {
254+
algodImp.logger.Infof("No catchup required to reach round %d", targetRound)
255+
return nil
256+
}
257+
258+
algodImp.logger.Infof("Catchup required to reach round %d", targetRound)
259+
260+
catchpoint := ""
261+
262+
// If there is an admin token, look for a catchpoint to use.
263+
if algodImp.cfg.CatchupConfig.AdminToken != "" {
264+
if algodImp.cfg.CatchupConfig.Catchpoint != "" {
265+
catchpoint = algodImp.cfg.CatchupConfig.Catchpoint
266+
} else {
267+
URL := fmt.Sprintf(catchpointsURL, network)
268+
var err error
269+
catchpoint, err = getMissingCatchpointLabel(URL, targetRound)
270+
if err != nil {
271+
return fmt.Errorf("unable to lookup catchpoint: %w", err)
272+
}
273+
}
274+
}
275+
232276
if catchpoint != "" {
233277
cpRound, err := parseCatchpointRound(catchpoint)
234278
if err != nil {
235279
return err
236280
}
281+
282+
// Get the node status.
237283
nStatus, err := algodImp.aclient.Status().Do(algodImp.ctx)
238284
if err != nil {
239285
return fmt.Errorf("received unexpected error failed to get node status: %w", err)
@@ -327,22 +373,7 @@ func (algodImp *algodImporter) Init(ctx context.Context, initProvider data.InitP
327373
return nil, fmt.Errorf("unable to fetch genesis file from API at %s", algodImp.cfg.NetAddr)
328374
}
329375

330-
catchpoint := ""
331-
332-
// If there is an admin token, look for a catchpoint to use.
333-
if algodImp.cfg.CatchupConfig.AdminToken != "" {
334-
if algodImp.cfg.CatchupConfig.Catchpoint != "" {
335-
catchpoint = algodImp.cfg.CatchupConfig.Catchpoint
336-
} else {
337-
URL := fmt.Sprintf(catchpointsURL, genesis.Network)
338-
catchpoint, err = getMissingCatchpointLabel(URL, uint64(initProvider.NextDBRound()))
339-
if err != nil {
340-
return nil, fmt.Errorf("unable to lookup catchpoint: %w", err)
341-
}
342-
}
343-
}
344-
345-
err = algodImp.catchupNode(catchpoint, uint64(initProvider.NextDBRound()))
376+
err = algodImp.catchupNode(genesis.Network, uint64(initProvider.NextDBRound()))
346377

347378
return &genesis, err
348379
}

0 commit comments

Comments
 (0)