diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/DefaultSyncServiceFactory.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/DefaultSyncServiceFactory.java index 68085b6dbaa..6d3d8cac9bd 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/DefaultSyncServiceFactory.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/DefaultSyncServiceFactory.java @@ -43,6 +43,7 @@ import tech.pegasys.teku.statetransition.blobs.BlobSidecarManager; import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool; import tech.pegasys.teku.statetransition.block.BlockImporter; +import tech.pegasys.teku.statetransition.datacolumns.DasSamplerBasic; import tech.pegasys.teku.statetransition.execution.ExecutionPayloadManager; import tech.pegasys.teku.statetransition.util.PendingPool; import tech.pegasys.teku.statetransition.validation.signatures.SignatureVerificationService; @@ -75,6 +76,7 @@ public class DefaultSyncServiceFactory implements SyncServiceFactory { private final PendingPool pendingBlocks; private final PendingPool pendingAttestations; private final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool; + private final DasSamplerBasic dasSamplerBasic; private final int getStartupTargetPeerCount; private final AsyncBLSSignatureVerifier signatureVerifier; private final Duration startupTimeout; @@ -98,6 +100,7 @@ public DefaultSyncServiceFactory( final PendingPool pendingBlocks, final PendingPool pendingAttestations, final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool, + final DasSamplerBasic dasSamplerBasic, final int getStartupTargetPeerCount, final SignatureVerificationService signatureVerifier, final Duration startupTimeout, @@ -119,6 +122,7 @@ public DefaultSyncServiceFactory( this.pendingBlocks = pendingBlocks; this.pendingAttestations = pendingAttestations; this.blockBlobSidecarsTrackersPool = blockBlobSidecarsTrackersPool; + this.dasSamplerBasic = dasSamplerBasic; this.getStartupTargetPeerCount = getStartupTargetPeerCount; this.signatureVerifier = signatureVerifier; this.startupTimeout = startupTimeout; @@ -141,6 +145,7 @@ public SyncService create(final EventChannels eventChannels) { pendingBlocks, pendingAttestations, blockBlobSidecarsTrackersPool, + dasSamplerBasic, forwardSyncService, fetchTaskFactory); final RecentBlobSidecarsFetcher recentBlobSidecarsFetcher = diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/SyncConfig.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/SyncConfig.java index 604a945bd87..7f78013f1c2 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/SyncConfig.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/SyncConfig.java @@ -28,7 +28,12 @@ public class SyncConfig { public static final int DEFAULT_FORWARD_SYNC_BATCH_SIZE = 25; public static final int DEFAULT_FORWARD_SYNC_MAX_PENDING_BATCHES = 5; - /** Aligned with {@link P2PConfig#DEFAULT_PEER_BLOCKS_RATE_LIMIT} */ + /** + * Must be >= FORWARD_SYNC_BATCH_SIZE * FORWARD_SYNC_MAX_PENDING_BATCHES to avoid evicting + * completed trackers before the sync pipeline imports them. + */ + public static final int DEFAULT_MAX_RECENTLY_SAMPLED_BLOCKS = 128; + public static final int DEFAULT_FORWARD_SYNC_MAX_BLOCKS_PER_MINUTE = 500; /** Aligned with {@link P2PConfig#DEFAULT_PEER_BLOB_SIDECARS_RATE_LIMIT} */ @@ -43,6 +48,7 @@ public class SyncConfig { private final int forwardSyncMaxPendingBatches; private final int forwardSyncMaxBlocksPerMinute; private final int forwardSyncMaxBlobSidecarsPerMinute; + private final int maxRecentlySampledBlocks; private final OptionalInt forwardSyncMaxDistanceFromHead; private SyncConfig( @@ -55,6 +61,7 @@ private SyncConfig( final int forwardSyncMaxPendingBatches, final int forwardSyncMaxBlocksPerMinute, final int forwardSyncMaxBlobSidecarsPerMinute, + final int maxRecentlySampledBlocks, final OptionalInt forwardSyncMaxDistanceFromHead) { this.isEnabled = isEnabled; this.isMultiPeerSyncEnabled = isMultiPeerSyncEnabled; @@ -65,6 +72,7 @@ private SyncConfig( this.forwardSyncMaxPendingBatches = forwardSyncMaxPendingBatches; this.forwardSyncMaxBlocksPerMinute = forwardSyncMaxBlocksPerMinute; this.forwardSyncMaxBlobSidecarsPerMinute = forwardSyncMaxBlobSidecarsPerMinute; + this.maxRecentlySampledBlocks = maxRecentlySampledBlocks; this.forwardSyncMaxDistanceFromHead = forwardSyncMaxDistanceFromHead; } @@ -108,6 +116,10 @@ public int getForwardSyncMaxBlobSidecarsPerMinute() { return forwardSyncMaxBlobSidecarsPerMinute; } + public int getMaxRecentlySampledBlocks() { + return maxRecentlySampledBlocks; + } + public OptionalInt getForwardSyncMaxDistanceFromHead() { return forwardSyncMaxDistanceFromHead; } @@ -123,6 +135,7 @@ public static class Builder { private Integer forwardSyncMaxBlocksPerMinute = DEFAULT_FORWARD_SYNC_MAX_BLOCKS_PER_MINUTE; private Integer forwardSyncMaxBlobSidecarsPerMinute = DEFAULT_FORWARD_SYNC_MAX_BLOB_SIDECARS_PER_MINUTE; + private Integer maxRecentlySampledBlocks = DEFAULT_MAX_RECENTLY_SAMPLED_BLOCKS; private OptionalInt forwardSyncMaxDistanceFromHead = OptionalInt.empty(); private Builder() {} @@ -139,6 +152,7 @@ public SyncConfig build() { forwardSyncMaxPendingBatches, forwardSyncMaxBlocksPerMinute, forwardSyncMaxBlobSidecarsPerMinute, + maxRecentlySampledBlocks, forwardSyncMaxDistanceFromHead); } @@ -214,6 +228,12 @@ public Builder reconstructHistoricStatesEnabled( return this; } + public Builder maxRecentlySampledBlocks(final Integer maxRecentlySampledBlocks) { + checkNotNull(maxRecentlySampledBlocks); + this.maxRecentlySampledBlocks = maxRecentlySampledBlocks; + return this; + } + public Builder fetchAllHistoricBlocks(final boolean fetchAllHistoricBlocks) { this.fetchAllHistoricBlocks = fetchAllHistoricBlocks; return this; diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/gossip/blocks/RecentBlocksFetchService.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/gossip/blocks/RecentBlocksFetchService.java index 6faa4bfbb25..c90904a0f64 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/gossip/blocks/RecentBlocksFetchService.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/gossip/blocks/RecentBlocksFetchService.java @@ -26,6 +26,7 @@ import tech.pegasys.teku.spec.datastructures.attestation.ValidatableAttestation; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool; +import tech.pegasys.teku.statetransition.datacolumns.DasSamplerBasic; import tech.pegasys.teku.statetransition.util.PendingPool; public class RecentBlocksFetchService @@ -42,12 +43,14 @@ public class RecentBlocksFetchService private final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool; private final FetchTaskFactory fetchTaskFactory; private final Subscribers blockSubscribers = Subscribers.create(true); + private final DasSamplerBasic dasBasicSampler; RecentBlocksFetchService( final AsyncRunner asyncRunner, final PendingPool pendingBlockPool, final PendingPool pendingAttestationsPool, final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool, + final DasSamplerBasic dasBasicSampler, final ForwardSync forwardSync, final FetchTaskFactory fetchTaskFactory, final int maxConcurrentRequests) { @@ -56,6 +59,7 @@ public class RecentBlocksFetchService this.pendingBlockPool = pendingBlockPool; this.pendingAttestationsPool = pendingAttestationsPool; this.blockBlobSidecarsTrackersPool = blockBlobSidecarsTrackersPool; + this.dasBasicSampler = dasBasicSampler; this.fetchTaskFactory = fetchTaskFactory; } @@ -64,6 +68,7 @@ public static RecentBlocksFetchService create( final PendingPool pendingBlocksPool, final PendingPool pendingAttestations, final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool, + final DasSamplerBasic dasBasicSampler, final ForwardSync forwardSync, final FetchTaskFactory fetchTaskFactory) { return new RecentBlocksFetchService( @@ -71,6 +76,7 @@ public static RecentBlocksFetchService create( pendingBlocksPool, pendingAttestations, blockBlobSidecarsTrackersPool, + dasBasicSampler, forwardSync, fetchTaskFactory, MAX_CONCURRENT_REQUESTS); @@ -106,6 +112,10 @@ public void requestRecentBlock(final Bytes32 blockRoot) { // We already have this block, waiting for blobs return; } + if (dasBasicSampler.containsBlock(blockRoot)) { + // We already have this block in DAS sampler + return; + } final FetchBlockTask task = createTask(blockRoot); if (allTasks.putIfAbsent(blockRoot, task) != null) { // We're already tracking this task diff --git a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/gossip/blocks/RecentBlocksFetchServiceTest.java b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/gossip/blocks/RecentBlocksFetchServiceTest.java index dc067b4d158..1436a178fc4 100644 --- a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/gossip/blocks/RecentBlocksFetchServiceTest.java +++ b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/gossip/blocks/RecentBlocksFetchServiceTest.java @@ -43,6 +43,7 @@ import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; import tech.pegasys.teku.spec.util.DataStructureUtil; import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool; +import tech.pegasys.teku.statetransition.datacolumns.DasSamplerBasic; import tech.pegasys.teku.statetransition.util.PendingPool; public class RecentBlocksFetchServiceTest { @@ -60,6 +61,8 @@ public class RecentBlocksFetchServiceTest { private final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool = mock(BlockBlobSidecarsTrackersPool.class); + private final DasSamplerBasic dasSamplerBasic = mock(DasSamplerBasic.class); + private final FetchTaskFactory fetchTaskFactory = mock(FetchTaskFactory.class); private final ForwardSync forwardSync = mock(ForwardSync.class); @@ -81,6 +84,7 @@ public void setup() { pendingBlockPool, pendingAttestationsPool, blockBlobSidecarsTrackersPool, + dasSamplerBasic, forwardSync, fetchTaskFactory, maxConcurrentRequests); diff --git a/beacon/sync/src/testFixtures/java/tech/pegasys/teku/beacon/sync/SyncingNodeManager.java b/beacon/sync/src/testFixtures/java/tech/pegasys/teku/beacon/sync/SyncingNodeManager.java index 80b50457751..37acfd6459c 100644 --- a/beacon/sync/src/testFixtures/java/tech/pegasys/teku/beacon/sync/SyncingNodeManager.java +++ b/beacon/sync/src/testFixtures/java/tech/pegasys/teku/beacon/sync/SyncingNodeManager.java @@ -64,6 +64,7 @@ import tech.pegasys.teku.statetransition.block.BlockImporter; import tech.pegasys.teku.statetransition.block.BlockManager; import tech.pegasys.teku.statetransition.block.ReceivedBlockEventsChannel; +import tech.pegasys.teku.statetransition.datacolumns.DasSamplerBasic; import tech.pegasys.teku.statetransition.datacolumns.DataAvailabilitySampler; import tech.pegasys.teku.statetransition.execution.ExecutionPayloadManager; import tech.pegasys.teku.statetransition.forkchoice.ForkChoice; @@ -224,6 +225,7 @@ public static SyncingNodeManager create( pendingBlocks, pendingAttestations, BlockBlobSidecarsTrackersPool.NOOP, + DasSamplerBasic.NOOP, syncService, fetchBlockTaskFactory); recentBlocksFetcher.subscribeBlockFetched(blockManager::importBlock); diff --git a/eth-reference-tests/src/referenceTest/java/tech/pegasys/teku/reference/phase0/forkchoice/ForkChoiceTestExecutor.java b/eth-reference-tests/src/referenceTest/java/tech/pegasys/teku/reference/phase0/forkchoice/ForkChoiceTestExecutor.java index 97f579d1bc4..96ecb89cc49 100644 --- a/eth-reference-tests/src/referenceTest/java/tech/pegasys/teku/reference/phase0/forkchoice/ForkChoiceTestExecutor.java +++ b/eth-reference-tests/src/referenceTest/java/tech/pegasys/teku/reference/phase0/forkchoice/ForkChoiceTestExecutor.java @@ -79,7 +79,7 @@ import tech.pegasys.teku.spec.logic.common.util.AsyncBLSSignatureVerifier; import tech.pegasys.teku.statetransition.datacolumns.CurrentSlotProvider; import tech.pegasys.teku.statetransition.datacolumns.DasCustodyStand; -import tech.pegasys.teku.statetransition.datacolumns.DasSamplerBasic; +import tech.pegasys.teku.statetransition.datacolumns.DasSamplerBasicImpl; import tech.pegasys.teku.statetransition.datacolumns.DataColumnSidecarRecoveringCustody; import tech.pegasys.teku.statetransition.datacolumns.retriever.DataColumnSidecarRetrieverStub; import tech.pegasys.teku.statetransition.forkchoice.ForkChoice; @@ -161,8 +161,8 @@ spec, new SignedBlockAndState(anchorBlock, anchorState)), final StubBlobSidecarManager blobSidecarManager = new StubBlobSidecarManager(kzg); final CurrentSlotProvider currentSlotProvider = CurrentSlotProvider.create(spec, recentChainData.getStore()); - final DasSamplerBasic dasSampler = - new DasSamplerBasic( + final DasSamplerBasicImpl dasSampler = + new DasSamplerBasicImpl( spec, asyncRunnerFactory.create("das", 1), currentSlotProvider, @@ -173,7 +173,9 @@ spec, new SignedBlockAndState(anchorBlock, anchorState)), // and fetching from the config would break when not in fulu DasCustodyStand.createCustodyGroupCountManager(4, 8), recentChainData, - false); + false, + new StubMetricsSystem(), + 64); final StubDataColumnSidecarManager dataColumnSidecarManager = new StubDataColumnSidecarManager(spec, recentChainData, dasSampler); // forkChoiceLateBlockReorgEnabled is true here always because this is the reference test diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/DasSamplerBasic.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/DasSamplerBasic.java index a6adf9c6e92..e45de9cce4b 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/DasSamplerBasic.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/DasSamplerBasic.java @@ -13,286 +13,59 @@ package tech.pegasys.teku.statetransition.datacolumns; -import com.google.common.annotations.VisibleForTesting; -import java.time.Duration; import java.util.List; -import java.util.Map; import java.util.Optional; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ConcurrentHashMap; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.apache.tuweni.bytes.Bytes32; import tech.pegasys.teku.ethereum.events.SlotEventsChannel; -import tech.pegasys.teku.infrastructure.async.AsyncRunner; import tech.pegasys.teku.infrastructure.async.SafeFuture; -import tech.pegasys.teku.infrastructure.exceptions.ExceptionUtil; -import tech.pegasys.teku.infrastructure.ssz.SszList; import tech.pegasys.teku.infrastructure.unsigned.UInt64; -import tech.pegasys.teku.spec.Spec; -import tech.pegasys.teku.spec.SpecMilestone; -import tech.pegasys.teku.spec.config.SpecConfigFulu; -import tech.pegasys.teku.spec.datastructures.blobs.DataColumnSidecar; import tech.pegasys.teku.spec.datastructures.blocks.BeaconBlock; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot; import tech.pegasys.teku.spec.datastructures.util.DataColumnSlotAndIdentifier; -import tech.pegasys.teku.spec.logic.versions.fulu.helpers.MiscHelpersFulu; import tech.pegasys.teku.statetransition.blobs.RemoteOrigin; -import tech.pegasys.teku.statetransition.datacolumns.retriever.DataColumnSidecarRetriever; -import tech.pegasys.teku.statetransition.util.RPCFetchDelayProvider; -import tech.pegasys.teku.storage.client.RecentChainData; -public class DasSamplerBasic implements DataAvailabilitySampler, SlotEventsChannel { - private static final Logger LOG = LogManager.getLogger(); +public interface DasSamplerBasic extends DataAvailabilitySampler, SlotEventsChannel { - private final DataColumnSidecarCustody custody; - private final DataColumnSidecarRetriever retriever; + DasSamplerBasic NOOP = + new DasSamplerBasic() { - private final Spec spec; - private final CurrentSlotProvider currentSlotProvider; - private final CustodyGroupCountManager custodyGroupCountManager; - private final Map recentlySampledColumnsByRoot = - new ConcurrentHashMap<>(); + @Override + public void onNewValidatedDataColumnSidecar( + final DataColumnSlotAndIdentifier columnId, final RemoteOrigin remoteOrigin) {} - private final AsyncRunner asyncRunner; - private final RecentChainData recentChainData; - private final RPCFetchDelayProvider rpcFetchDelayProvider; - private final boolean halfColumnsSamplingCompletionEnabled; + @Override + public SafeFuture> checkDataAvailability( + final UInt64 slot, final Bytes32 blockRoot) { + return SafeFuture.completedFuture(List.of()); + } - public DasSamplerBasic( - final Spec spec, - final AsyncRunner asyncRunner, - final CurrentSlotProvider currentSlotProvider, - final RPCFetchDelayProvider rpcFetchDelayProvider, - final DataColumnSidecarCustody custody, - final DataColumnSidecarRetriever retriever, - final CustodyGroupCountManager custodyGroupCountManager, - final RecentChainData recentChainData, - final boolean halfColumnsSamplingCompletionEnabled) { - this.currentSlotProvider = currentSlotProvider; - this.rpcFetchDelayProvider = rpcFetchDelayProvider; - this.spec = spec; - this.asyncRunner = asyncRunner; - this.custody = custody; - this.retriever = retriever; - this.custodyGroupCountManager = custodyGroupCountManager; - this.recentChainData = recentChainData; - this.halfColumnsSamplingCompletionEnabled = halfColumnsSamplingCompletionEnabled; - } + @Override + public SamplingEligibilityStatus checkSamplingEligibility(final BeaconBlock block) { + return SamplingEligibilityStatus.NOT_REQUIRED_NO_BLOBS; + } - @VisibleForTesting - Map getRecentlySampledColumnsByRoot() { - return recentlySampledColumnsByRoot; - } + @Override + public boolean containsBlock(final Bytes32 blockRoot) { + return false; + } - /** - * When syncing or backfilling always make sure to call this method with known DataColumn *before* - * calling {@link DasSamplerBasic#checkDataAvailability(UInt64, Bytes32)} so that RPC fetch won't - * be executed on those columns. - */ - @Override - public void onNewValidatedDataColumnSidecar( - final DataColumnSlotAndIdentifier columnId, final RemoteOrigin remoteOrigin) { - LOG.debug("Sampler received data column {} - origin: {}", columnId, remoteOrigin); + @Override + public void onSlot(final UInt64 slot) {} - getOrCreateTracker(columnId.slot(), columnId.blockRoot()).add(columnId, remoteOrigin); - } + @Override + public void flush() {} - @Override - public SafeFuture> checkDataAvailability( - final UInt64 slot, final Bytes32 blockRoot) { - final DataColumnSamplingTracker tracker = getOrCreateTracker(slot, blockRoot); + @Override + public void onNewBlock( + final SignedBeaconBlock block, final Optional remoteOrigin) {} - if (tracker.completionFuture().isDone()) { - return tracker.completionFuture(); - } + @Override + public void removeAllForBlock(final SlotAndBlockRoot slotAndBlockRoot) {} - if (tracker.rpcFetchInProgress().compareAndSet(false, true)) { - fetchMissingColumnsViaRPC(slot, blockRoot, tracker); - } + @Override + public void enableBlockImportOnCompletion(final SignedBeaconBlock block) {} + }; - return tracker.completionFuture(); - } - - private void onFirstSeen( - final UInt64 slot, final Bytes32 blockRoot, final DataColumnSamplingTracker tracker) { - final Duration delay = rpcFetchDelayProvider.calculate(slot); - if (delay.isZero()) { - // in case of immediate RPC fetch, let's postpone the actual fetch when checkDataAvailability - // is called. - // this is needed because 0 delay means we are syncing\backfilling this slot, so we want to - // wait eventual known columns to be added via onAlreadyKnownDataColumn before fetching. - return; - } - tracker.rpcFetchInProgress().set(true); - asyncRunner - .getDelayedFuture(delay) - .always(() -> fetchMissingColumnsViaRPC(slot, blockRoot, tracker)); - } - - private void fetchMissingColumnsViaRPC( - final UInt64 slot, final Bytes32 blockRoot, final DataColumnSamplingTracker tracker) { - final List missingColumns = tracker.getMissingColumnIdentifiers(); - LOG.debug( - "checkDataAvailability(): missing columns for slot {} root {}: {}", - slot, - blockRoot, - missingColumns.size()); - - SafeFuture.collectAll( - missingColumns.stream().map(id -> retrieveColumnWithSamplingAndCustody(id, tracker))) - .thenAccept( - retrievedColumns -> { - if (retrievedColumns.size() == missingColumns.size()) { - LOG.debug( - "checkDataAvailability(): retrieved remaining {} (of {}) columns via Req/Resp for block {} ({})", - retrievedColumns.size(), - tracker.samplingRequirement().size(), - slot, - blockRoot); - } else { - throw new IllegalStateException( - String.format( - "Retrieved only(%d) out of %d missing columns for slot %s (%s) with %d required columns", - retrievedColumns.size(), - missingColumns.size(), - slot, - blockRoot, - tracker.samplingRequirement().size())); - } - }) - // let's reset the fetched flag so that this tracker can reissue RPC requests on DA check - // retry - .alwaysRun(() -> tracker.rpcFetchInProgress().set(false)) - .finish( - throwable -> { - if (ExceptionUtil.hasCause(throwable, CancellationException.class)) { - final String error = throwable.getMessage(); - LOG.debug( - "CancellationException in checkDataAvailability: {}", - () -> error == null ? "" : error); - - } else { - LOG.error("data availability check failed", throwable); - } - }); - } - - private DataColumnSamplingTracker getOrCreateTracker(final UInt64 slot, final Bytes32 blockRoot) { - return recentlySampledColumnsByRoot.computeIfAbsent( - blockRoot, - k -> { - final DataColumnSamplingTracker tracker = - DataColumnSamplingTracker.create( - slot, - blockRoot, - custodyGroupCountManager, - halfColumnsSamplingCompletionEnabled - ? Optional.of( - SpecConfigFulu.required(spec.atSlot(slot).getConfig()) - .getNumberOfColumns() - / 2) - : Optional.empty()); - onFirstSeen(slot, blockRoot, tracker); - return tracker; - }); - } - - private SafeFuture retrieveColumnWithSamplingAndCustody( - final DataColumnSlotAndIdentifier id, final DataColumnSamplingTracker tracker) { - return retriever - .retrieve(id) - .thenPeek( - sidecar -> { - if (tracker.add(id, RemoteOrigin.RPC)) { - // send to custody only if it was added to the tracker - // (i.e. not received from other sources in the meantime) - custody.onNewValidatedDataColumnSidecar(sidecar, RemoteOrigin.RPC).finishError(LOG); - } - }); - } - - @Override - public void flush() { - retriever.flush(); - } - - private boolean hasBlobs(final BeaconBlock block) { - return !block.getBody().getOptionalBlobKzgCommitments().map(SszList::isEmpty).orElse(true); - } - - private boolean isInCustodyPeriod(final BeaconBlock block) { - final MiscHelpersFulu miscHelpersFulu = - MiscHelpersFulu.required(spec.atSlot(block.getSlot()).miscHelpers()); - final UInt64 currentEpoch = spec.computeEpochAtSlot(currentSlotProvider.getCurrentSlot()); - return miscHelpersFulu.isAvailabilityOfDataColumnSidecarsRequiredAtEpoch( - currentEpoch, spec.computeEpochAtSlot(block.getSlot())); - } - - @Override - public SamplingEligibilityStatus checkSamplingEligibility(final BeaconBlock block) { - if (!spec.atSlot(block.getSlot()).getMilestone().isGreaterThanOrEqualTo(SpecMilestone.FULU)) { - return SamplingEligibilityStatus.NOT_REQUIRED_BEFORE_FULU; - } else if (!isInCustodyPeriod(block)) { - return SamplingEligibilityStatus.NOT_REQUIRED_OLD_EPOCH; - } else if (!hasBlobs(block)) { - return SamplingEligibilityStatus.NOT_REQUIRED_NO_BLOBS; - } else { - return SamplingEligibilityStatus.REQUIRED; - } - } - - @Override - public void onSlot(final UInt64 slot) { - final UInt64 firstNonFinalizedSlot = - spec.computeStartSlotAtEpoch(recentChainData.getFinalizedEpoch()).increment(); - recentlySampledColumnsByRoot - .values() - .removeIf( - tracker -> { - if (tracker.slot().isLessThan(firstNonFinalizedSlot) - || recentChainData.containsBlock(tracker.blockRoot())) { - // Outdated - if (!tracker.completionFuture().isDone()) { - // make sure the future releases any pending waiters - tracker - .completionFuture() - .completeExceptionally( - new RuntimeException("DAS sampling expired while slot finalized")); - // Slot less than finalized slot, but we didn't complete DA check, means it's - // probably orphaned block with data never available - we must prune this - // RecentChainData contains block, but we are here - shouldn't happen - return true; - } - // cleanup only if fully sampled - return tracker.fullySampled().get(); - } - - return false; - }); - } - - @Override - public void onNewBlock(final SignedBeaconBlock block, final Optional remoteOrigin) { - LOG.debug("Sampler received block {} - origin: {}", block.getSlotAndBlockRoot(), remoteOrigin); - if (hasBlobs(block.getMessage())) { - getOrCreateTracker(block.getSlot(), block.getRoot()); - } - } - - @Override - public void removeAllForBlock(final SlotAndBlockRoot slotAndBlockRoot) { - final DataColumnSamplingTracker removed = - recentlySampledColumnsByRoot.remove(slotAndBlockRoot.getBlockRoot()); - if (removed != null) { - removed.completionFuture().cancel(true); - LOG.debug("Removed data column sampling tracker {}", removed); - } - } - - @Override - public void enableBlockImportOnCompletion(final SignedBeaconBlock block) { - // nothing to do - } + boolean containsBlock(Bytes32 blockRoot); } diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/DasSamplerBasicImpl.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/DasSamplerBasicImpl.java new file mode 100644 index 00000000000..e003701e71f --- /dev/null +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/DasSamplerBasicImpl.java @@ -0,0 +1,362 @@ +/* + * Copyright Consensys Software Inc., 2026 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.statetransition.datacolumns; + +import static tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory.BEACON; + +import com.google.common.annotations.VisibleForTesting; +import java.time.Duration; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CancellationException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.tuweni.bytes.Bytes32; +import org.hyperledger.besu.plugin.services.MetricsSystem; +import tech.pegasys.teku.infrastructure.async.AsyncRunner; +import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.infrastructure.exceptions.ExceptionUtil; +import tech.pegasys.teku.infrastructure.ssz.SszList; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.SpecMilestone; +import tech.pegasys.teku.spec.config.SpecConfigFulu; +import tech.pegasys.teku.spec.datastructures.blobs.DataColumnSidecar; +import tech.pegasys.teku.spec.datastructures.blocks.BeaconBlock; +import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot; +import tech.pegasys.teku.spec.datastructures.util.DataColumnSlotAndIdentifier; +import tech.pegasys.teku.spec.logic.versions.fulu.helpers.MiscHelpersFulu; +import tech.pegasys.teku.statetransition.blobs.RemoteOrigin; +import tech.pegasys.teku.statetransition.datacolumns.retriever.DataColumnSidecarRetriever; +import tech.pegasys.teku.statetransition.util.RPCFetchDelayProvider; +import tech.pegasys.teku.storage.client.RecentChainData; + +public class DasSamplerBasicImpl implements DasSamplerBasic { + private static final Logger LOG = LogManager.getLogger(); + + private final DataColumnSidecarCustody custody; + private final DataColumnSidecarRetriever retriever; + + private final Spec spec; + private final CurrentSlotProvider currentSlotProvider; + private final CustodyGroupCountManager custodyGroupCountManager; + private final int maxRecentlySampledBlocks; + private final Map recentlySampledColumnsByRoot; + + private final AsyncRunner asyncRunner; + private final RecentChainData recentChainData; + private final RPCFetchDelayProvider rpcFetchDelayProvider; + private final boolean halfColumnsSamplingCompletionEnabled; + + public DasSamplerBasicImpl( + final Spec spec, + final AsyncRunner asyncRunner, + final CurrentSlotProvider currentSlotProvider, + final RPCFetchDelayProvider rpcFetchDelayProvider, + final DataColumnSidecarCustody custody, + final DataColumnSidecarRetriever retriever, + final CustodyGroupCountManager custodyGroupCountManager, + final RecentChainData recentChainData, + final boolean halfColumnsSamplingCompletionEnabled, + final MetricsSystem metricsSystem, + final int maxRecentlySampledBlocks) { + this.currentSlotProvider = currentSlotProvider; + this.rpcFetchDelayProvider = rpcFetchDelayProvider; + this.spec = spec; + this.asyncRunner = asyncRunner; + this.custody = custody; + this.retriever = retriever; + this.custodyGroupCountManager = custodyGroupCountManager; + this.recentChainData = recentChainData; + this.halfColumnsSamplingCompletionEnabled = halfColumnsSamplingCompletionEnabled; + this.maxRecentlySampledBlocks = maxRecentlySampledBlocks; + this.recentlySampledColumnsByRoot = new LinkedHashMap<>(maxRecentlySampledBlocks); + metricsSystem.createGauge( + BEACON, + "das_recently_sampled_blocks_size", + "DAS recently sampled blocks size", + recentlySampledColumnsByRoot::size + ); + } + + @VisibleForTesting + Map getRecentlySampledColumnsByRoot() { + return recentlySampledColumnsByRoot; + } + + @Override + public synchronized boolean containsBlock(final Bytes32 blockRoot) { + return recentlySampledColumnsByRoot.containsKey(blockRoot); + } + + /** + * When syncing or backfilling always make sure to call this method with known DataColumn *before* + * calling {@link DasSamplerBasic#checkDataAvailability(UInt64, Bytes32)} so that RPC fetch won't + * be executed on those columns. + */ + @Override + public void onNewValidatedDataColumnSidecar( + final DataColumnSlotAndIdentifier columnId, final RemoteOrigin remoteOrigin) { + LOG.debug("Sampler received data column {} - origin: {}", columnId, remoteOrigin); + + getOrCreateTracker(columnId.slot(), columnId.blockRoot()).add(columnId, remoteOrigin); + } + + @Override + public SafeFuture> checkDataAvailability( + final UInt64 slot, final Bytes32 blockRoot) { + final DataColumnSamplingTracker tracker = getOrCreateTracker(slot, blockRoot); + + if (tracker.completionFuture().isDone()) { + return tracker.completionFuture(); + } + + if (tracker.rpcFetchInProgress().compareAndSet(false, true)) { + fetchMissingColumnsViaRPC(slot, blockRoot, tracker); + } + + return tracker.completionFuture(); + } + + private void onFirstSeen( + final UInt64 slot, final Bytes32 blockRoot, final DataColumnSamplingTracker tracker) { + final Duration delay = rpcFetchDelayProvider.calculate(slot); + if (delay.isZero()) { + // in case of immediate RPC fetch, let's postpone the actual fetch when checkDataAvailability + // is called. + // this is needed because 0 delay means we are syncing\backfilling this slot, so we want to + // wait eventual known columns to be added via onAlreadyKnownDataColumn before fetching. + return; + } + tracker.rpcFetchInProgress().set(true); + asyncRunner + .getDelayedFuture(delay) + .always(() -> fetchMissingColumnsViaRPC(slot, blockRoot, tracker)); + } + + private void fetchMissingColumnsViaRPC( + final UInt64 slot, final Bytes32 blockRoot, final DataColumnSamplingTracker tracker) { + final List missingColumns = tracker.getMissingColumnIdentifiers(); + LOG.debug( + "checkDataAvailability(): missing columns for slot {} root {}: {}", + slot, + blockRoot, + missingColumns.size()); + + SafeFuture.collectAll( + missingColumns.stream().map(id -> retrieveColumnWithSamplingAndCustody(id, tracker))) + .thenAccept( + retrievedColumns -> { + if (retrievedColumns.size() == missingColumns.size()) { + LOG.debug( + "checkDataAvailability(): retrieved remaining {} (of {}) columns via Req/Resp for block {} ({})", + retrievedColumns.size(), + tracker.samplingRequirement().size(), + slot, + blockRoot); + } else { + throw new IllegalStateException( + String.format( + "Retrieved only(%d) out of %d missing columns for slot %s (%s) with %d required columns", + retrievedColumns.size(), + missingColumns.size(), + slot, + blockRoot, + tracker.samplingRequirement().size())); + } + }) + // let's reset the fetched flag so that this tracker can reissue RPC requests on DA check + // retry + .alwaysRun(() -> tracker.rpcFetchInProgress().set(false)) + .finish( + throwable -> { + if (ExceptionUtil.hasCause(throwable, CancellationException.class)) { + final String error = throwable.getMessage(); + LOG.debug( + "CancellationException in checkDataAvailability: {}", + () -> error == null ? "" : error); + + } else { + LOG.error("data availability check failed", throwable); + } + }); + } + + private DataColumnSamplingTracker getOrCreateTracker(final UInt64 slot, final Bytes32 blockRoot) { + final DataColumnSamplingTracker tracker; + final boolean created; + synchronized (this) { + created = !recentlySampledColumnsByRoot.containsKey(blockRoot); + if (created) { + makeRoomForNewTracker(); + } + tracker = + recentlySampledColumnsByRoot.computeIfAbsent( + blockRoot, + k -> + DataColumnSamplingTracker.create( + slot, + blockRoot, + custodyGroupCountManager, + halfColumnsSamplingCompletionEnabled + ? Optional.of( + SpecConfigFulu.required(spec.atSlot(slot).getConfig()) + .getNumberOfColumns() + / 2) + : Optional.empty())); + } + if (created) { + onFirstSeen(slot, blockRoot, tracker); + } + return tracker; + } + + private void makeRoomForNewTracker() { + if (recentlySampledColumnsByRoot.size() < maxRecentlySampledBlocks) { + return; + } + // First pass: evict only completed trackers (oldest first via LinkedHashMap insertion order). + final Iterator> it = + recentlySampledColumnsByRoot.entrySet().iterator(); + while (it.hasNext() && recentlySampledColumnsByRoot.size() >= maxRecentlySampledBlocks) { + final DataColumnSamplingTracker tracker = it.next().getValue(); + if (tracker.completionFuture().isDone()) { + it.remove(); + } + } + // Hard cap: if we're at 4x the limit even after evicting completed trackers, + // force-evict the oldest incomplete ones to prevent unbounded growth. + final int hardLimit = maxRecentlySampledBlocks * 4; + while (recentlySampledColumnsByRoot.size() >= hardLimit) { + final Iterator> forceIt = + recentlySampledColumnsByRoot.entrySet().iterator(); + if (!forceIt.hasNext()) { + break; + } + final DataColumnSamplingTracker tracker = forceIt.next().getValue(); + forceIt.remove(); + if (!tracker.completionFuture().isDone()) { + LOG.warn( + "Force-evicting incomplete DAS tracker for slot {} root {} (hard cap reached)", + tracker.slot(), + tracker.blockRoot()); + tracker + .completionFuture() + .completeExceptionally(new RuntimeException("DAS sampling expired (hard cap)")); + } + } + } + + private SafeFuture retrieveColumnWithSamplingAndCustody( + final DataColumnSlotAndIdentifier id, final DataColumnSamplingTracker tracker) { + return retriever + .retrieve(id) + .thenPeek( + sidecar -> { + if (tracker.add(id, RemoteOrigin.RPC)) { + // send to custody only if it was added to the tracker + // (i.e. not received from other sources in the meantime) + custody.onNewValidatedDataColumnSidecar(sidecar, RemoteOrigin.RPC).finishError(LOG); + } + }); + } + + @Override + public void flush() { + retriever.flush(); + } + + private boolean hasBlobs(final BeaconBlock block) { + return !block.getBody().getOptionalBlobKzgCommitments().map(SszList::isEmpty).orElse(true); + } + + private boolean isInCustodyPeriod(final BeaconBlock block) { + final MiscHelpersFulu miscHelpersFulu = + MiscHelpersFulu.required(spec.atSlot(block.getSlot()).miscHelpers()); + final UInt64 currentEpoch = spec.computeEpochAtSlot(currentSlotProvider.getCurrentSlot()); + return miscHelpersFulu.isAvailabilityOfDataColumnSidecarsRequiredAtEpoch( + currentEpoch, spec.computeEpochAtSlot(block.getSlot())); + } + + @Override + public SamplingEligibilityStatus checkSamplingEligibility(final BeaconBlock block) { + if (!spec.atSlot(block.getSlot()).getMilestone().isGreaterThanOrEqualTo(SpecMilestone.FULU)) { + return SamplingEligibilityStatus.NOT_REQUIRED_BEFORE_FULU; + } else if (!isInCustodyPeriod(block)) { + return SamplingEligibilityStatus.NOT_REQUIRED_OLD_EPOCH; + } else if (!hasBlobs(block)) { + return SamplingEligibilityStatus.NOT_REQUIRED_NO_BLOBS; + } else { + return SamplingEligibilityStatus.REQUIRED; + } + } + + @Override + public void onSlot(final UInt64 slot) { + final UInt64 firstNonFinalizedSlot = + spec.computeStartSlotAtEpoch(recentChainData.getFinalizedEpoch()).increment(); + synchronized (this) { + recentlySampledColumnsByRoot + .values() + .removeIf( + tracker -> { + if (tracker.slot().isLessThan(firstNonFinalizedSlot) + || recentChainData.containsBlock(tracker.blockRoot())) { + // Outdated + if (!tracker.completionFuture().isDone()) { + // make sure the future releases any pending waiters + tracker + .completionFuture() + .completeExceptionally( + new RuntimeException("DAS sampling expired while slot finalized")); + // Slot less than finalized slot, but we didn't complete DA check, means it's + // probably orphaned block with data never available - we must prune this + // RecentChainData contains block, but we are here - shouldn't happen + return true; + } + // cleanup only if fully sampled + return tracker.fullySampled().get(); + } + return false; + }); + } + } + + @Override + public void onNewBlock(final SignedBeaconBlock block, final Optional remoteOrigin) { + LOG.debug("Sampler received block {} - origin: {}", block.getSlotAndBlockRoot(), remoteOrigin); + if (hasBlobs(block.getMessage())) { + getOrCreateTracker(block.getSlot(), block.getRoot()); + } + } + + @Override + public synchronized void removeAllForBlock(final SlotAndBlockRoot slotAndBlockRoot) { + final DataColumnSamplingTracker removed = + recentlySampledColumnsByRoot.remove(slotAndBlockRoot.getBlockRoot()); + if (removed != null) { + removed.completionFuture().cancel(true); + LOG.debug("Removed data column sampling tracker {}", removed); + } + } + + @Override + public void enableBlockImportOnCompletion(final SignedBeaconBlock block) { + // nothing to do + } +} diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/datacolumns/DasSamplerBasicTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/datacolumns/DasSamplerBasicTest.java index 4ba62b90e36..a85547c0806 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/datacolumns/DasSamplerBasicTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/datacolumns/DasSamplerBasicTest.java @@ -39,6 +39,7 @@ import org.junit.jupiter.api.Test; import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.async.StubAsyncRunner; +import tech.pegasys.teku.infrastructure.metrics.StubMetricsSystem; import tech.pegasys.teku.infrastructure.time.StubTimeProvider; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.spec.Spec; @@ -73,7 +74,7 @@ public class DasSamplerBasicTest { private CurrentSlotProvider currentSlotProvider; private final DataStructureUtil dataStructureUtil = new DataStructureUtil(0, SPEC); - private DasSamplerBasic sampler; + private DasSamplerBasicImpl sampler; @BeforeEach public void setUp() { @@ -91,7 +92,7 @@ public void setUp() { when(custody.onNewValidatedDataColumnSidecar(any(), any())).thenReturn(SafeFuture.COMPLETE); sampler = - new DasSamplerBasic( + new DasSamplerBasicImpl( SPEC, asyncRunner, currentSlotProvider, @@ -100,7 +101,9 @@ public void setUp() { retriever, custodyGroupCountManager, recentChainData, - true); + true, + new StubMetricsSystem(), + 64); } @Test diff --git a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java index a43b047d0f6..94ec76772c0 100644 --- a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java +++ b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java @@ -60,6 +60,7 @@ import tech.pegasys.teku.beacon.sync.gossip.executionpayloads.RecentExecutionPayloadsFetcher; import tech.pegasys.teku.beaconrestapi.BeaconRestApi; import tech.pegasys.teku.beaconrestapi.JsonTypeDefinitionBeaconRestApi; +import tech.pegasys.teku.dataproviders.lookup.SingleBlockProvider; import tech.pegasys.teku.ethereum.events.ExecutionClientEventsChannel; import tech.pegasys.teku.ethereum.events.SlotEventsChannel; import tech.pegasys.teku.ethereum.execution.types.Eth1Address; @@ -171,6 +172,7 @@ import tech.pegasys.teku.statetransition.datacolumns.DasCustodySync; import tech.pegasys.teku.statetransition.datacolumns.DasPreSampler; import tech.pegasys.teku.statetransition.datacolumns.DasSamplerBasic; +import tech.pegasys.teku.statetransition.datacolumns.DasSamplerBasicImpl; import tech.pegasys.teku.statetransition.datacolumns.DasSamplerManager; import tech.pegasys.teku.statetransition.datacolumns.DataAvailabilitySampler; import tech.pegasys.teku.statetransition.datacolumns.DataColumnSidecarArchiveReconstructor; @@ -399,6 +401,7 @@ public class BeaconChainController extends Service implements BeaconChainControl Optional.empty(); protected volatile Optional simpleSidecarRetriever = Optional.empty(); protected volatile AvailabilityCheckerFactory dasSamplerManager; + protected volatile DasSamplerBasic dasSamplerBasic = DasSamplerBasic.NOOP; protected volatile DataAvailabilitySampler dataAvailabilitySampler; protected volatile Optional terminalPowBlockMonitor = Optional.empty(); protected volatile ProposersDataManager proposersDataManager; @@ -596,6 +599,15 @@ protected SafeFuture initialize() { final ValidatorIsConnectedProvider validatorIsConnectedProvider = new ValidatorIsConnectedProviderReference(() -> proposersDataManager); + + final SingleBlockProvider singleBlockProviderResolver = + new SingleBlockProviderResolver( + (blockRoot) -> blockBlobSidecarsTrackersPool.getBlock(blockRoot), + (blockRoot) -> + dasSamplerBasic.containsBlock(blockRoot) + ? blockBlobSidecarsTrackersPool.getBlock(blockRoot) + : Optional.empty()); + // Init other services return initWeakSubjectivity(storageQueryChannel, storageUpdateChannel) .thenCompose( @@ -604,7 +616,7 @@ protected SafeFuture initialize() { metricsSystem, storeConfig, beaconAsyncRunner, - (blockRoot) -> blockBlobSidecarsTrackersPool.getBlock(blockRoot), + singleBlockProviderResolver, (blockRoot, index) -> blockBlobSidecarsTrackersPool.getBlobSidecar(blockRoot, index), storageQueryChannel, @@ -1097,8 +1109,8 @@ protected void initDasCustody() { DEFAULT_MIN_WAIT_MILLIS, DEFAULT_TARGET_WAIT_MILLIS); - final DasSamplerBasic dasSampler = - new DasSamplerBasic( + final DasSamplerBasicImpl dasSampler = + new DasSamplerBasicImpl( spec, beaconAsyncRunner, currentSlotProvider, @@ -1107,12 +1119,15 @@ protected void initDasCustody() { recoveringSidecarRetriever, custodyGroupCountManager, recentChainData, - beaconConfig.p2pConfig().isColumnsDataAvailabilityHalfCheckEnabled()); + beaconConfig.p2pConfig().isColumnsDataAvailabilityHalfCheckEnabled(), + metricsSystem, + beaconConfig.syncConfig().getMaxRecentlySampledBlocks()); LOG.info( "DAS Basic Sampler initialized with {} groups to sample", custodyGroupCountManager.getSamplingGroupCount()); eventChannels.subscribe(SlotEventsChannel.class, dasSampler); + this.dasSamplerBasic = dasSampler; this.dataAvailabilitySampler = dasSampler; this.recoveringSidecarRetriever = Optional.of(recoveringSidecarRetriever); } @@ -2216,6 +2231,7 @@ protected SyncServiceFactory createSyncServiceFactory() { pendingBlocks, pendingAttestations, blockBlobSidecarsTrackersPool, + dasSamplerBasic, beaconConfig.eth2NetworkConfig().getStartupTargetPeerCount(), signatureVerificationService, Duration.ofSeconds(beaconConfig.eth2NetworkConfig().getStartupTimeoutSeconds()), diff --git a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/SingleBlockProviderResolver.java b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/SingleBlockProviderResolver.java new file mode 100644 index 00000000000..b26c6c4141d --- /dev/null +++ b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/SingleBlockProviderResolver.java @@ -0,0 +1,36 @@ +/* + * Copyright Consensys Software Inc., 2026 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.services.beaconchain; + +import java.util.Optional; +import org.apache.tuweni.bytes.Bytes32; +import tech.pegasys.teku.dataproviders.lookup.SingleBlockProvider; +import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; + +public class SingleBlockProviderResolver implements SingleBlockProvider { + + private final SingleBlockProvider blockProviderFallback; + private final SingleBlockProvider blockProvider; + + public SingleBlockProviderResolver( + final SingleBlockProvider blockProviderFallback, final SingleBlockProvider blockProvider) { + this.blockProviderFallback = blockProviderFallback; + this.blockProvider = blockProvider; + } + + @Override + public Optional getBlock(final Bytes32 blockRoot) { + return blockProvider.getBlock(blockRoot).or(() -> blockProviderFallback.getBlock(blockRoot)); + } +} diff --git a/teku/src/main/java/tech/pegasys/teku/cli/options/P2POptions.java b/teku/src/main/java/tech/pegasys/teku/cli/options/P2POptions.java index 1accd99a909..b4554af84a2 100644 --- a/teku/src/main/java/tech/pegasys/teku/cli/options/P2POptions.java +++ b/teku/src/main/java/tech/pegasys/teku/cli/options/P2POptions.java @@ -321,6 +321,15 @@ The network interface(s) on which the node listens for P2P communication. arity = "1") private Integer forwardSyncMaxDistanceFromHead; + @Option( + names = {"--Xp2p-max-recently-sampled-blocks"}, + paramLabel = "", + showDefaultValue = Visibility.ALWAYS, + description = "Maximum number recent blocks we should sample when syncing.", + hidden = true, + arity = "1") + private Integer maxRecentlySampledBlocks = SyncConfig.DEFAULT_MAX_RECENTLY_SAMPLED_BLOCKS; + @Option( names = {"--Xp2p-sync-blob-sidecars-rate-limit"}, paramLabel = "", @@ -795,7 +804,8 @@ public void configure(final TekuConfiguration.Builder builder) { .forwardSyncMaxBlobSidecarsPerMinute(forwardSyncBlobSidecarsRateLimit) .forwardSyncBatchSize(forwardSyncBatchSize) .forwardSyncMaxPendingBatches(forwardSyncMaxPendingBatches) - .forwardSyncMaxDistanceFromHead(forwardSyncMaxDistanceFromHead)); + .forwardSyncMaxDistanceFromHead(forwardSyncMaxDistanceFromHead) + .maxRecentlySampledBlocks(maxRecentlySampledBlocks)); if (subscribeAllSubnetsEnabled) { builder