diff --git a/pkg/sync/sync_service.go b/pkg/sync/sync_service.go
index 23db9ce1f..dc219192c 100644
--- a/pkg/sync/sync_service.go
+++ b/pkg/sync/sync_service.go
@@ -61,6 +61,10 @@ type SyncService[H header.Header[H]] struct {
 	getterByHeight   GetterByHeightFunc[H]
 	rangeGetter      RangeGetterFunc[H]
 	storeInitialized atomic.Bool
+
+	// context for background operations
+	bgCtx    context.Context
+	bgCancel context.CancelFunc
 }
 
 // DataSyncService is the P2P Sync Service for blocks.
@@ -153,6 +157,8 @@ func newSyncService[H header.Header[H]](
 		return nil, fmt.Errorf("failed to initialize the %s store: %w", syncType, err)
 	}
 
+	bgCtx, bgCancel := context.WithCancel(context.Background())
+
 	svc := &SyncService[H]{
 		conf:          conf,
 		genesis:       genesis,
@@ -164,6 +170,8 @@ func newSyncService[H header.Header[H]](
 		syncType:      syncType,
 		logger:        logger,
 		syncerStatus:  new(SyncerStatus),
+		bgCtx:         bgCtx,
+		bgCancel:      bgCancel,
 	}
 
 	return svc, nil
@@ -389,6 +397,42 @@ func (syncService *SyncService[H]) startSubscriber(ctx context.Context) error {
 	return nil
 }
 
+// tryInit attempts to initialize the syncer from P2P once.
+// Returns true if successful, false otherwise with an error.
+func (syncService *SyncService[H]) tryInit(ctx context.Context) (bool, error) {
+	var (
+		trusted       H
+		err           error
+		heightToQuery uint64
+	)
+
+	head, headErr := syncService.store.Head(ctx)
+	switch {
+	case errors.Is(headErr, header.ErrNotFound), errors.Is(headErr, header.ErrEmptyStore):
+		heightToQuery = syncService.genesis.InitialHeight
+	case headErr != nil:
+		return false, fmt.Errorf("failed to inspect local store head: %w", headErr)
+	default:
+		heightToQuery = head.Height()
+	}
+
+	if trusted, err = syncService.ex.GetByHeight(ctx, heightToQuery); err != nil {
+		return false, fmt.Errorf("failed to fetch height %d from peers: %w", heightToQuery, err)
+	}
+
+	if syncService.storeInitialized.CompareAndSwap(false, true) {
+		if _, err := syncService.initStore(ctx, trusted); err != nil {
+			syncService.storeInitialized.Store(false)
+			return false, fmt.Errorf("failed to initialize the store: %w", err)
+		}
+	}
+	if err := syncService.startSyncer(ctx); err != nil {
+		return false, err
+	}
+
+	return true, nil
+}
+
 // initFromP2PWithRetry initializes the syncer from P2P with a retry mechanism.
 // It inspects the local store to determine the first height to request:
 // - when the store already contains items, it reuses the latest height as the starting point;
@@ -398,48 +442,15 @@ func (syncService *SyncService[H]) initFromP2PWithRetry(ctx context.Context, pee
 		return nil
 	}
 
-	tryInit := func(ctx context.Context) (bool, error) {
-		var (
-			trusted       H
-			err           error
-			heightToQuery uint64
-		)
-
-		head, headErr := syncService.store.Head(ctx)
-		switch {
-		case errors.Is(headErr, header.ErrNotFound), errors.Is(headErr, header.ErrEmptyStore):
-			heightToQuery = syncService.genesis.InitialHeight
-		case headErr != nil:
-			return false, fmt.Errorf("failed to inspect local store head: %w", headErr)
-		default:
-			heightToQuery = head.Height()
-		}
-
-		if trusted, err = syncService.ex.GetByHeight(ctx, heightToQuery); err != nil {
-			return false, fmt.Errorf("failed to fetch height %d from peers: %w", heightToQuery, err)
-		}
-
-		if syncService.storeInitialized.CompareAndSwap(false, true) {
-			if _, err := syncService.initStore(ctx, trusted); err != nil {
-				syncService.storeInitialized.Store(false)
-				return false, fmt.Errorf("failed to initialize the store: %w", err)
-			}
-		}
-		if err := syncService.startSyncer(ctx); err != nil {
-			return false, err
-		}
-		return true, nil
-	}
-
 	// block with exponential backoff until initialization succeeds or context is canceled.
 	backoff := 1 * time.Second
 	maxBackoff := 10 * time.Second
-	timeoutTimer := time.NewTimer(time.Minute * 10)
+	timeoutTimer := time.NewTimer(time.Minute * 2)
 	defer timeoutTimer.Stop()
 
 	for {
-		ok, err := tryInit(ctx)
+		ok, err := syncService.tryInit(ctx)
 		if ok {
 			return nil
 		}
 
@@ -450,7 +461,9 @@ func (syncService *SyncService[H]) initFromP2PWithRetry(ctx context.Context, pee
 		case <-ctx.Done():
 			return ctx.Err()
 		case <-timeoutTimer.C:
-			return fmt.Errorf("timeout reached while trying to initialize the store after 10 minutes: %w", err)
+			syncService.logger.Warn().Err(err).Msg("timeout reached while trying to initialize the store, scheduling background retry")
+			go syncService.retryInitInBackground()
+			return nil
 		case <-time.After(backoff):
 		}
 
@@ -461,10 +474,40 @@ func (syncService *SyncService[H]) initFromP2PWithRetry(ctx context.Context, pee
 	}
 }
 
+// retryInitInBackground continues attempting to initialize the syncer in the background.
+func (syncService *SyncService[H]) retryInitInBackground() {
+	backoff := 15 * time.Second
+	maxBackoff := 5 * time.Minute
+
+	for {
+		select {
+		case <-syncService.bgCtx.Done():
+			syncService.logger.Info().Msg("background retry cancelled")
+			return
+		case <-time.After(backoff):
+		}
+
+		ok, err := syncService.tryInit(syncService.bgCtx)
+		if ok {
+			syncService.logger.Info().Msg("successfully initialized store from P2P in background")
+			return
+		}
+
+		syncService.logger.Info().Err(err).Dur("retry_in", backoff).Msg("background retry: headers not yet available from peers")
+
+		backoff *= 2
+		if backoff > maxBackoff {
+			backoff = maxBackoff
+		}
+	}
+}
+
 // Stop is a part of Service interface.
 //
 // `store` is closed last because it's used by other services.
 func (syncService *SyncService[H]) Stop(ctx context.Context) error {
+	syncService.bgCancel()
+
 	// unsubscribe from topic first so that sub.Stop() does not fail
 	syncService.topicSubscription.Cancel()
 	err := errors.Join(
diff --git a/pkg/sync/sync_service_test.go b/pkg/sync/sync_service_test.go
index cd434bc7b..6cf8afa2b 100644
--- a/pkg/sync/sync_service_test.go
+++ b/pkg/sync/sync_service_test.go
@@ -3,11 +3,14 @@ package sync
 import (
 	"context"
 	cryptoRand "crypto/rand"
+	"errors"
 	"math/rand"
 	"path/filepath"
+	"sync/atomic"
 	"testing"
 	"time"
 
+	"github.com/celestiaorg/go-header"
 	"github.com/evstack/ev-node/pkg/config"
 	genesispkg "github.com/evstack/ev-node/pkg/genesis"
 	"github.com/evstack/ev-node/pkg/p2p"
@@ -191,3 +194,111 @@ func bytesN(r *rand.Rand, n int) []byte {
 	_, _ = r.Read(data)
 	return data
 }
+
+// TestBackgroundRetryEventuallySucceeds verifies that when the sync service cannot
+// initially connect to peers, the background retry mechanism is triggered and
+// eventually succeeds once headers become available from the DA store.
+func TestBackgroundRetryEventuallySucceeds(t *testing.T) {
+	mn := mocknet.New()
+	defer mn.Close()
+
+	pk, _, err := crypto.GenerateEd25519Key(cryptoRand.Reader)
+	require.NoError(t, err)
+	noopSigner, err := noop.NewNoopSigner(pk)
+	require.NoError(t, err)
+	rnd := rand.New(rand.NewSource(1)) // nolint:gosec // test code only
+
+	chainId := "test-chain-id"
+	proposerAddr := []byte("test")
+	genesisDoc := genesispkg.Genesis{
+		ChainID:         chainId,
+		StartTime:       time.Now(),
+		InitialHeight:   1,
+		ProposerAddress: proposerAddr,
+	}
+
+	// Use a shared DA store that we can populate later
+	mainKV := sync.MutexWrap(datastore.NewMapDatastore())
+	rktStore := store.New(mainKV)
+
+	conf := config.DefaultConfig()
+	conf.RootDir = t.TempDir()
+	nodeKey, err := key.LoadOrGenNodeKey(filepath.Dir(conf.ConfigPath()))
+	require.NoError(t, err)
+	logger := zerolog.Nop()
+
+	h, err := mn.AddPeer(nodeKey.PrivKey, nil)
+	require.NoError(t, err)
+	p2pClient, err := p2p.NewClientWithHost(conf.P2P, nodeKey.PrivKey, mainKV, chainId, logger, p2p.NopMetrics(), h)
+	require.NoError(t, err)
+
+	ctx, cancel := context.WithCancel(t.Context())
+	defer cancel()
+	require.NoError(t, p2pClient.Start(ctx))
+	t.Cleanup(func() { _ = p2pClient.Close() })
+
+	// Create the sync service - it has no peers to connect to, so it won't be able to sync via P2P
+	// But it DOES have a DA store (rktStore) that we can populate
+	svc, err := NewHeaderSyncService(mainKV, rktStore, conf, genesisDoc, p2pClient, logger)
+	require.NoError(t, err)
+
+	// Start the service - this will return without starting the syncer because there are no peers
+	require.NoError(t, svc.Start(ctx))
+	t.Cleanup(func() { _ = svc.Stop(context.Background()) })
+
+	// Verify the syncer hasn't started yet (no peers to get headers from)
+	require.False(t, svc.syncerStatus.isStarted(), "syncer should not be started without peers")
+
+	// Verify that querying the header store before syncer starts returns an empty store error
+	_, headErr := svc.Store().Head(ctx)
+	require.Error(t, headErr, "querying head before syncer starts should return an error")
+	require.True(t, errors.Is(headErr, header.ErrNotFound) || errors.Is(headErr, header.ErrEmptyStore),
+		"error should be ErrNotFound or ErrEmptyStore, got: %v", headErr)
+
+	// Create the genesis header that we'll add to the DA store
+	headerConfig := types.HeaderConfig{
+		Height:   genesisDoc.InitialHeight,
+		DataHash: bytesN(rnd, 32),
+		AppHash:  bytesN(rnd, 32),
+		Signer:   noopSigner,
+	}
+
+	signedHeader, err := types.GetRandomSignedHeaderCustom(&headerConfig, genesisDoc.ChainID)
+	require.NoError(t, err)
+	require.NoError(t, signedHeader.Validate())
+
+	// Track background retry completion
+	var retryCompleted atomic.Bool
+
+	// Manually trigger the background retry (simulating what happens after 2min timeout in initFromP2PWithRetry)
+	go func() {
+		svc.retryInitInBackground()
+		retryCompleted.Store(true)
+	}()
+
+	// Give the retry a moment to start and fail at least once (no headers available yet)
+	time.Sleep(100 * time.Millisecond)
+
+	// Now add the header to the DA store - the background retry's next attempt should find it
+	// via the exchangeWrapper's getterByHeight which checks the DA store first
+	batch, err := rktStore.NewBatch(ctx)
+	require.NoError(t, err)
+	require.NoError(t, batch.SaveBlockData(signedHeader, &types.Data{}, &types.Signature{}))
+	require.NoError(t, batch.SetHeight(signedHeader.Height()))
+	require.NoError(t, batch.Commit())
+
+	// Wait for the background retry to succeed and start the syncer
+	require.Eventually(t, func() bool {
+		return svc.syncerStatus.isStarted()
+	}, 30*time.Second, 100*time.Millisecond, "syncer should eventually start after background retry finds header in DA store")
+
+	// Verify the retry goroutine completed successfully
+	require.Eventually(t, func() bool {
+		return retryCompleted.Load()
+	}, 5*time.Second, 100*time.Millisecond, "background retry goroutine should complete")
+
+	// Verify the store was initialized with the header
+	head, err := svc.Store().Head(ctx)
+	require.NoError(t, err)
+	require.Equal(t, genesisDoc.InitialHeight, head.Height())
+}