Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/en/changes/changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* Fix missing `and` keyword in `JDBCEBPFProfilingTaskDAO.getTaskRecord()` SQL query, which caused a syntax error on every invocation.
* Fix duplicate `TABLE_COLUMN` condition in `JDBCMetadataQueryDAO.findEndpoint()`, which was binding the same parameter twice due to a copy-paste error.
* Support MCP (Model Context Protocol) observability for Envoy AI Gateway: MCP metrics (request CPM/latency, method breakdown, backend breakdown, initialization latency, capabilities), MCP access log sampling (errors only), `ai_route_type` searchable log tag, and MCP dashboard tabs.
* Add weighted handler support to `BatchQueue` adaptive partitioning. MAL metrics use weight 0.05 at L1 (vs 1.0 for OAL), reducing partition count and memory overhead when many MAL metric types are registered.

#### UI

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.analysis.data.MergableBufferedData;
import org.apache.skywalking.oap.server.core.analysis.meter.Meter;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.batchqueue.BatchQueue;
Expand Down Expand Up @@ -121,7 +122,16 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
QUEUE_USAGE_GAUGE = gauge;
}

l1Queue.addHandler(metricsClass, new L1Handler());
// OAL metrics receive items on every incoming request (high, continuous throughput),
// so each type benefits from a dedicated partition — weight 1.0.
// MAL metrics receive items only once per scrape interval (typically 1 emit/min),
// producing at most ~500 items per type per burst. With a 20,000-slot buffer,
// ~40 MAL types can safely share one partition (20,000 / 500 = 40). We use
// weight 0.05 (≈ 1/20) to give 2x headroom over the theoretical sharing limit.
// This significantly reduces partition count and memory overhead when many MAL
// metric types are registered (e.g., from otel-rules).
final double weight = Meter.class.isAssignableFrom(metricsClass) ? 0.05 : 1.0;
l1Queue.addHandler(metricsClass, new L1Handler(), weight);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ public class MetricsPersistentMinWorker extends MetricsPersistentWorker {
QUEUE_USAGE_GAUGE = gauge;
}

// No weight differentiation at L2. After L1 pre-aggregation, both OAL and MAL
// produce one item per (metric_type × entity) per minute — similar burst patterns
// and throughput. The OAL per-request amplification is absorbed by L1, so at L2
// there is no meaningful throughput difference to justify partition weight tuning.
// L2 buffer is also much smaller (2,000 vs 20,000), so the memory overhead of
// extra partitions is modest (~16 MB total vs L1's ~167 MB).
l2Queue.addHandler(metricsClass, new L2Handler());
}

Expand Down
18 changes: 18 additions & 0 deletions oap-server/server-library/library-batch-queue/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,24 @@ Adaptive growth (default multiplier 25, with 8 threads -> threshold 200):
- 100 handlers -> 100 partitions (1:1)
- 500 handlers -> 350 partitions (200 + 300/2)

### Weighted handlers

`addHandler(type, handler, weight)` allows different handler types to contribute different
amounts to the partition count. The adaptive formula uses the weighted sum instead of raw
handler count. Partition assignment remains hash-based (`typeHash()`) — weight only affects
how many partitions exist, not which partition a type lands on.

L1 uses weight 0.05 for MAL metrics (vs 1.0 for OAL). Rationale: MAL emits ~500 items/type
per scrape interval. With 20,000-slot buffers, ~40 MAL types can safely share one partition
(20,000 / 500 = 40). Weight 0.05 ≈ 1/20 gives 2x headroom.

Example (8 threads, 642 OAL + 1,247 MAL):
- Without weight: 1,889 handlers -> 1,044 partitions (167 MB array overhead at L1)
- With weight: 642*1.0 + 1,247*0.05 = 705 effective -> 452 partitions (72 MB)

L2 uses default weight 1.0 for all types because after L1 pre-aggregation both OAL and MAL
have similar per-minute burst patterns.

## Drain Rebalancing

Static round-robin partition assignment creates thread imbalance when metric types have varying
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,13 @@ public class BatchQueue<T> {
*/
private final ConcurrentHashMap<Class<?>, HandlerConsumer<T>> handlerMap;

/**
* Running weighted sum of registered handlers, used by adaptive partition policy.
* Each handler contributes its weight (default 1.0) when registered via
* {@link #addHandler(Class, HandlerConsumer, double)}.
*/
private double weightedHandlerCount;

/**
* Tracks unregistered types that have already been warned about,
* to avoid flooding the log with repeated errors.
Expand Down Expand Up @@ -382,23 +389,43 @@ private void scheduleDrain(final int taskIndex) {
}

/**
 * Register a type-based handler using the default partition weight of 1.0.
 *
 * <p>Equivalent to calling {@code addHandler(type, handler, 1.0)}.
 *
 * @param type the class of items to route to this handler
 * @param handler the consumer that processes batches of the given type
 * @see #addHandler(Class, HandlerConsumer, double)
 */
public void addHandler(final Class<? extends T> type, final HandlerConsumer<T> handler) {
    addHandler(type, handler, 1.0);
}

/**
* Register a type-based handler with an explicit weight for adaptive partition sizing.
*
* <p>For adaptive partition policies, adding a handler recalculates the partition
* count and grows the partition array if needed. For non-adaptive policies the
* resolved count never changes, so this is a no-op beyond the registration.
* Drain loop threads pick up new partitions on their next cycle via volatile reads.
* <p>The weight controls how much this handler contributes to partition growth.
* A weight of 1.0 means one handler ≈ one partition (below the adaptive threshold).
* A lower weight (e.g., 0.05) means many handlers share a partition, suitable for
* low-throughput types. The weighted sum of all handlers replaces the raw handler
* count in the adaptive partition formula.
*
* <p>For non-adaptive partition policies the weight is ignored and this behaves
* the same as {@link #addHandler(Class, HandlerConsumer)}.
*
* @param type the class of items to route to this handler
* @param handler the consumer that processes batches of the given type
* @param weight partition weight for this handler (default 1.0). Must be &gt; 0.
*/
@SuppressWarnings("unchecked")
public void addHandler(final Class<? extends T> type, final HandlerConsumer<T> handler) {
public void addHandler(final Class<? extends T> type, final HandlerConsumer<T> handler,
final double weight) {
if (weight <= 0) {
throw new IllegalArgumentException("Handler weight must be > 0, got: " + weight);
}
handlerMap.put(type, handler);
weightedHandlerCount += weight;

final int newPartitionCount = config.getPartitions()
.resolve(resolvedThreadCount, handlerMap.size());
.resolve(resolvedThreadCount, weightedHandlerCount);
final ArrayBlockingQueue<T>[] currentPartitions = this.partitions;
if (newPartitionCount > currentPartitions.length) {
final int oldCount = currentPartitions.length;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
* excess handlers share partitions at 1:2 ratio.</li>
* </ul>
*
* <p>All policies are resolved via {@link #resolve(int, int)}. For non-adaptive
* policies the handlerCount parameter is ignored. At queue creation time, if the
* <p>All policies are resolved via {@link #resolve(int, double)}. For non-adaptive
* policies the weightedHandlerCount parameter is ignored. At queue creation time, if the
* resolved partition count is less than the thread count, the thread count is
* reduced to match and a warning is logged.
*/
Expand Down Expand Up @@ -130,29 +130,38 @@ public static PartitionPolicy adaptive(final int multiplier) {
* <ul>
* <li>fixed: returns the pre-set count (both parameters ignored).</li>
* <li>threadMultiply: returns multiplier * resolvedThreadCount (handlerCount ignored).</li>
* <li>adaptive: when handlerCount is 0, returns resolvedThreadCount as a sensible
* initial count. Otherwise, threshold = threadCount * multiplier; if handlerCount
* &lt;= threshold, returns handlerCount (1:1). If above, returns
* threshold + (handlerCount - threshold) / 2.</li>
* <li>adaptive: when weightedHandlerCount is 0, returns resolvedThreadCount as a sensible
* initial count. Otherwise, threshold = threadCount * multiplier; if weightedHandlerCount
* &lt;= threshold, returns weightedHandlerCount (1:1). If above, returns
* threshold + (weightedHandlerCount - threshold) / 2.</li>
* </ul>
*
* @param resolvedThreadCount the resolved number of drain threads
* @param handlerCount the current number of registered type handlers
* @param weightedHandlerCount the weighted sum of registered type handlers. Each handler
* contributes its weight (default 1.0) to this sum.
* High-weight handlers grow the partition count faster,
* reducing the chance of hash collisions for those types.
* Low-weight handlers grow the count slowly, so they are
* more likely to share partitions with other types via
* {@code typeHash()} routing. Note that partition assignment
* is hash-based, not weight-based — there is no guarantee
* that any type gets a dedicated partition.
* @return the resolved partition count, always &gt;= 1
*/
public int resolve(final int resolvedThreadCount, final int handlerCount) {
public int resolve(final int resolvedThreadCount, final double weightedHandlerCount) {
if (fixedCount > 0) {
return fixedCount;
}
if (adaptive) {
if (handlerCount == 0) {
final int effectiveCount = (int) Math.ceil(weightedHandlerCount);
if (effectiveCount == 0) {
return Math.max(1, resolvedThreadCount);
}
final int threshold = Math.max(1, multiplier * resolvedThreadCount);
if (handlerCount <= threshold) {
return handlerCount;
if (effectiveCount <= threshold) {
return effectiveCount;
}
return threshold + (handlerCount - threshold) / 2;
return threshold + (effectiveCount - threshold) / 2;
}
return Math.max(1, multiplier * resolvedThreadCount);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,29 @@ public void testAdaptiveRejectsInvalidMultiplier() {
() -> PartitionPolicy.adaptive(0));
}

@Test
public void testAdaptiveWithWeightedHandlers() {
    // 642 OAL handlers at weight 1.0 plus 1247 MAL handlers at weight 0.05:
    // weighted sum = 642 + 62.35 = 704.35, which the policy ceils to 705.
    // With 8 threads and the default multiplier 25 the threshold is 200, so
    // the above-threshold formula applies: 200 + (705 - 200) / 2 = 452.
    final double weighted = 642 * 1.0 + 1247 * 0.05;
    assertEquals(452, PartitionPolicy.adaptive().resolve(8, weighted));
}

@Test
public void testAdaptiveWithLowWeightOnly() {
    // 100 low-throughput handlers at weight 0.05 yield an effective count of 5
    // (ceil of 5.0), well under the 8 * 25 = 200 threshold, so the 1:1 mapping
    // applies and 5 partitions are resolved.
    final double weighted = 100 * 0.05;
    assertEquals(5, PartitionPolicy.adaptive().resolve(8, weighted));
}

@Test
public void testAdaptiveWithZeroWeightedCount() {
    // Before any handler registration the weighted sum is 0.0; the policy
    // falls back to the drain thread count as a sensible initial size.
    assertEquals(8, PartitionPolicy.adaptive().resolve(8, 0.0));
}

@Test
public void testToString() {
assertEquals("fixed(4)", PartitionPolicy.fixed(4).toString());
Expand Down
Loading