Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 23 additions & 15 deletions block/internal/syncing/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ type Syncer struct {
gracePeriodMultiplier *atomic.Pointer[float64]
blockFullnessEMA *atomic.Pointer[float64]
gracePeriodConfig forcedInclusionGracePeriodConfig
p2pHeightHints map[uint64]uint64 // map[height]daHeight

// Lifecycle
ctx context.Context
Expand Down Expand Up @@ -177,6 +178,7 @@ func NewSyncer(
gracePeriodMultiplier: gracePeriodMultiplier,
blockFullnessEMA: blockFullnessEMA,
gracePeriodConfig: newForcedInclusionGracePeriodConfig(),
p2pHeightHints: make(map[uint64]uint64),
}
}

Expand Down Expand Up @@ -541,6 +543,8 @@ func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) {
Uint64("da_height_hint", daHeightHint).
Msg("P2P event with DA height hint, triggering targeted DA retrieval")

s.p2pHeightHints[height] = daHeightHint

// Trigger targeted DA retrieval in background via worker pool
s.asyncDARetriever.RequestRetrieval(daHeightHint)
}
Expand Down Expand Up @@ -581,7 +585,7 @@ func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) {
}

// only save to p2p stores if the event came from DA
if event.Source == common.SourceDA { // TODO(@julienrbrt): To be reverted once DA Hints are merged (https://github.com/evstack/ev-node/pull/2891)
if event.Source == common.SourceDA {
g, ctx := errgroup.WithContext(s.ctx)
g.Go(func() error {
// broadcast header locally only — prevents spamming the p2p network with old height notifications,
Expand Down Expand Up @@ -636,13 +640,17 @@ func (s *Syncer) trySyncNextBlock(event *common.DAHeightEvent) error {
}

// Verify forced inclusion transactions if configured
if event.Source == common.SourceDA {
if err := s.verifyForcedInclusionTxs(currentState, data); err != nil {
s.logger.Error().Err(err).Uint64("height", nextHeight).Msg("forced inclusion verification failed")
if errors.Is(err, errMaliciousProposer) {
s.cache.RemoveHeaderDAIncluded(headerHash)
return err
}
currentDaHeight, ok := s.p2pHeightHints[nextHeight]
if !ok {
currentDaHeight = currentState.DAHeight
} else {
delete(s.p2pHeightHints, nextHeight)
}
if err := s.verifyForcedInclusionTxs(currentDaHeight, data); err != nil {
s.logger.Error().Err(err).Uint64("height", nextHeight).Msg("forced inclusion verification failed")
if errors.Is(err, errMaliciousProposer) {
s.cache.RemoveHeaderDAIncluded(headerHash)
return err
}
Comment on lines +650 to 654
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Failure to verify forced inclusion transactions should not be ignored. Currently, only errMaliciousProposer errors are returned from trySyncNextBlock, while other errors (e.g., transient DA layer connection issues) are only logged. This could lead to the node accepting a block that is invalid because it's missing forced transactions. All errors from verifyForcedInclusionTxs should be returned so the caller can handle them, for instance by retrying the block processing later.

Suggested change
s.logger.Error().Err(err).Uint64("height", nextHeight).Msg("forced inclusion verification failed")
if errors.Is(err, errMaliciousProposer) {
s.cache.RemoveHeaderDAIncluded(headerHash)
return err
}
s.logger.Error().Err(err).Uint64("height", nextHeight).Msg("forced inclusion verification failed")
if errors.Is(err, errMaliciousProposer) {
s.cache.RemoveHeaderDAIncluded(headerHash)
}
return err

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

current behavior is correct. a fetch issue due to the sync node have issues or the da layer down should not crash the node.

}

Expand Down Expand Up @@ -855,7 +863,7 @@ func (s *Syncer) getEffectiveGracePeriod() uint64 {
// Note: Due to block size constraints (MaxBytes), sequencers may defer forced inclusion transactions
// to future blocks (smoothing). This is legitimate behavior within an epoch.
// However, ALL forced inclusion txs from an epoch MUST be included before the next epoch begins or grace boundary (whichever comes later).
func (s *Syncer) verifyForcedInclusionTxs(currentState types.State, data *types.Data) error {
func (s *Syncer) verifyForcedInclusionTxs(daHeight uint64, data *types.Data) error {
if s.fiRetriever == nil {
return nil
}
Expand All @@ -865,7 +873,7 @@ func (s *Syncer) verifyForcedInclusionTxs(currentState types.State, data *types.
s.updateDynamicGracePeriod(blockFullness)

// Retrieve forced inclusion transactions from DA for current epoch
forcedIncludedTxsEvent, err := s.fiRetriever.RetrieveForcedIncludedTxs(s.ctx, currentState.DAHeight)
forcedIncludedTxsEvent, err := s.fiRetriever.RetrieveForcedIncludedTxs(s.ctx, daHeight)
if err != nil {
if errors.Is(err, da.ErrForceInclusionNotConfigured) {
s.logger.Debug().Msg("forced inclusion namespace not configured, skipping verification")
Expand Down Expand Up @@ -928,10 +936,10 @@ func (s *Syncer) verifyForcedInclusionTxs(currentState types.State, data *types.
effectiveGracePeriod := s.getEffectiveGracePeriod()
graceBoundary := pending.EpochEnd + (effectiveGracePeriod * s.genesis.DAEpochForcedInclusion)

if currentState.DAHeight > graceBoundary {
if daHeight > graceBoundary {
maliciousTxs = append(maliciousTxs, pending)
s.logger.Warn().
Uint64("current_da_height", currentState.DAHeight).
Uint64("current_da_height", daHeight).
Uint64("epoch_end", pending.EpochEnd).
Uint64("grace_boundary", graceBoundary).
Uint64("base_grace_periods", s.gracePeriodConfig.basePeriod).
Expand All @@ -941,7 +949,7 @@ func (s *Syncer) verifyForcedInclusionTxs(currentState types.State, data *types.
Msg("forced inclusion transaction past grace boundary - marking as malicious")
} else {
remainingPending = append(remainingPending, pending)
if currentState.DAHeight > pending.EpochEnd {
if daHeight > pending.EpochEnd {
txsInGracePeriod++
}
}
Expand All @@ -965,7 +973,7 @@ func (s *Syncer) verifyForcedInclusionTxs(currentState types.State, data *types.
effectiveGracePeriod := s.getEffectiveGracePeriod()
s.logger.Error().
Uint64("height", data.Height()).
Uint64("current_da_height", currentState.DAHeight).
Uint64("current_da_height", daHeight).
Int("malicious_count", len(maliciousTxs)).
Uint64("base_grace_periods", s.gracePeriodConfig.basePeriod).
Uint64("effective_grace_periods", effectiveGracePeriod).
Expand All @@ -985,7 +993,7 @@ func (s *Syncer) verifyForcedInclusionTxs(currentState types.State, data *types.

s.logger.Info().
Uint64("height", data.Height()).
Uint64("da_height", currentState.DAHeight).
Uint64("da_height", daHeight).
Uint64("epoch_start", forcedIncludedTxsEvent.StartDaHeight).
Uint64("epoch_end", forcedIncludedTxsEvent.EndDaHeight).
Int("included_count", includedCount).
Expand Down
Loading
Loading