copies-rust: move CPU-heavy Rust processing into a child thread
author Simon Sapin <simon.sapin@octobus.net>
Wed, 06 Jan 2021 14:09:01 +0100
changeset 46588 47557ea79fc7
parent 46587 cb4b0b0c6de4
child 46589 620c88fb42a2
copies-rust: move CPU-heavy Rust processing into a child thread … that runs in parallel with the parent thread fetching data. This can be disabled through a new config. CLI example: `hg --config=devel.copy-tracing.multi-thread=no`. For now, both threads use the GIL; later commits will reduce this. Differential Revision: https://phab.mercurial-scm.org/D9684
mercurial/configitems.py
mercurial/copies.py
rust/Cargo.lock
rust/hg-cpython/Cargo.toml
rust/hg-cpython/src/copy_tracing.rs
--- a/mercurial/configitems.py	Tue Jan 05 21:02:00 2021 +0100
+++ b/mercurial/configitems.py	Wed Jan 06 14:09:01 2021 +0100
@@ -700,6 +700,11 @@
 )
 coreconfigitem(
     b'devel',
+    b'copy-tracing.multi-thread',
+    default=True,
+)
+coreconfigitem(
+    b'devel',
     b'debug.extensions',
     default=False,
 )
--- a/mercurial/copies.py	Tue Jan 05 21:02:00 2021 +0100
+++ b/mercurial/copies.py	Wed Jan 06 14:09:01 2021 +0100
@@ -274,6 +274,7 @@
     revs = cl.findmissingrevs(common=[a.rev()], heads=[b.rev()])
     roots = set()
     has_graph_roots = False
+    multi_thread = repo.ui.configbool(b'devel', b'copy-tracing.multi-thread')
 
     # iterate over `only(B, A)`
     for r in revs:
@@ -321,7 +322,13 @@
                     children_count[p] += 1
         revinfo = _revinfo_getter(repo, match)
         return _combine_changeset_copies(
-            revs, children_count, b.rev(), revinfo, match, isancestor
+            revs,
+            children_count,
+            b.rev(),
+            revinfo,
+            match,
+            isancestor,
+            multi_thread,
         )
     else:
         # When not using side-data, we will process the edges "from" the parent.
@@ -346,7 +353,7 @@
 
 
 def _combine_changeset_copies(
-    revs, children_count, targetrev, revinfo, match, isancestor
+    revs, children_count, targetrev, revinfo, match, isancestor, multi_thread
 ):
     """combine the copies information for each item of iterrevs
 
@@ -363,7 +370,7 @@
 
     if rustmod is not None:
         final_copies = rustmod.combine_changeset_copies(
-            list(revs), children_count, targetrev, revinfo
+            list(revs), children_count, targetrev, revinfo, multi_thread
         )
     else:
         isancestor = cached_is_ancestor(isancestor)
--- a/rust/Cargo.lock	Tue Jan 05 21:02:00 2021 +0100
+++ b/rust/Cargo.lock	Wed Jan 06 14:09:01 2021 +0100
@@ -331,6 +331,7 @@
 version = "0.1.0"
 dependencies = [
  "cpython 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "crossbeam-channel 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)",
  "env_logger 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
  "hg-core 0.1.0",
  "libc 0.2.81 (registry+https://github.com/rust-lang/crates.io-index)",
--- a/rust/hg-cpython/Cargo.toml	Tue Jan 05 21:02:00 2021 +0100
+++ b/rust/hg-cpython/Cargo.toml	Wed Jan 06 14:09:01 2021 +0100
@@ -22,6 +22,7 @@
 python3-bin = ["cpython/python3-sys"]
 
 [dependencies]
+crossbeam-channel = "0.4"
 hg-core = { path = "../hg-core"}
 libc = '*'
 log = "0.4.8"
--- a/rust/hg-cpython/src/copy_tracing.rs	Tue Jan 05 21:02:00 2021 +0100
+++ b/rust/hg-cpython/src/copy_tracing.rs	Wed Jan 06 14:09:01 2021 +0100
@@ -22,6 +22,7 @@
     children_count: PyDict,
     target_rev: Revision,
     rev_info: PyObject,
+    multi_thread: bool,
 ) -> PyResult<PyDict> {
     let children_count = children_count
         .items(py)
@@ -42,20 +43,81 @@
         Ok((rev, p1, p2, opt_bytes))
     });
 
-    let mut combine_changeset_copies =
-        CombineChangesetCopies::new(children_count);
+    let path_copies = if !multi_thread {
+        let mut combine_changeset_copies =
+            CombineChangesetCopies::new(children_count);
+
+        for rev_info in revs_info {
+            let (rev, p1, p2, opt_bytes) = rev_info?;
+            let files = match &opt_bytes {
+                Some(bytes) => ChangedFiles::new(bytes.data(py)),
+                // Python None was extracted to Option::None,
+                // meaning there was no copy data.
+                None => ChangedFiles::new_empty(),
+            };
+
+            combine_changeset_copies.add_revision(rev, p1, p2, files)
+        }
+        combine_changeset_copies.finish(target_rev)
+    } else {
+        // Use a bounded channel to provide back-pressure:
+        // if the child thread is slower to process revisions than this thread
+        // is to gather data for them, an unbounded channel would keep
+        // growing and eat memory.
+        //
+        // TODO: tweak the bound?
+        let (rev_info_sender, rev_info_receiver) =
+            crossbeam_channel::bounded::<RevInfo>(1000);
 
-    for rev_info in revs_info {
-        let (rev, p1, p2, opt_bytes) = rev_info?;
-        let files = match &opt_bytes {
-            Some(bytes) => ChangedFiles::new(bytes.data(py)),
-            // value was presumably None, meaning they was no copy data.
-            None => ChangedFiles::new_empty(),
-        };
+        // Start a thread that does CPU-heavy processing in parallel with the
+        // loop below.
+        //
+        // If the parent thread panics, `rev_info_sender` will be dropped and
+        // “disconnected”. `rev_info_receiver` will be notified of this and
+        // exit its own loop.
+        let thread = std::thread::spawn(move || {
+            let mut combine_changeset_copies =
+                CombineChangesetCopies::new(children_count);
+            for (rev, p1, p2, opt_bytes) in rev_info_receiver {
+                let gil = Python::acquire_gil();
+                let py = gil.python();
+                let files = match &opt_bytes {
+                    Some(raw) => ChangedFiles::new(raw.data(py)),
+                    // Python None was extracted to Option::None,
+                    // meaning there was no copy data.
+                    None => ChangedFiles::new_empty(),
+                };
+                combine_changeset_copies.add_revision(rev, p1, p2, files)
+            }
+
+            combine_changeset_copies.finish(target_rev)
+        });
 
-        combine_changeset_copies.add_revision(rev, p1, p2, files)
-    }
-    let path_copies = combine_changeset_copies.finish(target_rev);
+        for rev_info in revs_info {
+            let (rev, p1, p2, opt_bytes) = rev_info?;
+
+            // We’d prefer to avoid the child thread calling into Python code,
+            // but this avoids a potential deadlock on the GIL if it does:
+            py.allow_threads(|| {
+                rev_info_sender.send((rev, p1, p2, opt_bytes)).expect(
+                    "combine_changeset_copies: channel is disconnected",
+                );
+            });
+        }
+        // We’d prefer to avoid the child thread calling into Python code,
+        // but this avoids a potential deadlock on the GIL if it does:
+        py.allow_threads(|| {
+            // Disconnect the channel to signal the child thread to stop:
+            // the `for … in rev_info_receiver` loop will end.
+            drop(rev_info_sender);
+
+            // Wait for the child thread to stop, and propagate any panic.
+            thread.join().unwrap_or_else(|panic_payload| {
+                std::panic::resume_unwind(panic_payload)
+            })
+        })
+    };
+
     let out = PyDict::new(py);
     for (dest, source) in path_copies.into_iter() {
         out.set_item(
@@ -84,7 +146,8 @@
                 revs: PyList,
                 children: PyDict,
                 target_rev: Revision,
-                rev_info: PyObject
+                rev_info: PyObject,
+                multi_thread: bool
             )
         ),
     )?;