Skip to content

Commit d330b2d

Browse files
committed
refactor: enhance WAITFOR EVENT handling with result propagation, improve iteration and recovery logic, and add timeout-aware event filtering
1 parent 9d92d56 commit d330b2d

File tree

3 files changed

+49
-11
lines changed

3 files changed

+49
-11
lines changed

pkg/compiler/internal/wait_event.go

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func (c *WaitCompiler) compileEvent(ctx fql.IWaitForEventExpressionContext) byte
3434
end := c.ctx.Program.Emitter.NewLabel()
3535

3636
c.ctx.Program.Emitter.MarkLabel(start)
37-
c.emitWaitEventIteration(ctx, state, streamReg, bytecode.NoopOperand, start, end)
37+
c.emitWaitEventIteration(ctx, state, streamReg, resultReg, bytecode.NoopOperand, start, end)
3838

3939
c.ctx.Program.Emitter.MarkLabel(end)
4040
c.emitWaitEventCleanup(state, streamReg)
@@ -65,7 +65,7 @@ func (c *WaitCompiler) compileEventWithTimeoutRecovery(
6565
cleanup := c.ctx.Program.Emitter.NewLabel()
6666

6767
c.ctx.Program.Emitter.MarkLabel(start)
68-
c.emitWaitEventIteration(ctx, state, streamReg, timeoutStateReg, start, iterationDone)
68+
c.emitWaitEventIteration(ctx, state, streamReg, resultReg, timeoutStateReg, start, iterationDone)
6969

7070
c.ctx.Program.Emitter.EmitJump(cleanup)
7171
c.ctx.Program.Emitter.MarkLabel(iterationDone)
@@ -112,7 +112,7 @@ func (c *WaitCompiler) emitWaitEventStreamSetup(state waitEventCompileState, str
112112
func (c *WaitCompiler) emitWaitEventIteration(
113113
ctx fql.IWaitForEventExpressionContext,
114114
state waitEventCompileState,
115-
streamReg, timeoutStateReg bytecode.Operand,
115+
streamReg, resultReg, timeoutStateReg bytecode.Operand,
116116
restartLabel, doneLabel core.Label,
117117
) {
118118
c.ctx.Program.Emitter.WithSpan(state.span, func() {
@@ -124,16 +124,26 @@ func (c *WaitCompiler) emitWaitEventIteration(
124124
c.ctx.Program.Emitter.EmitIterNext(streamReg, doneLabel)
125125
})
126126

127-
if filter := ctx.EventFilterClause(); filter != nil {
128-
eventValReg, _ := c.ctx.Function.Symbols.DeclareLocal(core.PseudoVariable, core.TypeUnknown)
129-
127+
filter := ctx.EventFilterClause()
128+
if filter == nil {
130129
c.ctx.Program.Emitter.WithSpan(state.span, func() {
131-
c.ctx.Program.Emitter.EmitAB(bytecode.OpIterValue, eventValReg, streamReg)
130+
c.ctx.Program.Emitter.EmitIterValue(resultReg, streamReg)
132131
})
133-
134-
cond := c.exprs.CompileWithImplicitCurrent(filter.Expression())
135-
c.ctx.Program.Emitter.EmitJumpIfFalse(cond, restartLabel)
132+
return
136133
}
134+
135+
eventValReg, _ := c.ctx.Function.Symbols.DeclareLocal(core.PseudoVariable, core.TypeUnknown)
136+
137+
c.ctx.Program.Emitter.WithSpan(state.span, func() {
138+
c.ctx.Program.Emitter.EmitIterValue(eventValReg, streamReg)
139+
})
140+
141+
cond := c.exprs.CompileWithImplicitCurrent(filter.Expression())
142+
c.ctx.Program.Emitter.EmitJumpIfFalse(cond, restartLabel)
143+
144+
c.ctx.Program.Emitter.WithSpan(state.span, func() {
145+
c.ctx.Program.Emitter.EmitMove(resultReg, eventValReg)
146+
})
137147
}
138148

139149
func (c *WaitCompiler) emitWaitEventCleanup(state waitEventCompileState, streamReg bytecode.Operand) {

pkg/compiler/internal/wait_recovery.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ func (c *WaitCompiler) buildProtectedEventRecovery(
152152
routeRecovery := c.ctx.Program.Emitter.NewLabel("waitfor", "event", "recover")
153153

154154
c.ctx.Program.Emitter.MarkLabel(start)
155-
c.emitWaitEventIteration(ctx, state, streamReg, timeoutStateReg, start, iterationDone)
155+
c.emitWaitEventIteration(ctx, state, streamReg, resultReg, timeoutStateReg, start, iterationDone)
156156

157157
c.ctx.Program.Emitter.EmitJump(cleanup)
158158
c.ctx.Program.Emitter.MarkLabel(iterationDone)

test/integration/vm/vm_waitfor_event_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,13 @@ RETURN status`, "error", "WAITFOR EVENT should choose ON ERROR for runtime failu
4242
LET status = (WAITFOR EVENT "test" IN obj TIMEOUT 1ms) ON TIMEOUT RETURN "timeout" ON ERROR RETURN "error"
4343
4444
RETURN status`, "error", "Grouped WAITFOR EVENT should choose ON ERROR for runtime failures"),
45+
S(`LET obs = @obs
46+
47+
LET evt = WAITFOR EVENT "test" IN obs
48+
49+
RETURN evt.type`, "match", "WAITFOR EVENT should return the received event value").Env(vm.WithParams(map[string]runtime.Value{
50+
"obs": matchFirst,
51+
})),
4552
Fn(`LET obs = @obs
4653
WAITFOR EVENT "test" IN obs WHEN .type == "match"
4754
RETURN 1`, ObservableReturnOneAndReads(matchFirst, 1)).Env(vm.WithParams(map[string]runtime.Value{
@@ -54,6 +61,27 @@ RETURN 1`, ObservableReturnOneAndReads(matchSecond, 2)).Env(vm.WithParams(map[st
5461
})),
5562
S(`LET obs = @obs
5663
64+
LET evt = WAITFOR EVENT "test" IN obs WHEN .type == "match"
65+
66+
RETURN evt.type`, "match", "WAITFOR EVENT filter should return the matched event value").Env(vm.WithParams(map[string]runtime.Value{
67+
"obs": matchSecond,
68+
})),
69+
S(`LET obs = @obs
70+
71+
LET evt = WAITFOR EVENT "test" IN obs TIMEOUT 1ms ON TIMEOUT RETURN NONE
72+
73+
RETURN evt.type`, "match", "WAITFOR EVENT timeout-aware success should return the event value").Env(vm.WithParams(map[string]runtime.Value{
74+
"obs": matchFirst,
75+
})),
76+
S(`LET obs = @obs
77+
78+
LET evt = WAITFOR EVENT "test" IN obs ON ERROR RETURN NONE
79+
80+
RETURN evt.type`, "match", "WAITFOR EVENT protected recovery success should return the event value").Env(vm.WithParams(map[string]runtime.Value{
81+
"obs": matchFirst,
82+
})),
83+
S(`LET obs = @obs
84+
5785
LET status = WAITFOR EVENT "test" IN obs TIMEOUT 1ms ON TIMEOUT RETURN "timeout" ON ERROR RETURN "error"
5886
5987
RETURN status`, "timeout", "WAITFOR EVENT should choose ON TIMEOUT when the stream times out").Env(vm.WithParams(map[string]runtime.Value{

0 commit comments

Comments
 (0)