Skip to content
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
811317a
single-block-provider-fulu with soft and hard limit for sampled blocks
gfukushima Mar 26, 2026
cf749fe
spotless
gfukushima Mar 26, 2026
6295275
Merge branch 'master' into single-block-provider-fulu
gfukushima Mar 26, 2026
9d7c3c6
cursor review
gfukushima Mar 30, 2026
914d7bd
Merge branch 'master' into single-block-provider-fulu
gfukushima Mar 30, 2026
d8f21bf
add flag to modify default number of blocks to be sampled
gfukushima Mar 30, 2026
bad9e31
spotless
gfukushima Mar 30, 2026
ca49195
change config to minimum of 128 sampled blocks as this is what curren…
gfukushima Mar 31, 2026
5daa841
spotless
gfukushima Mar 31, 2026
bbde4b2
Merge branch 'master' into single-block-provider-fulu
gfukushima Mar 31, 2026
39304b1
added sampler back to recent block fetcher service
gfukushima Apr 9, 2026
0becf16
fix constructor in test class
gfukushima Apr 13, 2026
ba1926a
comments
gfukushima Apr 17, 2026
c05a6e2
revert metric change
gfukushima Apr 17, 2026
50e41fb
Merge branch 'master' into single-block-provider-fulu
gfukushima Apr 20, 2026
6b055b2
change back to store the block DataColumnSamplingTracker
gfukushima Apr 21, 2026
0af9b80
Merge branch 'master' into single-block-provider-fulu
gfukushima Apr 21, 2026
f424ca9
spotless
gfukushima Apr 21, 2026
e8fe9c2
add timestamp to tracker to use that when making room for new trackers
gfukushima Apr 21, 2026
5f5701d
spotless
gfukushima Apr 21, 2026
ccba0fc
Merge branch 'master' into single-block-provider-fulu
gfukushima May 4, 2026
056e068
Merge branch 'master' into single-block-provider-fulu
gfukushima May 5, 2026
79f7f1e
add comment and rename variable
gfukushima May 5, 2026
a0e55f9
Merge remote-tracking branch 'origin/single-block-provider-fulu' into…
gfukushima May 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import tech.pegasys.teku.statetransition.blobs.BlobSidecarManager;
import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool;
import tech.pegasys.teku.statetransition.block.BlockImporter;
import tech.pegasys.teku.statetransition.datacolumns.DasSamplerBasic;
import tech.pegasys.teku.statetransition.execution.ExecutionPayloadManager;
import tech.pegasys.teku.statetransition.util.PendingPool;
import tech.pegasys.teku.statetransition.validation.signatures.SignatureVerificationService;
Expand Down Expand Up @@ -75,6 +76,7 @@ public class DefaultSyncServiceFactory implements SyncServiceFactory {
private final PendingPool<SignedBeaconBlock> pendingBlocks;
private final PendingPool<ValidatableAttestation> pendingAttestations;
private final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool;
private final DasSamplerBasic dasSamplerBasic;
private final int getStartupTargetPeerCount;
private final AsyncBLSSignatureVerifier signatureVerifier;
private final Duration startupTimeout;
Expand All @@ -98,6 +100,7 @@ public DefaultSyncServiceFactory(
final PendingPool<SignedBeaconBlock> pendingBlocks,
final PendingPool<ValidatableAttestation> pendingAttestations,
final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool,
final DasSamplerBasic dasSamplerBasic,
final int getStartupTargetPeerCount,
final SignatureVerificationService signatureVerifier,
final Duration startupTimeout,
Expand All @@ -119,6 +122,7 @@ public DefaultSyncServiceFactory(
this.pendingBlocks = pendingBlocks;
this.pendingAttestations = pendingAttestations;
this.blockBlobSidecarsTrackersPool = blockBlobSidecarsTrackersPool;
this.dasSamplerBasic = dasSamplerBasic;
this.getStartupTargetPeerCount = getStartupTargetPeerCount;
this.signatureVerifier = signatureVerifier;
this.startupTimeout = startupTimeout;
Expand All @@ -141,6 +145,7 @@ public SyncService create(final EventChannels eventChannels) {
pendingBlocks,
pendingAttestations,
blockBlobSidecarsTrackersPool,
dasSamplerBasic,
forwardSyncService,
fetchTaskFactory);
final RecentBlobSidecarsFetcher recentBlobSidecarsFetcher =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@ public class SyncConfig {
public static final int DEFAULT_FORWARD_SYNC_BATCH_SIZE = 25;
public static final int DEFAULT_FORWARD_SYNC_MAX_PENDING_BATCHES = 5;

/** Aligned with {@link P2PConfig#DEFAULT_PEER_BLOCKS_RATE_LIMIT} */
/**
* Must be >= FORWARD_SYNC_BATCH_SIZE * FORWARD_SYNC_MAX_PENDING_BATCHES to avoid evicting
* completed trackers before the sync pipeline imports them.
*/
public static final int DEFAULT_MAX_RECENTLY_SAMPLED_BLOCKS = 128;

public static final int DEFAULT_FORWARD_SYNC_MAX_BLOCKS_PER_MINUTE = 500;

/** Aligned with {@link P2PConfig#DEFAULT_PEER_BLOB_SIDECARS_RATE_LIMIT} */
Expand All @@ -43,6 +48,7 @@ public class SyncConfig {
private final int forwardSyncMaxPendingBatches;
private final int forwardSyncMaxBlocksPerMinute;
private final int forwardSyncMaxBlobSidecarsPerMinute;
private final int maxRecentlySampledBlocks;
private final OptionalInt forwardSyncMaxDistanceFromHead;

private SyncConfig(
Expand All @@ -55,6 +61,7 @@ private SyncConfig(
final int forwardSyncMaxPendingBatches,
final int forwardSyncMaxBlocksPerMinute,
final int forwardSyncMaxBlobSidecarsPerMinute,
final int maxRecentlySampledBlocks,
final OptionalInt forwardSyncMaxDistanceFromHead) {
this.isEnabled = isEnabled;
this.isMultiPeerSyncEnabled = isMultiPeerSyncEnabled;
Expand All @@ -65,6 +72,7 @@ private SyncConfig(
this.forwardSyncMaxPendingBatches = forwardSyncMaxPendingBatches;
this.forwardSyncMaxBlocksPerMinute = forwardSyncMaxBlocksPerMinute;
this.forwardSyncMaxBlobSidecarsPerMinute = forwardSyncMaxBlobSidecarsPerMinute;
this.maxRecentlySampledBlocks = maxRecentlySampledBlocks;
this.forwardSyncMaxDistanceFromHead = forwardSyncMaxDistanceFromHead;
}

Expand Down Expand Up @@ -108,6 +116,10 @@ public int getForwardSyncMaxBlobSidecarsPerMinute() {
return forwardSyncMaxBlobSidecarsPerMinute;
}

public int getMaxRecentlySampledBlocks() {
return maxRecentlySampledBlocks;
}

public OptionalInt getForwardSyncMaxDistanceFromHead() {
return forwardSyncMaxDistanceFromHead;
}
Expand All @@ -123,6 +135,7 @@ public static class Builder {
private Integer forwardSyncMaxBlocksPerMinute = DEFAULT_FORWARD_SYNC_MAX_BLOCKS_PER_MINUTE;
private Integer forwardSyncMaxBlobSidecarsPerMinute =
DEFAULT_FORWARD_SYNC_MAX_BLOB_SIDECARS_PER_MINUTE;
private Integer maxRecentlySampledBlocks = DEFAULT_MAX_RECENTLY_SAMPLED_BLOCKS;
private OptionalInt forwardSyncMaxDistanceFromHead = OptionalInt.empty();

private Builder() {}
Expand All @@ -139,6 +152,7 @@ public SyncConfig build() {
forwardSyncMaxPendingBatches,
forwardSyncMaxBlocksPerMinute,
forwardSyncMaxBlobSidecarsPerMinute,
maxRecentlySampledBlocks,
forwardSyncMaxDistanceFromHead);
}

Expand Down Expand Up @@ -214,6 +228,12 @@ public Builder reconstructHistoricStatesEnabled(
return this;
}

public Builder maxRecentlySampledBlocks(final Integer maxRecentlySampledBlocks) {
checkNotNull(maxRecentlySampledBlocks);
this.maxRecentlySampledBlocks = maxRecentlySampledBlocks;
return this;
}

public Builder fetchAllHistoricBlocks(final boolean fetchAllHistoricBlocks) {
this.fetchAllHistoricBlocks = fetchAllHistoricBlocks;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import tech.pegasys.teku.spec.datastructures.attestation.ValidatableAttestation;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool;
import tech.pegasys.teku.statetransition.datacolumns.DasSamplerBasic;
import tech.pegasys.teku.statetransition.util.PendingPool;

public class RecentBlocksFetchService
Expand All @@ -42,12 +43,14 @@ public class RecentBlocksFetchService
private final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool;
private final FetchTaskFactory fetchTaskFactory;
private final Subscribers<BlockSubscriber> blockSubscribers = Subscribers.create(true);
private final DasSamplerBasic dasBasicSampler;

RecentBlocksFetchService(
final AsyncRunner asyncRunner,
final PendingPool<SignedBeaconBlock> pendingBlockPool,
final PendingPool<ValidatableAttestation> pendingAttestationsPool,
final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool,
final DasSamplerBasic dasBasicSampler,
final ForwardSync forwardSync,
final FetchTaskFactory fetchTaskFactory,
final int maxConcurrentRequests) {
Expand All @@ -56,6 +59,7 @@ public class RecentBlocksFetchService
this.pendingBlockPool = pendingBlockPool;
this.pendingAttestationsPool = pendingAttestationsPool;
this.blockBlobSidecarsTrackersPool = blockBlobSidecarsTrackersPool;
this.dasBasicSampler = dasBasicSampler;
this.fetchTaskFactory = fetchTaskFactory;
}

Expand All @@ -64,13 +68,15 @@ public static RecentBlocksFetchService create(
final PendingPool<SignedBeaconBlock> pendingBlocksPool,
final PendingPool<ValidatableAttestation> pendingAttestations,
final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool,
final DasSamplerBasic dasBasicSampler,
final ForwardSync forwardSync,
final FetchTaskFactory fetchTaskFactory) {
return new RecentBlocksFetchService(
asyncRunner,
pendingBlocksPool,
pendingAttestations,
blockBlobSidecarsTrackersPool,
dasBasicSampler,
forwardSync,
fetchTaskFactory,
MAX_CONCURRENT_REQUESTS);
Expand Down Expand Up @@ -106,6 +112,10 @@ public void requestRecentBlock(final Bytes32 blockRoot) {
// We already have this block, waiting for blobs
return;
}
if (dasBasicSampler.containsBlock(blockRoot)) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we put containsBlock in BlockEventsListener interface and check it both for dataColumnSidecars and blobSidecars in router? Maybe add an extra interface for clarity or rename this one but it looks like we are doing the same job here, isn't it?

// We already have this block in DAS sampler
return;
}
final FetchBlockTask task = createTask(blockRoot);
if (allTasks.putIfAbsent(blockRoot, task) != null) {
// We're already tracking this task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.util.DataStructureUtil;
import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool;
import tech.pegasys.teku.statetransition.datacolumns.DasSamplerBasic;
import tech.pegasys.teku.statetransition.util.PendingPool;

public class RecentBlocksFetchServiceTest {
Expand All @@ -60,6 +61,8 @@ public class RecentBlocksFetchServiceTest {
private final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool =
mock(BlockBlobSidecarsTrackersPool.class);

private final DasSamplerBasic dasSamplerBasic = mock(DasSamplerBasic.class);

private final FetchTaskFactory fetchTaskFactory = mock(FetchTaskFactory.class);

private final ForwardSync forwardSync = mock(ForwardSync.class);
Expand All @@ -81,6 +84,7 @@ public void setup() {
pendingBlockPool,
pendingAttestationsPool,
blockBlobSidecarsTrackersPool,
dasSamplerBasic,
forwardSync,
fetchTaskFactory,
maxConcurrentRequests);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import tech.pegasys.teku.statetransition.block.BlockImporter;
import tech.pegasys.teku.statetransition.block.BlockManager;
import tech.pegasys.teku.statetransition.block.ReceivedBlockEventsChannel;
import tech.pegasys.teku.statetransition.datacolumns.DasSamplerBasic;
import tech.pegasys.teku.statetransition.datacolumns.DataAvailabilitySampler;
import tech.pegasys.teku.statetransition.execution.ExecutionPayloadManager;
import tech.pegasys.teku.statetransition.forkchoice.ForkChoice;
Expand Down Expand Up @@ -224,6 +225,7 @@ public static SyncingNodeManager create(
pendingBlocks,
pendingAttestations,
BlockBlobSidecarsTrackersPool.NOOP,
DasSamplerBasic.NOOP,
syncService,
fetchBlockTaskFactory);
recentBlocksFetcher.subscribeBlockFetched(blockManager::importBlock);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
import tech.pegasys.teku.spec.logic.common.util.AsyncBLSSignatureVerifier;
import tech.pegasys.teku.statetransition.datacolumns.CurrentSlotProvider;
import tech.pegasys.teku.statetransition.datacolumns.DasCustodyStand;
import tech.pegasys.teku.statetransition.datacolumns.DasSamplerBasic;
import tech.pegasys.teku.statetransition.datacolumns.DasSamplerBasicImpl;
import tech.pegasys.teku.statetransition.datacolumns.DataColumnSidecarRecoveringCustody;
import tech.pegasys.teku.statetransition.datacolumns.retriever.DataColumnSidecarRetrieverStub;
import tech.pegasys.teku.statetransition.forkchoice.ForkChoice;
Expand Down Expand Up @@ -161,8 +161,8 @@ spec, new SignedBlockAndState(anchorBlock, anchorState)),
final StubBlobSidecarManager blobSidecarManager = new StubBlobSidecarManager(kzg);
final CurrentSlotProvider currentSlotProvider =
CurrentSlotProvider.create(spec, recentChainData.getStore());
final DasSamplerBasic dasSampler =
new DasSamplerBasic(
final DasSamplerBasicImpl dasSampler =
new DasSamplerBasicImpl(
spec,
asyncRunnerFactory.create("das", 1),
currentSlotProvider,
Expand All @@ -173,7 +173,9 @@ spec, new SignedBlockAndState(anchorBlock, anchorState)),
// and fetching from the config would break when not in fulu
DasCustodyStand.createCustodyGroupCountManager(4, 8),
recentChainData,
false);
false,
new StubMetricsSystem(),
64);
final StubDataColumnSidecarManager dataColumnSidecarManager =
new StubDataColumnSidecarManager(spec, recentChainData, dasSampler);
// forkChoiceLateBlockReorgEnabled is true here always because this is the reference test
Expand Down
Loading
Loading