Skip to content

Commit d131547

Browse files
authored
fix(rust): import_batch in multithread env (#796)
* fix(rust): import_batch in multithread env Batch import may cause the doc enter a temporary detached state. If there are local edits happen during this phase, they may end up with errors like 'AutoCommitNotStarted' or read-only errors. This temporary phase should not be visible to users. This fix will lock the transaction object during the temporary detached state so it's not visible to another thread. * docs: add comment to code
1 parent 0fe9681 commit d131547

File tree

4 files changed

+102
-8
lines changed

4 files changed

+102
-8
lines changed

crates/loro-internal/src/loro.rs

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,12 @@ impl LoroDoc {
208208
/// It only returns Some(options_of_the_empty_txn) when the txn is empty
209209
#[inline]
210210
#[must_use]
211-
pub fn commit_then_stop(&self) -> (Option<CommitOptions>, LoroMutexGuard<'_, Option<Transaction>>) {
211+
pub fn commit_then_stop(
212+
&self,
213+
) -> (
214+
Option<CommitOptions>,
215+
LoroMutexGuard<'_, Option<Transaction>>,
216+
) {
212217
let (a, b) = self.commit_with(CommitOptions::new().immediate_renew(false));
213218
(a, b.unwrap())
214219
}
@@ -1176,13 +1181,35 @@ impl LoroDoc {
11761181
});
11771182

11781183
let (options, txn) = self.commit_then_stop();
1179-
drop(txn);
1184+
// Why we should keep locking `txn` here
1185+
//
1186+
// In a multi-threaded environment, `import_batch` used to drop the txn lock
1187+
// (via `commit_then_stop` + `drop(txn)`) and call `detach()`/`checkout_to_latest()`
1188+
// around the batch import. That created a race where another thread could
1189+
// start or renew the auto-commit txn and perform local edits while we were
1190+
// importing and temporarily detached. Those interleaved local edits could
1191+
// violate invariants between `OpLog` and `DocState` (e.g., state being
1192+
// updated when we expect it not to, missed events, or inconsistent
1193+
// frontiers), as exposed by the loom test `local_edits_during_batch_import`.
1194+
//
1195+
// The fix is to hold the txn mutex for the entire critical section:
1196+
// - Stop the current txn and keep the mutex guard.
1197+
// - Force-detach with `set_detached(true)` (avoids `detach()` side effects),
1198+
// then run each `_import_with(...)` while detached so imports only touch
1199+
// the `OpLog`.
1200+
// - After importing, reattach by checking out to latest and renew the txn
1201+
// using `_checkout_to_latest_with_guard`, which keeps the mutex held while
1202+
// (re)starting the auto-commit txn.
1203+
//
1204+
// Holding the lock ensures no concurrent thread can create/renew a txn and
1205+
// do local edits in the middle of the batch import, making the whole
1206+
// operation atomic with respect to local edits.
11801207
let is_detached = self.is_detached();
1181-
self.detach();
1208+
self.set_detached(true);
11821209
self.oplog.lock().unwrap().batch_importing = true;
11831210
let mut err = None;
11841211
for (_meta, data) in meta_arr {
1185-
match self.import(data) {
1212+
match self._import_with(data, Default::default()) {
11861213
Ok(s) => {
11871214
for (peer, (start, end)) in s.success.iter() {
11881215
match success.0.entry(*peer) {
@@ -1218,9 +1245,10 @@ impl LoroDoc {
12181245
let mut oplog = self.oplog.lock().unwrap();
12191246
oplog.batch_importing = false;
12201247
drop(oplog);
1221-
12221248
if !is_detached {
1223-
self.checkout_to_latest();
1249+
self._checkout_to_latest_with_guard(txn);
1250+
} else {
1251+
drop(txn);
12241252
}
12251253

12261254
self.renew_txn_if_auto_commit(options);
@@ -1269,6 +1297,16 @@ impl LoroDoc {
12691297
self.renew_txn_if_auto_commit(options);
12701298
}
12711299

1300+
fn _checkout_to_latest_with_guard(&self, guard: LoroMutexGuard<Option<Transaction>>) {
1301+
if !self.is_detached() {
1302+
self._renew_txn_if_auto_commit_with_guard(None, guard);
1303+
return;
1304+
}
1305+
1306+
self._checkout_to_latest_without_commit(true);
1307+
self._renew_txn_if_auto_commit_with_guard(None, guard);
1308+
}
1309+
12721310
/// NOTE: The caller of this method should ensure the txn is locked and set to None
12731311
pub(crate) fn _checkout_to_latest_without_commit(&self, to_commit_then_renew: bool) {
12741312
tracing::info_span!("CheckoutToLatest", peer = self.peer_id()).in_scope(|| {

crates/loro-internal/src/txn.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use crate::{
2626
event::{Diff, ListDeltaMeta, TextDiff},
2727
handler::{Handler, ValueOrHandler},
2828
id::{Counter, PeerID, ID},
29-
lock::LoroMutex,
29+
lock::{LoroMutex, LoroMutexGuard},
3030
loro::CommitOptions,
3131
op::{Op, RawOp, RawOpContent},
3232
pre_commit::{ChangeModifier, PreCommitCallbackPayload},
@@ -120,6 +120,25 @@ impl crate::LoroDoc {
120120
self_txn.replace(txn);
121121
}
122122
}
123+
124+
#[inline]
125+
pub(crate) fn _renew_txn_if_auto_commit_with_guard(
126+
&self,
127+
options: Option<CommitOptions>,
128+
mut guard: LoroMutexGuard<Option<Transaction>>,
129+
) {
130+
if self.auto_commit.load(std::sync::atomic::Ordering::Acquire) && self.can_edit() {
131+
if guard.is_some() {
132+
return;
133+
}
134+
135+
let mut txn = self.txn().unwrap();
136+
if let Some(options) = options {
137+
txn.set_options(options);
138+
}
139+
guard.replace(txn);
140+
}
141+
}
123142
}
124143

125144
pub(crate) type OnCommitFn =

crates/loro/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,10 @@ impl Default for LoroDoc {
112112
}
113113

114114
impl Clone for LoroDoc {
115+
/// This creates a reference clone, not a deep clone. The cloned doc will share the same
116+
/// underlying doc as the original one.
117+
///
118+
/// For deep clone, please use the `.fork()` method.
115119
fn clone(&self) -> Self {
116120
let doc = self.doc.clone();
117121
LoroDoc::_new(doc)

crates/loro/tests/multi_thread_test.rs

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
mod loom_test {
33

44
use loom::{explore, stop_exploring, sync::atomic::AtomicUsize, thread::yield_now};
5-
use loro::{ExportMode, LoroDoc};
5+
use loro::{ExportMode, LoroDoc, UpdateOptions};
66
use std::{sync::atomic::Ordering, thread::sleep, time::Duration};
77

88
#[test]
@@ -264,4 +264,37 @@ mod loom_test {
264264
}
265265
});
266266
}
267+
268+
#[test]
269+
fn local_edits_during_batch_import() {
270+
let mut builder = loom::model::Builder::new();
271+
builder.max_branches = 3000;
272+
builder.check(move || {
273+
let doc = LoroDoc::new();
274+
doc.get_text("text").insert(0, "hello").unwrap();
275+
let update_a = doc.export(ExportMode::all_updates()).unwrap();
276+
doc.get_text("text")
277+
.update("yo! hello", UpdateOptions::default())
278+
.unwrap();
279+
let update_b = doc.export(ExportMode::all_updates()).unwrap();
280+
281+
let mut handlers = vec![];
282+
let doc = LoroDoc::new();
283+
let doc_clone = doc.clone();
284+
285+
handlers.push(loom::thread::spawn(move || {
286+
doc_clone.get_text("text").insert(0, "1").unwrap();
287+
doc_clone.commit();
288+
doc_clone.get_text("text").insert(0, "1").unwrap();
289+
doc_clone.commit();
290+
}));
291+
292+
handlers.push(loom::thread::spawn(move || {
293+
doc.import_batch(&[update_a, update_b]).unwrap();
294+
}));
295+
for h in handlers {
296+
h.join().unwrap();
297+
}
298+
});
299+
}
267300
}

0 commit comments

Comments
 (0)