Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import tech.pegasys.teku.statetransition.blobs.BlobSidecarManager;
import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool;
import tech.pegasys.teku.statetransition.block.BlockImporter;
import tech.pegasys.teku.statetransition.datacolumns.DasSamplerBasic;
import tech.pegasys.teku.statetransition.execution.ExecutionPayloadManager;
import tech.pegasys.teku.statetransition.util.PendingPool;
import tech.pegasys.teku.statetransition.validation.signatures.SignatureVerificationService;
Expand Down Expand Up @@ -75,6 +76,7 @@ public class DefaultSyncServiceFactory implements SyncServiceFactory {
private final PendingPool<SignedBeaconBlock> pendingBlocks;
private final PendingPool<ValidatableAttestation> pendingAttestations;
private final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool;
private final DasSamplerBasic dasSamplerBasic;
private final int getStartupTargetPeerCount;
private final AsyncBLSSignatureVerifier signatureVerifier;
private final Duration startupTimeout;
Expand All @@ -98,6 +100,7 @@ public DefaultSyncServiceFactory(
final PendingPool<SignedBeaconBlock> pendingBlocks,
final PendingPool<ValidatableAttestation> pendingAttestations,
final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool,
final DasSamplerBasic dasSamplerBasic,
final int getStartupTargetPeerCount,
final SignatureVerificationService signatureVerifier,
final Duration startupTimeout,
Expand All @@ -119,6 +122,7 @@ public DefaultSyncServiceFactory(
this.pendingBlocks = pendingBlocks;
this.pendingAttestations = pendingAttestations;
this.blockBlobSidecarsTrackersPool = blockBlobSidecarsTrackersPool;
this.dasSamplerBasic = dasSamplerBasic;
this.getStartupTargetPeerCount = getStartupTargetPeerCount;
this.signatureVerifier = signatureVerifier;
this.startupTimeout = startupTimeout;
Expand All @@ -141,6 +145,7 @@ public SyncService create(final EventChannels eventChannels) {
pendingBlocks,
pendingAttestations,
blockBlobSidecarsTrackersPool,
dasSamplerBasic,
forwardSyncService,
fetchTaskFactory);
final RecentBlobSidecarsFetcher recentBlobSidecarsFetcher =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@ public class SyncConfig {
public static final int DEFAULT_FORWARD_SYNC_BATCH_SIZE = 25;
public static final int DEFAULT_FORWARD_SYNC_MAX_PENDING_BATCHES = 5;

/** Aligned with {@link P2PConfig#DEFAULT_PEER_BLOCKS_RATE_LIMIT} */
/**
* Must be >= FORWARD_SYNC_BATCH_SIZE * FORWARD_SYNC_MAX_PENDING_BATCHES to avoid evicting
* completed trackers before the sync pipeline imports them.
*/
public static final int DEFAULT_MAX_RECENTLY_SAMPLED_BLOCKS = 128;

public static final int DEFAULT_FORWARD_SYNC_MAX_BLOCKS_PER_MINUTE = 500;

/** Aligned with {@link P2PConfig#DEFAULT_PEER_BLOB_SIDECARS_RATE_LIMIT} */
Expand All @@ -43,6 +48,7 @@ public class SyncConfig {
private final int forwardSyncMaxPendingBatches;
private final int forwardSyncMaxBlocksPerMinute;
private final int forwardSyncMaxBlobSidecarsPerMinute;
private final int maxRecentlySampledBlocks;
private final OptionalInt forwardSyncMaxDistanceFromHead;

private SyncConfig(
Expand All @@ -55,6 +61,7 @@ private SyncConfig(
final int forwardSyncMaxPendingBatches,
final int forwardSyncMaxBlocksPerMinute,
final int forwardSyncMaxBlobSidecarsPerMinute,
final int maxRecentlySampledBlocks,
final OptionalInt forwardSyncMaxDistanceFromHead) {
this.isEnabled = isEnabled;
this.isMultiPeerSyncEnabled = isMultiPeerSyncEnabled;
Expand All @@ -65,6 +72,7 @@ private SyncConfig(
this.forwardSyncMaxPendingBatches = forwardSyncMaxPendingBatches;
this.forwardSyncMaxBlocksPerMinute = forwardSyncMaxBlocksPerMinute;
this.forwardSyncMaxBlobSidecarsPerMinute = forwardSyncMaxBlobSidecarsPerMinute;
this.maxRecentlySampledBlocks = maxRecentlySampledBlocks;
this.forwardSyncMaxDistanceFromHead = forwardSyncMaxDistanceFromHead;
}

Expand Down Expand Up @@ -108,6 +116,10 @@ public int getForwardSyncMaxBlobSidecarsPerMinute() {
return forwardSyncMaxBlobSidecarsPerMinute;
}

public int getMaxRecentlySampledBlocks() {
return maxRecentlySampledBlocks;
}

public OptionalInt getForwardSyncMaxDistanceFromHead() {
return forwardSyncMaxDistanceFromHead;
}
Expand All @@ -123,6 +135,7 @@ public static class Builder {
private Integer forwardSyncMaxBlocksPerMinute = DEFAULT_FORWARD_SYNC_MAX_BLOCKS_PER_MINUTE;
private Integer forwardSyncMaxBlobSidecarsPerMinute =
DEFAULT_FORWARD_SYNC_MAX_BLOB_SIDECARS_PER_MINUTE;
private Integer maxRecentlySampledBlocks = DEFAULT_MAX_RECENTLY_SAMPLED_BLOCKS;
private OptionalInt forwardSyncMaxDistanceFromHead = OptionalInt.empty();

private Builder() {}
Expand All @@ -139,6 +152,7 @@ public SyncConfig build() {
forwardSyncMaxPendingBatches,
forwardSyncMaxBlocksPerMinute,
forwardSyncMaxBlobSidecarsPerMinute,
maxRecentlySampledBlocks,
forwardSyncMaxDistanceFromHead);
}

Expand Down Expand Up @@ -214,6 +228,12 @@ public Builder reconstructHistoricStatesEnabled(
return this;
}

public Builder maxRecentlySampledBlocks(final Integer maxRecentlySampledBlocks) {
checkNotNull(maxRecentlySampledBlocks);
this.maxRecentlySampledBlocks = maxRecentlySampledBlocks;
return this;
}

public Builder fetchAllHistoricBlocks(final boolean fetchAllHistoricBlocks) {
this.fetchAllHistoricBlocks = fetchAllHistoricBlocks;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import tech.pegasys.teku.spec.datastructures.attestation.ValidatableAttestation;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool;
import tech.pegasys.teku.statetransition.datacolumns.DasSamplerBasic;
import tech.pegasys.teku.statetransition.util.PendingPool;

public class RecentBlocksFetchService
Expand All @@ -42,12 +43,14 @@ public class RecentBlocksFetchService
private final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool;
private final FetchTaskFactory fetchTaskFactory;
private final Subscribers<BlockSubscriber> blockSubscribers = Subscribers.create(true);
private final DasSamplerBasic dasBasicSampler;

RecentBlocksFetchService(
final AsyncRunner asyncRunner,
final PendingPool<SignedBeaconBlock> pendingBlockPool,
final PendingPool<ValidatableAttestation> pendingAttestationsPool,
final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool,
final DasSamplerBasic dasBasicSampler,
final ForwardSync forwardSync,
final FetchTaskFactory fetchTaskFactory,
final int maxConcurrentRequests) {
Expand All @@ -56,6 +59,7 @@ public class RecentBlocksFetchService
this.pendingBlockPool = pendingBlockPool;
this.pendingAttestationsPool = pendingAttestationsPool;
this.blockBlobSidecarsTrackersPool = blockBlobSidecarsTrackersPool;
this.dasBasicSampler = dasBasicSampler;
this.fetchTaskFactory = fetchTaskFactory;
}

Expand All @@ -64,13 +68,15 @@ public static RecentBlocksFetchService create(
final PendingPool<SignedBeaconBlock> pendingBlocksPool,
final PendingPool<ValidatableAttestation> pendingAttestations,
final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool,
final DasSamplerBasic dasBasicSampler,
final ForwardSync forwardSync,
final FetchTaskFactory fetchTaskFactory) {
return new RecentBlocksFetchService(
asyncRunner,
pendingBlocksPool,
pendingAttestations,
blockBlobSidecarsTrackersPool,
dasBasicSampler,
forwardSync,
fetchTaskFactory,
MAX_CONCURRENT_REQUESTS);
Expand Down Expand Up @@ -106,6 +112,10 @@ public void requestRecentBlock(final Bytes32 blockRoot) {
// We already have this block, waiting for blobs
return;
}
if (dasBasicSampler.containsBlock(blockRoot)) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we put containsBlock in BlockEventsListener interface and check it both for dataColumnSidecars and blobSidecars in router? Maybe add an extra interface for clarity or rename this one but it looks like we are doing the same job here, isn't it?

// We already have this block in DAS sampler
return;
}
final FetchBlockTask task = createTask(blockRoot);
if (allTasks.putIfAbsent(blockRoot, task) != null) {
// We're already tracking this task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.util.DataStructureUtil;
import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool;
import tech.pegasys.teku.statetransition.datacolumns.DasSamplerBasic;
import tech.pegasys.teku.statetransition.util.PendingPool;

public class RecentBlocksFetchServiceTest {
Expand All @@ -60,6 +61,8 @@ public class RecentBlocksFetchServiceTest {
private final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool =
mock(BlockBlobSidecarsTrackersPool.class);

private final DasSamplerBasic dasSamplerBasic = mock(DasSamplerBasic.class);

private final FetchTaskFactory fetchTaskFactory = mock(FetchTaskFactory.class);

private final ForwardSync forwardSync = mock(ForwardSync.class);
Expand All @@ -81,6 +84,7 @@ public void setup() {
pendingBlockPool,
pendingAttestationsPool,
blockBlobSidecarsTrackersPool,
dasSamplerBasic,
forwardSync,
fetchTaskFactory,
maxConcurrentRequests);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import tech.pegasys.teku.statetransition.block.BlockImporter;
import tech.pegasys.teku.statetransition.block.BlockManager;
import tech.pegasys.teku.statetransition.block.ReceivedBlockEventsChannel;
import tech.pegasys.teku.statetransition.datacolumns.DasSamplerBasic;
import tech.pegasys.teku.statetransition.datacolumns.DataAvailabilitySampler;
import tech.pegasys.teku.statetransition.execution.ExecutionPayloadManager;
import tech.pegasys.teku.statetransition.forkchoice.ForkChoice;
Expand Down Expand Up @@ -224,6 +225,7 @@ public static SyncingNodeManager create(
pendingBlocks,
pendingAttestations,
BlockBlobSidecarsTrackersPool.NOOP,
DasSamplerBasic.NOOP,
syncService,
fetchBlockTaskFactory);
recentBlocksFetcher.subscribeBlockFetched(blockManager::importBlock);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
import tech.pegasys.teku.spec.logic.common.util.AsyncBLSSignatureVerifier;
import tech.pegasys.teku.statetransition.datacolumns.CurrentSlotProvider;
import tech.pegasys.teku.statetransition.datacolumns.DasCustodyStand;
import tech.pegasys.teku.statetransition.datacolumns.DasSamplerBasic;
import tech.pegasys.teku.statetransition.datacolumns.DasSamplerBasicImpl;
import tech.pegasys.teku.statetransition.datacolumns.DataColumnSidecarRecoveringCustody;
import tech.pegasys.teku.statetransition.datacolumns.retriever.DataColumnSidecarRetrieverStub;
import tech.pegasys.teku.statetransition.forkchoice.ForkChoice;
Expand Down Expand Up @@ -161,8 +161,8 @@ spec, new SignedBlockAndState(anchorBlock, anchorState)),
final StubBlobSidecarManager blobSidecarManager = new StubBlobSidecarManager(kzg);
final CurrentSlotProvider currentSlotProvider =
CurrentSlotProvider.create(spec, recentChainData.getStore());
final DasSamplerBasic dasSampler =
new DasSamplerBasic(
final DasSamplerBasicImpl dasSampler =
new DasSamplerBasicImpl(
spec,
asyncRunnerFactory.create("das", 1),
currentSlotProvider,
Expand All @@ -173,7 +173,9 @@ spec, new SignedBlockAndState(anchorBlock, anchorState)),
// and fetching from the config would break when not in fulu
DasCustodyStand.createCustodyGroupCountManager(4, 8),
recentChainData,
false);
false,
new StubMetricsSystem(),
64);
final StubDataColumnSidecarManager dataColumnSidecarManager =
new StubDataColumnSidecarManager(spec, recentChainData, dasSampler);
// forkChoiceLateBlockReorgEnabled is true here always because this is the reference test
Expand Down
Loading
Loading