Update internal SharedFlow comments #4591

Draft
dkhalanskyjb wants to merge 4 commits into develop from dkhalanskyjb/fix-sharedflow-cancellation

@dkhalanskyjb
Collaborator

Fixes #4583

Diff in the compiled JS code (the `<` line is the result before the change, the `>` lines are the result after it):

diff build?/compileSync/js/test/testDevelopmentExecutable/kotlin/kotlinx-coroutines-core.js
13594c13594,13595
<             if (compare(element.index_1, new Long(0, 0)) >= 0 && compare(element.index_1, newMinCollectorIndex) < 0)
---
>             var containsArg = element.index_1;
>             if (compare(new Long(0, 0), containsArg) <= 0 ? compare(containsArg, newMinCollectorIndex) < 0 : false)
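
The JS diff above looks like what you would get from replacing two explicit comparisons with a range check in the Kotlin source. The following is only a minimal sketch of that equivalence, not the actual change: `twoComparisons` and `rangeCheck` are hypothetical names, and the `Long` parameters are stand-ins for the slot index and `newMinCollectorIndex`.

```kotlin
// Hypothetical stand-ins for the slot index and the new minimum collector index.
fun twoComparisons(index: Long, newMinCollectorIndex: Long): Boolean =
    index >= 0 && index < newMinCollectorIndex

// The same condition expressed as a half-open range check.
fun rangeCheck(index: Long, newMinCollectorIndex: Long): Boolean =
    index in 0 until newMinCollectorIndex

fun main() {
    val samples = listOf(-1L, 0L, 1L, 4L, 5L, 6L)
    for (index in samples) {
        // Both forms must agree for every sample, including the boundaries.
        check(twoComparisons(index, 5L) == rangeCheck(index, 5L))
    }
    println(samples.filter { rangeCheck(it, 5L) }) // prints [0, 1, 4]
}
```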
Fixes #4583

See the `SharedFlow.testCancellingSubscriberAndEmitterWithNoBuffer`
test for the problematic scenario, which may arise when
the next emitter and the last collector get cancelled simultaneously.

After the first iteration of the test,
the internal state of the `SharedFlow` was:

* buffer = [null, NO_VALUE]
* replayIndex = 2
* minCollectorIndex = 1
* bufferSize = 1
* queueSize = 0
* nCollectors = 0
* nextIndex = 0

This is clearly invalid, as `bufferSize` cannot be anything but 0
with the default `MutableSharedFlow` configuration.

The problem was within the logic that handles unsubscription from
a `SharedFlow` without a buffer if the first emitter is also
cancelled.

There were two separate checks for the cancellation case,
applied, importantly, in this order:
- If the last collector was cancelled,
  the part of the buffer outside the replay buffer was erased,
  since no one was going to consume it.
  This was achieved by setting the start of the buffer to be
  at the *right buffer boundary*.
- If the next emitter was cancelled *and* the buffer was disabled,
  the *right boundary* of the empty buffer was moved past the
  cancelled emitter.

Taken together, these two checks conspired to break the invariants:
if *both* conditions were true, then first, the buffer was set to be
empty, and then, its right boundary was shifted, resulting in a
non-empty buffer, which should be logically impossible.
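
The interaction of the two checks can be illustrated with a deliberately simplified toy model. This is not the actual `SharedFlowImpl` code: `Window`, `dropUnconsumed`, and `skipCancelledEmitter` are hypothetical names, and the buffer is reduced to a pair of indices.

```kotlin
// Toy model only: a window [start, end) of buffered indices. All names are hypothetical.
data class Window(val start: Long, val end: Long) {
    val size: Long get() = end - start
}

// Check 1: the last collector is gone, so drop everything
// by moving the start of the buffer to its right boundary.
fun dropUnconsumed(w: Window): Window = w.copy(start = w.end)

// Check 2: the next emitter was cancelled and there is no buffer,
// so move the right boundary past the cancelled emitter.
fun skipCancelledEmitter(w: Window): Window = w.copy(end = w.end + 1)

fun main() {
    val initial = Window(start = 1, end = 1)
    val afterFirst = dropUnconsumed(initial)
    val afterBoth = skipCancelledEmitter(afterFirst)
    println(afterFirst.size) // prints 0: the buffer is empty, as expected
    println(afterBoth.size)  // prints 1: non-empty, although the capacity is 0
}
```

In this model, running the second step after the first leaves a window of size 1, which mirrors the `bufferSize = 1` observed in the invalid state listed earlier.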
/**
* A common data structure for `StateFlow` and `SharedFlow`.
*/
internal abstract class AbstractSharedFlow<This, S : AbstractSharedFlowSlot<This>> : SynchronizedObject() {
Contributor

@LouisCAD LouisCAD Dec 14, 2025

Is `This` a new Kotlin feature to refer to the type of the instance's subclass?

Member

It's only a generic parameter name. It could just as well be named `T`. That said, I was also startled when I saw `This`. Consider renaming it to something less misleading.

Collaborator Author

Nope, you can read it as `T`; I just decided to use a more accurate name.

Member

Oh, I just love GitHub's time travelling machine :)

* `null` by default, created on demand.
* Each cell is also `null` by default, and the specific slot object is [created][createSlot] on demand.
* The whole array being `null` or a cell being `null` is equivalent to the cell not being
* [*allocated*][AbstractSharedFlowSlot.allocateLocked]--not to be confused with memory allocation, this means
Member

Allocate/free slot was misleading naming, but at least it was consistent.

The current snapshot of the comments uses "register", "inhabit", and "allocate" to mean "allocate".

Consider picking one term to use in the comments. Possibly even replace it in the code as well, so there's no need for a lengthy explanation.

Here are some options:
inhabit / free
register / deregister (a bit tricky because you can register both a collector and a slot)
inhabited slot / registered slot / occupied slot / assigned slot

(Another reason for this is that "allocate" is used in its true sense at least twice in this file, on L81 and L87.)

protected abstract fun createSlotArray(size: Int): Array<S?>

@Suppress("UNCHECKED_CAST")
/** Register a new collector and return its newly allocated slot. A slot may be [created][createSlot] or reused. */
Member

"may be either"

@@ -75,7 +125,8 @@ internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : Sync
subscriptionCount = _subscriptionCount // retrieve under lock if initialized
// Reset next index oracle if we have no more active collectors for more predictable behavior next time
Member

nit: you removed the other mention/introduction of the term oracle, so now this comment is less clear

Member

The word oracle was not the best choice, but it was used in the same sense as "hint" in C++ map.emplace_hint. No action, just saying.

// take into account a special case of sync shared flow that can go past 1st queued emitter
if (bufferCapacity == 0 && queueSize > 0) newMinCollectorIndex++
forEachSlotLocked { slot ->
@Suppress("ConvertTwoComparisonsToRangeCheck") // Bug in JS backend
Member

nit: could you get rid of this now-irrelevant suppression throughout the codebase (1 other usage)?

/** A `StateFlow` representing [nCollectors], potentially with some delay. A user-visible API. */
val subscriptionCount: StateFlow<Int>
get() = synchronized(this) {
// allocate under lock in sync with nCollectors variable
Member

allocate -> initialize

}
}

/** Allocate a new implementation-representation of a collector, but do not register it anywhere yet. */
Member

I understand the function's meaning but I can't parse "implementation-representation"

}
}

/** Allocate a new implementation-representation of a collector, but do not register it anywhere yet. */
Member

Avoid "allocate" since it's implementation defined where the implementer gets their slots from. There may not be any allocations. Also, I'm not sure if it is common to refer to memory allocations in the JVM world?

Comment on lines +56 to +63
/**
* A good starting index for looking for the next non-*allocated* slot in [slots].
*
* It is not guaranteed that this slot will not be *allocated*, nor is it guaranteed that it will be the first
* non-*allocated* slot.
* This is just a heuristic to have a better guess in common scenarios.
*/
private var nextIndex = 0
Member

This comment says that `nextIndex` is a "good starting index" / "heuristic" / "guess", but doesn't say how this heuristic is used or what it even means. It does list two things that it definitely is not, which is useful, but it would be nice to define the object itself.

The next collector will be assigned to the first free slot at nextIndex or on the cyclical right of nextIndex.
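
The suggested wording can be read as a cyclic search that starts at the hint. The sketch below only illustrates that reading and is not the code under review: `firstFreeSlot` is a hypothetical helper, and a free slot is modeled here as a `null` cell, which is an assumption.

```kotlin
// Hypothetical helper: returns the index of the first free (null) cell,
// scanning cyclically to the right starting from `hint`, or null if every cell is taken.
fun firstFreeSlot(slots: Array<Any?>, hint: Int): Int? {
    for (offset in slots.indices) {
        val i = (hint + offset) % slots.size
        if (slots[i] == null) return i
    }
    return null
}

fun main() {
    val slots = arrayOf<Any?>("a", null, "c", null)
    println(firstFreeSlot(slots, hint = 2)) // prints 3: the first free cell at or after index 2
    println(firstFreeSlot(slots, hint = 0)) // prints 1
    println(firstFreeSlot(arrayOf<Any?>("a", "b"), hint = 1)) // prints null: no free cell
}
```

The hint only affects where the scan starts, so a stale hint costs extra steps but never changes whether a free cell is found.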

@@ -10,19 +10,66 @@ import kotlin.jvm.*
@JvmField
internal val EMPTY_RESUMES = arrayOfNulls<Continuation<Unit>?>(0)
Member

First batch of comments. Feel free to address now or wait for the rest to come.

Comment on lines +28 to +29
* These continuations represent suspended emitters that were waiting for the slowest collector to move on
* so that the next value can be placed into the buffer.
Member

for the slowest collector (assigned to this slot) to move on

so that the next value -> so that their values

* get values from the buffer without suspending emitters. The buffer space determines how much slow subscribers
* can lag from the fast ones. When creating a shared flow, additional buffer capacity beyond replay can be reserved
* using the `extraBufferCapacity` parameter.
*
Member

Could you also fix all other IDE warnings/grammar/style in both files?


internal class SharedFlowSlot : AbstractSharedFlowSlot<SharedFlowImpl<*>>() {
/**
* The first index in the buffer that was not yet consumed by this collector,
Member

nit: that -> which, was not yet -> has not yet been

Comment on lines +305 to +306
* This field is used by the collector each time when it can not acquire the next value and suspends while
* waiting for the next one.
Member

Used how? Please formulate this more precisely; otherwise, it adds more mystery than it removes. ("The collector's continuation is saved/written/preserved here" or something.)

"cannot", "suspends waiting for the next value to arrive."

Comment on lines 343 to 346
Member

head + totalSize is off by 1 character, could you move them?

Comment on lines +591 to +593
/* Walk through at most `maxResumeCount` emitters, moving their values to the buffer,
* zeroing out the cells the emitters were occupying, and storing their continuations in an array that
* will be returned from this function and processed once the lock gets released. */
Member

[IDE hint] Long sentences (41 words here) are harder to read, according to the research; consider splitting

A human confirms. Consider making it into a list, or rewriting it in some other way.

Member

make -> mark

- ) : AbstractSharedFlow<SharedFlowSlot>(), MutableSharedFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
+ ) : AbstractSharedFlow<SharedFlowImpl<*>, SharedFlowSlot>(), MutableSharedFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
/*
Logical structure of the buffer
Member

Suggestion: "buffer" is a confusing name and is used to mean 3 different things:

  1. the buffer array of buffered values (including the replay cache) and queued emitters
  2. the buffer of buffered values including the replay cache
  3. the buffer of buffered values excluding the replay cache

Meanings 2 and 3 are used more or less consistently in the KDoc and the code, which steer towards 2.

But meanings 1 and 2 are used interchangeably throughout the code. Could we rename the buffer in sense 1 to "the buffer array", to refer to the code representation?

Comment on lines +612 to +614
/** Represents how many values are stored in the buffer after processing the emitters.
* The value may be larger than [bufferCapacity] if [nCollectors] was 0,
* but the `minOf(replay)` will clamp this value to a valid one, since `replay <= bufferCapacity`. */
Member

This value is never smaller than [bufferCapacity], and could be larger in another case. The head hasn't moved and still points to the original slowest possible collector. `newBufferEndIndex` started out as `bufferEndIndex == head + bufferSize` (at most, that would be `head + bufferCapacity`) and could only increase.

Also "Represents how many values" -> "The number of values stored in the buffer after processing the emitters."

Comment on lines +623 to +625
/** If [nCollectors] was 0, there are no active collector slots,
* and the invariant dictates that [minCollectorIndex] should be [bufferEndIndex] (`replayIndex + bufferSize`).
* Ensure this directly. */
Member

The original comment asserted that this invariant can in fact be broken, and thus this assignment is indeed required, and not just some unnecessary duct tape.

Collaborator Author

I don't understand this point. The invariant can't be broken: we're under the lock. However, to ensure the invariant stays unbroken, this assignment is required.

val head = head
- var newMinCollectorIndex = head + bufferSize
+ var newMinCollectorIndex = head + bufferSize // = this.bufferEndIndex
// take into account a special case of sync shared flow that can go past 1st queued emitter
Member

Could you also replace this cryptic "sync shared flow" with "shared flow with no buffer", here and below, to consistently call this situation by the same name?

@murfel murfel self-requested a review December 17, 2025 16:23
@dkhalanskyjb dkhalanskyjb changed the title from "Fix SharedFlow entering an invalid state" to "Update internal SharedFlow comments" Mar 31, 2026
@dkhalanskyjb dkhalanskyjb marked this pull request as draft March 31, 2026 11:48
3 participants