[OPIK-4383] [BE] Add experiment aggregate event listener and no-op publisher#5510
Open
thiagohora wants to merge 9 commits intothiagohora/OPIK-4383-redis-stream-subscriber-experiment-aggregatesfrom
Conversation
Contributor
Backend Tests Results 451 files + 15 451 suites +15 1h 2m 43s ⏱️ +57s For more details on these failures and errors, see this check. Results for commit 6634138. ± Comparison against base commit 4c9f6fc. ♻️ This comment has been updated with latest results. |
Contributor
SDK E2E Tests Results0 tests 0 ✅ 0s ⏱️ Results for commit 2d80a7d. ♻️ This comment has been updated with latest results. |
09e8736 to
2d80a7d
Compare
apps/opik-backend/src/main/java/com/comet/opik/api/events/CommentsCreated.java
Show resolved
Hide resolved
apps/opik-backend/src/main/java/com/comet/opik/domain/ExperimentItemDAO.java
Show resolved
Hide resolved
apps/opik-backend/src/main/java/com/comet/opik/domain/ExperimentItemDAO.java
Outdated
Show resolved
Hide resolved
…-aggregates' into thiagohora/OPIK-4383_add_experiment_aggregate_event_listener
Contributor
Contributor
Contributor
Contributor
Contributor
Contributor
Contributor
Contributor
Contributor
Contributor
Contributor
Contributor
Contributor
Contributor
Contributor
Contributor
Contributor
…tener' of https://github.com/comet-ml/opik into thiagohora/OPIK-4383_add_experiment_aggregate_event_listener
apps/opik-backend/src/main/java/com/comet/opik/domain/ExperimentService.java
Show resolved
Hide resolved
apps/opik-backend/src/main/java/com/comet/opik/domain/ExperimentService.java
Show resolved
Hide resolved
apps/opik-backend/src/main/java/com/comet/opik/domain/ExperimentItemDAO.java
Outdated
Show resolved
Hide resolved
apps/opik-backend/src/main/java/com/comet/opik/domain/SpanService.java
Outdated
Show resolved
Hide resolved
apps/opik-backend/src/main/java/com/comet/opik/domain/SpanService.java
Outdated
Show resolved
Hide resolved
Skip delete, cascading operations, and SpansDeleted event when getSpanIdsForTraces returns an empty set, preserving the original no-op behaviour and avoiding the Preconditions.checkArgument failure in SpanDAO.deleteByIds.
Two bugs prevented spans and attachments from being deleted when a trace was deleted via the event-driven cascade: 1. FeedbackScoreService.deleteByTraceIds/deleteBySpanIds had @nonnull on projectId which threw NPE when TracesDeleted.projectId() was null. EventInterceptor swallowed the NPE, stopping the entire cascade chain. Fix: remove @nonnull since the DAO already handles null safely via Optional.ofNullable(projectId). 2. SpanDAO.DELETE_BY_IDS had the wrong column (trace_id) and parameter name (span_ids) — the ClickHouse R2DBC driver could not resolve :span_ids as a named parameter in the DELETE statement. Fixed by using id IN :ids to match the working pattern in TraceDAO.DELETE_BY_ID.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Details
Implements the event-driven pipeline that triggers experiment aggregation recalculation when related entities are mutated.
New event classes
SpansUpdated,SpansDeleted— posted bySpanServiceFeedbackScoresCreated,FeedbackScoresUpdated,FeedbackScoresDeleted— posted byFeedbackScoreServiceCommentsCreated,CommentsUpdated,CommentsDeleted— posted byCommentServiceExperimentItemsCreated,ExperimentItemsDeleted— posted byExperimentItemServiceandExperimentItemBulkIngestionServiceExperimentUpdated— posted byExperimentServiceTracesUpdated— modified to carrytraceIdsin addition toprojectIdsNew files
ExperimentAggregateEventListener— central Guava@EagerSingletonevent bus listener that resolves experiment IDs, checks experiment status (COMPLETED/CANCELLED), and delegates to the publisherExperimentAggregationPublisher— interface + no-op stub implementation (TODO: debounce + Redis stream in follow-up PR)Modified services
SpanService,FeedbackScoreService,CommentService,ExperimentItemService,ExperimentService,ExperimentItemBulkIngestionService— injectEventBusand post the appropriate events after mutationsExperimentItemDAO— addsgetExperimentIdsByTraceIds()andgetExperimentIdsByItemIds()ClickHouse queriesPublisher status
The publisher is intentionally a no-op in this PR. The actual Redis stream enqueue with debounce will be implemented in a follow-up task once the subscriber infrastructure is merged.
Change checklist
Issues
Testing
ExperimentAggregateEventListenerTestcovers all event types, status filtering, disabled-config guards, and traceId → experimentId resolution pathspublisher.publish()is called with the correct experiment IDs only for experiments inCOMPLETED/CANCELLEDstatusDocumentation
N/A — internal pipeline change, no API or configuration changes visible to users