Skip to content

Commit 7dfc0c4

Browse files
authored
[OPIK-5973] [BE] perf: optimize thread list/count queries and push down thread_id filter (#6358)
* [OPIK-5973] [BE] perf: optimize thread list/count queries and push down thread_id filter Thread list/count queries rewritten to use LIMIT 1 BY instead of FINAL on traces, spans, and trace_threads. Narrow the traces_final projection and add a separate traces_final_ids CTE so downstream CTEs (spans_deduped, trace_threads_final) subset via IN(id) rather than wide-row deduplication. Benchmark on production-equivalent data: ~3× faster, ~2.8× less memory. Split TraceThreadField.ID=EQUAL filters out of the outer trace_thread_filters and emit them against traces.thread_id inside traces_final_ids, so idx_traces_thread_id_bf can prune granules. Parameter :thread_id_pushdown is bound separately to avoid colliding with strategy-generated :filter{i} names. Rename ThreadService → TraceThreadQueryService and split the trace-thread filter helpers out of DatabaseUtils into a dedicated FilterUtils class. * [OPIK-5973] [BE] fix: update remaining callers after ThreadService/DatabaseUtils renames Missed in the previous commit — OpikApplication, TracesResource, and AssertionResultDAO all reference the old names and break compile. Swap imports and callsites to match the renamed classes. * [OPIK-5973] [BE] chore: add SETTINGS log_comment to thread queries Attaches log_comment to list, count, findById, search, and stats thread queries so they can be traced in ClickHouse query logs with the query name, workspace_id, user_name, and per-call details. Follows the same pattern used across the other DAOs. Restructured each method to put makeMonoContextAware / makeFluxContextAware at the outermost level so userName / workspaceId are available before template rendering. countThreadTotal now takes those values as parameters instead of wrapping in its own context lookup. * Fix filters * [OPIK-5973] [BE] fix: apply TraceThreadField.ID pushdown additively; drop filter strip The strip helper (withoutTraceThreadIdPushdown) caused two problems: - bindTraceThreadSearchCriteria still iterated the full filter list with TRACE_THREAD strategy, trying to bind :filter0 for the ID filter that newTraceThreadFindTemplate had emitted against the stripped list. The SQL had no :filter0 placeholder → R2DBC "non-existing identifier" and thread-stats endpoint 500'd when filtered by thread id. - TraceDAO callers of the same helper silently lost ID filters (baz #3). Since thread_id is stable per trace (the same invariant the pushdown optimization relies on), applying the filter at both positions — the outer aggregate (via the normal TRACE_THREAD strategy) and the inner raw scan (via :thread_id_pushdown) — is idempotent. The inner predicate still unlocks idx_traces_thread_id_bf; the outer becomes a no-op on an already-single-thread result set. Simpler code, no strip helper needed, stats test passes. * [OPIK-5973] [BE] refactor: align thread list/count queries with TraceDAO prefilter pattern Mirrors TraceDAO#shouldUseTraceIdPrefilter / trace_id_prefilter shape on the thread list and count queries: - traces_final_ids is now conditional (<if(traces_final_ids)>) — only emitted when filters/search narrow beyond the workspace/project/uuid- range scope, via new shouldUseTracesFinalIdsPrefilter() helper. - Dropped the inner ORDER BY … LIMIT 1 BY id inside traces_final_ids; it's now SELECT DISTINCT id, thread_id with filters inline. Accepts phantom reads at the prefilter level — authoritative filtering happens downstream. - traces_final wraps in a two-level subquery so filters/search_text are re-applied after the LIMIT 1 BY id dedup, dropping phantom rows whose latest version no longer matches. - Downstream CTEs (traces_final, spans_deduped, trace_threads_final) now branch via <if(traces_final_ids)>…<else> <if(uuid_from_time)>…<endif> <endif>, scoping to the ID set when the prefilter is active, otherwise applying the uuid range directly. - Applied to list and count queries. Stats query has different shape (still on FINAL) and doesn't use traces_final_ids; unchanged here. * [OPIK-5973] [BE] refactor: simplify count query, LIMIT 1 BY id, doc pushdown operator scope - Count query: drop spans_deduped/spans_agg/feedback_scores_agg CTEs and their joins, drop display-only columns (total_estimated_cost, usage, feedback_scores_list, feedback_scores). Mirrors TraceDAO#COUNT_BY_PROJECT_ID — only CTEs needed for filter evaluation remain. All filter paths preserved (TRACE-level, trace_thread_filters, feedback_scores_filters, feedback_scores_empty_filters, annotation_queue_filters). - spans_deduped LIMIT 1 BY: switch from full sort-key tuple (workspace_id, project_id, trace_id, parent_span_id, id) to id alone. Span id is UUID v7 globally unique, so the shorter key is equivalent and reads more clearly. - Indent the <if(traces_final_ids)> / <else> branches one level deeper inside traces_final, spans_deduped, and trace_threads_final so the conditional structure is visually aligned with its nesting. - FilterUtils.findTraceThreadIdPushdownFilter: add a comment explaining why the check is scoped to Operator.EQUAL — the pushdown SQL template is hardcoded to thread_id = :x, and only EQUAL benefits from the traces.thread_id bloom filter. Locks intent for future contributors. * [OPIK-5973] [BE] refactor: align stats query with list/count FINAL→LIMIT 1 BY pattern SELECT_TRACE_THREADS_STATS now mirrors list/count structure: - Replace FROM traces final / FROM spans final / FROM trace_threads FINAL with LIMIT 1 BY id patterns. - Add conditional traces_final_ids prefilter (gated by shouldUseTracesFinalIdsPrefilter) with <if(traces_final_ids)>…<else> <if(uuid_from_time)>…<endif> <endif> branches in traces_final, spans_deduped, and trace_threads_final. - Split spans_deduped → spans_agg so the narrow dedup runs before the per-thread aggregation. LIMIT 1 BY id (consistent with spans_deduped simplification elsewhere). - traces_final wraps in a two-level subquery so <filters> / <search_text> re-apply post-dedup (drops phantom reads). - Wire shouldUseTracesFinalIdsPrefilter in getThreadStats. * [OPIK-5973] [BE] fix: apply filters once in traces_final_ids prefilter Restructures the traces_final_ids CTE in list/count/stats so filters and search_text are applied on the outer SELECT DISTINCT of the prefilter rather than being duplicated in both the prefilter and the traces_final outer wrapper. traces_final now just dedups via LIMIT 1 BY id and projects; the id set from traces_final_ids is authoritative for downstream spans_deduped and trace_threads_final scans. * [OPIK-5973] [BE] refactor: simplify trace_threads_final dedup to LIMIT 1 BY id UUIDv7 id is globally unique, so LIMIT 1 BY id produces the same deduped set as the full (workspace_id, project_id, thread_id, id) tuple. Aligns list/count with the stats query which already uses LIMIT 1 BY id. * [OPIK-5973] [BE] perf: gate feedback_scores and annotation_queue CTEs on filter usage in count query feedback_scores_deduped → _grouped → _final and thread_annotation_queue_ids were always materialized in the count query, even when no filter referenced them. Their output (display-formatted category_name/reason concat, value_by_author maps, annotation_queue_ids arrays) is not used by count(DISTINCT id), only by the inner filter predicates that may or may not be present. Gated via <if(feedback_scores_needed)> (OR of feedback_scores_filters and feedback_scores_empty_filters) and <if(annotation_queue_filters)>. The common-path count (no feedback-score filter, no annotation-queue filter) now skips the feedback_scores union-all + two-level groupArray pipeline and the annotation_queue_items join entirely.
1 parent df6cffa commit 7dfc0c4

21 files changed

Lines changed: 377 additions & 171 deletions

apps/opik-backend/src/main/java/com/comet/opik/OpikApplication.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22

33
import com.comet.opik.api.error.JsonProcessingExceptionMapper;
44
import com.comet.opik.infrastructure.ConfigurationModule;
5-
import com.comet.opik.infrastructure.DatabaseUtils;
65
import com.comet.opik.infrastructure.EncryptionUtils;
6+
import com.comet.opik.infrastructure.FilterUtils;
77
import com.comet.opik.infrastructure.OpikConfiguration;
88
import com.comet.opik.infrastructure.auth.AuthModule;
99
import com.comet.opik.infrastructure.aws.AwsModule;
@@ -82,7 +82,7 @@ public void initialize(Bootstrap<OpikConfiguration> bootstrap) {
8282
bootstrap.addBundle(LiquibaseBundle.builder()
8383
.name(DB_APP_STATE_NAME)
8484
.migrationsFileName(DB_APP_STATE_MIGRATIONS_FILE_NAME)
85-
.dataSourceFactoryFunction(conf -> DatabaseUtils.filterProperties(conf.getDatabase()))
85+
.dataSourceFactoryFunction(conf -> FilterUtils.filterProperties(conf.getDatabase()))
8686
.build());
8787
bootstrap.addBundle(LiquibaseBundle.builder()
8888
.name(DB_APP_ANALYTICS_NAME)
@@ -92,7 +92,7 @@ public void initialize(Bootstrap<OpikConfiguration> bootstrap) {
9292
bootstrap.addBundle(GuiceBundle.builder()
9393
.bundles(JdbiBundle
9494
.<OpikConfiguration>forDatabase(
95-
(conf, env) -> DatabaseUtils.filterProperties(conf.getDatabase()))
95+
(conf, env) -> FilterUtils.filterProperties(conf.getDatabase()))
9696
.withPlugins(new SqlObjectPlugin(), new Jackson2Plugin()))
9797
.modules(new DatabaseAnalyticsModule(), new IdGeneratorModule(), new AuthModule(), new RedisModule(),
9898
new RateLimitModule(), new NameGeneratorModule(), new HttpModule(), new EventModule(),

apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/priv/TracesResource.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@
3737
import com.comet.opik.domain.FeedbackScoreService;
3838
import com.comet.opik.domain.ProjectService;
3939
import com.comet.opik.domain.Streamer;
40-
import com.comet.opik.domain.ThreadService;
4140
import com.comet.opik.domain.TraceSearchCriteria;
4241
import com.comet.opik.domain.TraceService;
42+
import com.comet.opik.domain.TraceThreadQueryService;
4343
import com.comet.opik.domain.threads.TraceThreadService;
4444
import com.comet.opik.domain.workspaces.WorkspaceMetadataService;
4545
import com.comet.opik.infrastructure.auth.RequestContext;
@@ -109,7 +109,7 @@
109109
public class TracesResource {
110110

111111
private final @NonNull TraceService service;
112-
private final @NonNull ThreadService threadService;
112+
private final @NonNull TraceThreadQueryService traceThreadQueryService;
113113
private final @NonNull FeedbackScoreService feedbackScoreService;
114114
private final @NonNull CommentService commentService;
115115
private final @NonNull FiltersFactory filtersFactory;
@@ -661,7 +661,7 @@ public Response getTraceThreads(
661661

662662
log.info("Get trace threads by '{}' on workspaceId '{}'", searchCriteria, workspaceId);
663663

664-
TraceThreadPage traceThreadPage = threadService.find(page, size, searchCriteria)
664+
TraceThreadPage traceThreadPage = traceThreadQueryService.find(page, size, searchCriteria)
665665
.map(it -> {
666666
// Remove sortableBy fields if dynamic sorting is disabled due to workspace size
667667
if (metadata.cannotUseDynamicSorting()) {
@@ -712,7 +712,7 @@ public ChunkedOutput<JsonNode> searchTraceThreads(
712712
.uuidToTime(instantToUUIDMapper.toUpperBound(request.toTime()))
713713
.build();
714714

715-
Flux<TraceThread> items = threadService.search(request.limit(), searchCriteria)
715+
Flux<TraceThread> items = traceThreadQueryService.search(request.limit(), searchCriteria)
716716
.contextWrite(ctx -> ctx.put(RequestContext.WORKSPACE_ID, workspaceId)
717717
.put(RequestContext.USER_NAME, userName)
718718
.put(RequestContext.VISIBILITY, Optional.ofNullable(visibility).orElse(Visibility.PRIVATE)));
@@ -738,7 +738,7 @@ public Response getTraceThread(
738738
log.info("Getting trace thread by id '{}' and project id '{}' on workspace_id '{}' with truncate '{}'",
739739
identifier.threadId(), projectId, workspaceId, identifier.truncate());
740740

741-
TraceThread thread = threadService.getById(projectId, identifier.threadId(), identifier.truncate())
741+
TraceThread thread = traceThreadQueryService.getById(projectId, identifier.threadId(), identifier.truncate())
742742
.contextWrite(ctx -> setRequestContext(ctx, requestContext))
743743
.block();
744744

@@ -905,7 +905,7 @@ public Response getThreadStats(@QueryParam("project_id") UUID projectId,
905905

906906
log.info("Get trace thread stats by '{}' on workspaceId '{}'", searchCriteria, workspaceId);
907907

908-
ProjectStats projectStats = threadService.getStats(searchCriteria)
908+
ProjectStats projectStats = traceThreadQueryService.getStats(searchCriteria)
909909
.contextWrite(ctx -> setRequestContext(ctx, requestContext))
910910
.block();
911911

apps/opik-backend/src/main/java/com/comet/opik/domain/AssertionResultDAO.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import java.util.Optional;
2424

2525
import static com.comet.opik.domain.AsyncContextUtils.bindUserNameAndWorkspace;
26-
import static com.comet.opik.infrastructure.DatabaseUtils.getLogComment;
26+
import static com.comet.opik.infrastructure.FilterUtils.getLogComment;
2727
import static com.comet.opik.utils.AsyncUtils.makeMonoContextAware;
2828

2929
@ImplementedBy(AssertionResultDAOImpl.class)

apps/opik-backend/src/main/java/com/comet/opik/domain/DatasetItemDAO.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939

4040
import static com.comet.opik.api.DatasetItem.DatasetItemPage;
4141
import static com.comet.opik.domain.AsyncContextUtils.bindWorkspaceIdToFlux;
42-
import static com.comet.opik.infrastructure.DatabaseUtils.getSTWithLogComment;
42+
import static com.comet.opik.infrastructure.FilterUtils.getSTWithLogComment;
4343
import static com.comet.opik.infrastructure.instrumentation.InstrumentAsyncUtils.Segment;
4444
import static com.comet.opik.infrastructure.instrumentation.InstrumentAsyncUtils.endSegment;
4545
import static com.comet.opik.infrastructure.instrumentation.InstrumentAsyncUtils.startSegment;

apps/opik-backend/src/main/java/com/comet/opik/domain/DatasetItemService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@
6161
import java.util.stream.Stream;
6262

6363
import static com.comet.opik.api.DatasetItem.DatasetItemPage;
64-
import static com.comet.opik.infrastructure.DatabaseUtils.generateUuidPool;
64+
import static com.comet.opik.infrastructure.FilterUtils.generateUuidPool;
6565
import static com.comet.opik.infrastructure.db.TransactionTemplateAsync.READ_ONLY;
6666
import static com.comet.opik.infrastructure.db.TransactionTemplateAsync.WRITE;
6767

apps/opik-backend/src/main/java/com/comet/opik/domain/DatasetItemVersionDAO.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
import java.util.stream.Collectors;
5252

5353
import static com.comet.opik.domain.AsyncContextUtils.bindWorkspaceIdToFlux;
54-
import static com.comet.opik.infrastructure.DatabaseUtils.getSTWithLogComment;
54+
import static com.comet.opik.infrastructure.FilterUtils.getSTWithLogComment;
5555
import static com.comet.opik.infrastructure.instrumentation.InstrumentAsyncUtils.Segment;
5656
import static com.comet.opik.infrastructure.instrumentation.InstrumentAsyncUtils.endSegment;
5757
import static com.comet.opik.infrastructure.instrumentation.InstrumentAsyncUtils.startSegment;

apps/opik-backend/src/main/java/com/comet/opik/domain/DatasetVersionService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
import java.util.UUID;
3636
import java.util.stream.Collectors;
3737

38-
import static com.comet.opik.infrastructure.DatabaseUtils.generateUuidPool;
38+
import static com.comet.opik.infrastructure.FilterUtils.generateUuidPool;
3939
import static com.comet.opik.infrastructure.db.TransactionTemplateAsync.READ_ONLY;
4040
import static com.comet.opik.infrastructure.db.TransactionTemplateAsync.WRITE;
4141

apps/opik-backend/src/main/java/com/comet/opik/domain/ExperimentDAO.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@
7070

7171
import static com.comet.opik.domain.AsyncContextUtils.bindWorkspaceIdToFlux;
7272
import static com.comet.opik.domain.CommentResultMapper.parseCommentsFromJson;
73-
import static com.comet.opik.infrastructure.DatabaseUtils.getSTWithLogComment;
73+
import static com.comet.opik.infrastructure.FilterUtils.getSTWithLogComment;
7474
import static com.comet.opik.utils.AsyncUtils.makeFluxContextAware;
7575
import static com.comet.opik.utils.JsonUtils.getJsonNodeOrDefault;
7676
import static com.comet.opik.utils.JsonUtils.getStringOrDefault;

apps/opik-backend/src/main/java/com/comet/opik/domain/ExperimentItemDAO.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import java.util.UUID;
3232

3333
import static com.comet.opik.domain.AsyncContextUtils.bindWorkspaceIdToFlux;
34-
import static com.comet.opik.infrastructure.DatabaseUtils.getSTWithLogComment;
34+
import static com.comet.opik.infrastructure.FilterUtils.getSTWithLogComment;
3535
import static com.comet.opik.utils.AsyncUtils.makeFluxContextAware;
3636
import static com.comet.opik.utils.AsyncUtils.makeMonoContextAware;
3737
import static com.comet.opik.utils.template.TemplateUtils.QueryItem;

apps/opik-backend/src/main/java/com/comet/opik/domain/FeedbackScoreDAO.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@
3232
import java.util.stream.Collectors;
3333

3434
import static com.comet.opik.domain.AsyncContextUtils.bindUserNameAndWorkspace;
35-
import static com.comet.opik.infrastructure.DatabaseUtils.getLogComment;
36-
import static com.comet.opik.infrastructure.DatabaseUtils.getSTWithLogComment;
35+
import static com.comet.opik.infrastructure.FilterUtils.getLogComment;
36+
import static com.comet.opik.infrastructure.FilterUtils.getSTWithLogComment;
3737
import static com.comet.opik.utils.AsyncUtils.makeMonoContextAware;
3838

3939
@ImplementedBy(FeedbackScoreDAOImpl.class)

0 commit comments

Comments
 (0)