Skip to content

Commit de7779c

Browse files
VelikovPetarclaude
andauthored
Improve Message.createdLocallyAt creation logic using estimated server time (#6199)
* Fix createdLocallyAt using NTP-style server clock offset estimation Co-Authored-By: Claude <noreply@anthropic.com> * Pr remarks * Adjust thread message createdLocallyAt. * Ensure exceedsSyncThreshold is compared against estimated server time (where applicable). * Add max allowed offset. --------- Co-authored-by: Claude <noreply@anthropic.com>
1 parent d7ce8ed commit de7779c

File tree

15 files changed

+664
-12
lines changed

15 files changed

+664
-12
lines changed

stream-chat-android-client/src/main/java/io/getstream/chat/android/client/ChatClient.kt

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ import io.getstream.chat.android.client.extensions.ATTACHMENT_TYPE_FILE
101101
import io.getstream.chat.android.client.extensions.ATTACHMENT_TYPE_IMAGE
102102
import io.getstream.chat.android.client.extensions.cidToTypeAndId
103103
import io.getstream.chat.android.client.extensions.extractBaseUrl
104+
import io.getstream.chat.android.client.extensions.getCreatedAtOrNull
104105
import io.getstream.chat.android.client.extensions.internal.isLaterThanDays
105106
import io.getstream.chat.android.client.header.VersionPrefixHeader
106107
import io.getstream.chat.android.client.helpers.AppSettingManager
@@ -157,6 +158,7 @@ import io.getstream.chat.android.client.user.storage.SharedPreferencesCredential
157158
import io.getstream.chat.android.client.user.storage.UserCredentialStorage
158159
import io.getstream.chat.android.client.utils.ProgressCallback
159160
import io.getstream.chat.android.client.utils.TokenUtils
161+
import io.getstream.chat.android.client.utils.internal.ServerClockOffset
160162
import io.getstream.chat.android.client.utils.mergePartially
161163
import io.getstream.chat.android.client.utils.message.ensureId
162164
import io.getstream.chat.android.client.utils.observable.ChatEventsObservable
@@ -286,6 +288,8 @@ internal constructor(
286288
@InternalStreamChatApi
287289
public val audioPlayer: AudioPlayer,
288290
private val now: () -> Date = ::Date,
291+
@InternalStreamChatApi
292+
public val serverClockOffset: ServerClockOffset,
289293
private val repository: ChatClientRepository,
290294
private val messageReceiptReporter: MessageReceiptReporter,
291295
internal val messageReceiptManager: MessageReceiptManager,
@@ -2588,16 +2592,34 @@ internal constructor(
25882592

25892593
/**
25902594
* Ensure the message has a [Message.createdLocallyAt] timestamp.
2591-
* If not, set it to the max of the channel's [Channel.lastMessageAt] + 1 millisecond and [now].
2592-
* This ensures that the message appears in the correct order in the channel.
2595+
* If not, set it to the max of the channel's [Channel.lastMessageAt] + 1 millisecond and the
2596+
* estimated server time. Using estimated server time (instead of raw local clock) prevents
2597+
* cross-user ordering issues when the device clock is skewed.
25932598
*/
25942599
private suspend fun Message.ensureCreatedLocallyAt(cid: String): Message {
2595-
val lastMessageAt = repositoryFacade.selectChannel(cid = cid)?.lastMessageAt
2596-
val lastMessageAtPlusOneMillisecond = lastMessageAt?.let {
2597-
Date(it.time + 1)
2600+
val parentId = this.parentId
2601+
if (parentId != null) {
2602+
// Thread reply
2603+
val lastMessage = repositoryFacade.selectMessagesForThread(parentId, limit = 1).lastOrNull()
2604+
val lastMessageAt = lastMessage?.getCreatedAtOrNull()
2605+
val lastMessageAtPlusOneMillisecond = lastMessageAt?.let {
2606+
Date(it.time + 1)
2607+
}
2608+
val createdLocallyAt = max(lastMessageAtPlusOneMillisecond, serverClockOffset.estimatedServerTime())
2609+
return copy(createdLocallyAt = this.createdLocallyAt ?: createdLocallyAt)
2610+
} else {
2611+
// Regular message
2612+
val (type, id) = cid.cidToTypeAndId()
2613+
// Fetch channel lastMessageAt from state, fallback to offline storage
2614+
val channelState = logicRegistry?.channelStateLogic(type, id)?.listenForChannelState()
2615+
val lastMessageAt = channelState?.channelData?.value?.lastMessageAt
2616+
?: repositoryFacade.selectChannel(cid = cid)?.lastMessageAt
2617+
val lastMessageAtPlusOneMillisecond = lastMessageAt?.let {
2618+
Date(it.time + 1)
2619+
}
2620+
val createdLocallyAt = max(lastMessageAtPlusOneMillisecond, serverClockOffset.estimatedServerTime())
2621+
return copy(createdLocallyAt = this.createdLocallyAt ?: createdLocallyAt)
25982622
}
2599-
val createdLocallyAt = max(lastMessageAtPlusOneMillisecond, now())
2600-
return copy(createdLocallyAt = this.createdLocallyAt ?: createdLocallyAt)
26012623
}
26022624

26032625
/**
@@ -5037,6 +5059,8 @@ internal constructor(
50375059
warmUpReflection()
50385060
}
50395061

5062+
val serverClockOffset = ServerClockOffset()
5063+
50405064
val module =
50415065
ChatModule(
50425066
appContext = appContext,
@@ -5055,6 +5079,7 @@ internal constructor(
50555079
lifecycle = lifecycle,
50565080
appName = this.appName,
50575081
appVersion = this.appVersion,
5082+
serverClockOffset = serverClockOffset,
50585083
)
50595084

50605085
val api = module.api()
@@ -5091,6 +5116,7 @@ internal constructor(
50915116
retryPolicy = retryPolicy,
50925117
appSettingsManager = appSettingsManager,
50935118
chatSocket = module.chatSocket,
5119+
serverClockOffset = serverClockOffset,
50945120
pluginFactories = pluginFactories,
50955121
repositoryFactoryProvider = repositoryFactoryProvider
50965122
?: pluginFactories

stream-chat-android-client/src/main/java/io/getstream/chat/android/client/di/ChatModule.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ import io.getstream.chat.android.client.uploader.FileUploader
8383
import io.getstream.chat.android.client.uploader.StreamFileUploader
8484
import io.getstream.chat.android.client.user.CurrentUserFetcher
8585
import io.getstream.chat.android.client.utils.HeadersUtil
86+
import io.getstream.chat.android.client.utils.internal.ServerClockOffset
8687
import io.getstream.chat.android.models.UserId
8788
import io.getstream.log.StreamLog
8889
import okhttp3.Interceptor
@@ -116,6 +117,7 @@ import java.util.concurrent.TimeUnit
116117
* @param lifecycle Host [Lifecycle] used to observe app foreground/background and manage socket behavior.
117118
* @param appName Optional app name added to default headers for tracking.
118119
* @param appVersion Optional app version added to default headers for tracking.
120+
* @param serverClockOffset Shared clock-offset tracker used by the socket layer for time synchronisation.
119121
*/
120122
@Suppress("TooManyFunctions")
121123
internal class ChatModule
@@ -137,6 +139,7 @@ constructor(
137139
private val lifecycle: Lifecycle,
138140
private val appName: String?,
139141
private val appVersion: String?,
142+
private val serverClockOffset: ServerClockOffset,
140143
) {
141144

142145
private val headersUtil = HeadersUtil(appContext, appName, appVersion)
@@ -311,6 +314,7 @@ constructor(
311314
lifecycleObserver,
312315
networkStateProvider,
313316
clientDebugger,
317+
serverClockOffset,
314318
)
315319

316320
private fun buildApi(chatConfig: ChatClientConfig): ChatApi = ProxyChatApi(

stream-chat-android-client/src/main/java/io/getstream/chat/android/client/socket/ChatSocket.kt

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import io.getstream.chat.android.client.network.NetworkStateProvider
3030
import io.getstream.chat.android.client.scope.UserScope
3131
import io.getstream.chat.android.client.socket.ChatSocketStateService.State
3232
import io.getstream.chat.android.client.token.TokenManager
33+
import io.getstream.chat.android.client.utils.internal.ServerClockOffset
3334
import io.getstream.chat.android.core.internal.coroutines.DispatcherProvider
3435
import io.getstream.chat.android.models.User
3536
import io.getstream.log.taggedLogger
@@ -52,6 +53,7 @@ internal open class ChatSocket(
5253
private val lifecycleObserver: StreamLifecycleObserver,
5354
private val networkStateProvider: NetworkStateProvider,
5455
private val clientDebugger: ChatClientDebugger? = null,
56+
private val serverClockOffset: ServerClockOffset,
5557
) {
5658
private var streamWebSocket: StreamWebSocket? = null
5759
private val logger by taggedLogger(TAG)
@@ -61,7 +63,13 @@ internal open class ChatSocket(
6163
private var socketStateObserverJob: Job? = null
6264
private val healthMonitor = HealthMonitor(
6365
userScope = userScope,
64-
checkCallback = { (chatSocketStateService.currentState as? State.Connected)?.event?.let(::sendEvent) },
66+
checkCallback = {
67+
(chatSocketStateService.currentState as? State.Connected)?.event?.let {
68+
if (sendEvent(it)) {
69+
serverClockOffset.onHealthCheckSent()
70+
}
71+
}
72+
},
6573
reconnectCallback = { chatSocketStateService.onWebSocketEventLost() },
6674
)
6775
private val lifecycleHandler = object : LifecycleHandler {
@@ -84,6 +92,7 @@ internal open class ChatSocket(
8492
socketListenerJob?.cancel()
8593
when (networkStateProvider.isConnected()) {
8694
true -> {
95+
serverClockOffset.onConnectionStarted()
8796
streamWebSocket = socketFactory.createSocket(connectionConf).apply {
8897
socketListenerJob = listen().onEach {
8998
when (it) {
@@ -194,8 +203,14 @@ internal open class ChatSocket(
194203

195204
private suspend fun handleEvent(chatEvent: ChatEvent) {
196205
when (chatEvent) {
197-
is ConnectedEvent -> chatSocketStateService.onConnectionEstablished(chatEvent)
198-
is HealthEvent -> healthMonitor.ack()
206+
is ConnectedEvent -> {
207+
serverClockOffset.onConnected(chatEvent.createdAt)
208+
chatSocketStateService.onConnectionEstablished(chatEvent)
209+
}
210+
is HealthEvent -> {
211+
serverClockOffset.onHealthCheck(chatEvent.createdAt)
212+
healthMonitor.ack()
213+
}
199214
else -> callListeners { listener -> listener.onEvent(chatEvent) }
200215
}
201216
}
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
/*
2+
* Copyright (c) 2014-2026 Stream.io Inc. All rights reserved.
3+
*
4+
* Licensed under the Stream License;
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://github.com/GetStream/stream-chat-android/blob/main/LICENSE
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.getstream.chat.android.client.utils.internal
18+
19+
import io.getstream.chat.android.client.events.ConnectedEvent
20+
import io.getstream.chat.android.client.events.HealthEvent
21+
import io.getstream.chat.android.core.internal.InternalStreamChatApi
22+
import java.util.Date
23+
24+
/**
25+
* Tracks the offset between the local device clock and the server clock using
26+
* NTP-style estimation from WebSocket health check round-trips.
27+
*
28+
* The algorithm keeps only the sample with the lowest observed RTT, since a
29+
* smaller round-trip means less room for network asymmetry to distort the
30+
* measurement. Under the assumption that clock skew is constant for the
31+
* duration of a session, the estimate monotonically improves over time.
32+
*
33+
* Thread-safe: single-field writes use [Volatile] for visibility; compound
34+
* read-modify-write sequences are guarded by [lock] for atomicity.
35+
*
36+
* @param localTimeMs Clock source for the local device time (injectable for tests).
37+
* @param maxRttMs Upper bound on plausible RTT. Samples exceeding this are
38+
* discarded as stale or mismatched. Defaults to the health check cycle
39+
* interval (MONITOR_INTERVAL + HEALTH_CHECK_INTERVAL = 11 000 ms).
40+
* @param maxOffsetMs Upper bound on the absolute value of the computed clock offset.
41+
* If the derived offset exceeds this threshold the sample is considered unreliable
42+
* (e.g. a stale / static server timestamp in a test environment) and the offset is
43+
* reset to zero so that [estimatedServerTime] falls back to the raw local time.
44+
* Defaults to 1 hour, which is already far beyond any real-world NTP drift.
45+
*/
46+
@InternalStreamChatApi
47+
public class ServerClockOffset(
48+
private val localTimeMs: () -> Long = { System.currentTimeMillis() },
49+
private val maxRttMs: Long = DEFAULT_MAX_RTT_MS,
50+
private val maxOffsetMs: Long = DEFAULT_MAX_OFFSET_MS,
51+
) {
52+
53+
private val lock = Any()
54+
55+
@Volatile
56+
private var offsetMs: Long = 0L
57+
58+
@Volatile
59+
private var bestRttMs: Long = Long.MAX_VALUE
60+
61+
@Volatile
62+
private var healthCheckSentAtMs: Long = 0L
63+
64+
@Volatile
65+
private var connectionStartedAtMs: Long = 0L
66+
67+
/**
68+
* Record the local time immediately before starting a WebSocket connection.
69+
* When the next [ConnectedEvent] arrives, [onConnected] will pair with this
70+
* timestamp to compute the offset using the NTP midpoint formula.
71+
*/
72+
internal fun onConnectionStarted() {
73+
connectionStartedAtMs = localTimeMs()
74+
}
75+
76+
/**
77+
* Record the local time immediately before sending a health check echo.
78+
* The next [onHealthCheck] call will pair with this timestamp to compute RTT.
79+
*/
80+
internal fun onHealthCheckSent() {
81+
healthCheckSentAtMs = localTimeMs()
82+
}
83+
84+
/**
85+
* Calibration from a [ConnectedEvent].
86+
*
87+
* If [onConnectionStarted] was called before this connection (e.g. right before
88+
* opening the WebSocket), uses the NTP midpoint of (connectionStartedAt, receivedAt)
89+
* and serverTime for a more accurate offset. Otherwise falls back to a naive
90+
* `localTime - serverTime` estimate.
91+
*
92+
* Resets health check state, since a new connection means any in-flight health
93+
* check from the previous connection is stale.
94+
*/
95+
internal fun onConnected(serverTime: Date) {
96+
synchronized(lock) {
97+
bestRttMs = Long.MAX_VALUE
98+
healthCheckSentAtMs = 0L
99+
offsetMs = 0L
100+
101+
val receivedAtMs = localTimeMs()
102+
val startedAtMs = connectionStartedAtMs
103+
connectionStartedAtMs = 0L
104+
105+
if (startedAtMs > 0L) {
106+
val rtt = receivedAtMs - startedAtMs
107+
if (rtt in 1..maxRttMs) {
108+
acceptOffset((startedAtMs + receivedAtMs) / 2 - serverTime.time)
109+
bestRttMs = rtt
110+
return
111+
}
112+
}
113+
acceptOffset(receivedAtMs - serverTime.time)
114+
}
115+
}
116+
117+
/**
118+
* Refine the offset using a [HealthEvent] paired with [onHealthCheckSent].
119+
*
120+
* Computes RTT from the stored send time and the current receive time,
121+
* then applies the NTP midpoint formula:
122+
* ```
123+
* offset = (sentAt + receivedAt) / 2 - serverTime
124+
* ```
125+
*
126+
* The sample is accepted only if:
127+
* - There is a pending [onHealthCheckSent] timestamp.
128+
* - RTT is positive (guards against clock anomalies).
129+
* - RTT is below [maxRttMs] (rejects stale / mismatched pairs).
130+
* - RTT is lower than any previous sample (min-RTT selection).
131+
*/
132+
internal fun onHealthCheck(serverTime: Date) {
133+
synchronized(lock) {
134+
val sentAtMs = healthCheckSentAtMs
135+
if (sentAtMs <= 0L) return
136+
healthCheckSentAtMs = 0L
137+
138+
val receivedAtMs = localTimeMs()
139+
val rtt = receivedAtMs - sentAtMs
140+
if (rtt !in 1..maxRttMs) return
141+
142+
if (rtt < bestRttMs) {
143+
bestRttMs = rtt
144+
acceptOffset((sentAtMs + receivedAtMs) / 2 - serverTime.time)
145+
}
146+
}
147+
}
148+
149+
/**
150+
* Returns the current time adjusted to the server timescale.
151+
*
152+
* Before the first [onConnected] call, this returns the raw local time
153+
* (offset = 0).
154+
*/
155+
@InternalStreamChatApi
156+
public fun estimatedServerTime(): Date =
157+
Date(localTimeMs() - offsetMs)
158+
159+
/**
160+
* Accepts [candidate] as the new [offsetMs] only when its absolute value is within
161+
* [maxOffsetMs]. Offsets that are implausibly large (e.g. produced by a stale or
162+
* static server timestamp) are silently discarded and [offsetMs] is left unchanged.
163+
*
164+
* Note: callers that want a rejected offset to reset to zero (e.g. [onConnected])
165+
* should set [offsetMs] = 0 before calling this function.
166+
*/
167+
private fun acceptOffset(candidate: Long) {
168+
if (kotlin.math.abs(candidate) <= maxOffsetMs) {
169+
offsetMs = candidate
170+
}
171+
}
172+
173+
internal companion object {
174+
internal const val DEFAULT_MAX_RTT_MS = 11_000L
175+
internal const val DEFAULT_MAX_OFFSET_MS = 3_600_000L // 1 hour
176+
}
177+
}

stream-chat-android-client/src/test/java/io/getstream/chat/android/client/ChatClientConnectionTests.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import io.getstream.chat.android.client.token.FakeTokenManager
3434
import io.getstream.chat.android.client.user.CredentialConfig
3535
import io.getstream.chat.android.client.user.storage.UserCredentialStorage
3636
import io.getstream.chat.android.client.utils.TokenUtils
37+
import io.getstream.chat.android.client.utils.internal.ServerClockOffset
3738
import io.getstream.chat.android.models.ConnectionData
3839
import io.getstream.chat.android.models.EventType
3940
import io.getstream.chat.android.models.GuestUser
@@ -126,6 +127,7 @@ internal class ChatClientConnectionTests {
126127
retryPolicy = mock(),
127128
appSettingsManager = mock(),
128129
chatSocket = fakeChatSocket,
130+
serverClockOffset = ServerClockOffset(),
129131
pluginFactories = emptyList(),
130132
repositoryFactoryProvider = NoOpRepositoryFactory.Provider,
131133
mutableClientState = mutableClientState,

stream-chat-android-client/src/test/java/io/getstream/chat/android/client/ChatClientTest.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import io.getstream.chat.android.client.scope.UserTestScope
3737
import io.getstream.chat.android.client.socket.FakeChatSocket
3838
import io.getstream.chat.android.client.token.FakeTokenManager
3939
import io.getstream.chat.android.client.utils.TokenUtils
40+
import io.getstream.chat.android.client.utils.internal.ServerClockOffset
4041
import io.getstream.chat.android.client.utils.retry.NoRetryPolicy
4142
import io.getstream.chat.android.models.ConnectionState
4243
import io.getstream.chat.android.models.EventType
@@ -138,6 +139,7 @@ internal class ChatClientTest {
138139
retryPolicy = NoRetryPolicy(),
139140
appSettingsManager = mock(),
140141
chatSocket = fakeChatSocket,
142+
serverClockOffset = ServerClockOffset(),
141143
pluginFactories = emptyList(),
142144
mutableClientState = Mother.mockedClientState(),
143145
repositoryFactoryProvider = NoOpRepositoryFactory.Provider,

0 commit comments

Comments
 (0)