rust/hg-cpython/src/copy_tracing.rs
changeset 46588 47557ea79fc7
parent 46587 cb4b0b0c6de4
child 46589 620c88fb42a2
--- a/rust/hg-cpython/src/copy_tracing.rs	Tue Jan 05 21:02:00 2021 +0100
+++ b/rust/hg-cpython/src/copy_tracing.rs	Wed Jan 06 14:09:01 2021 +0100
@@ -22,6 +22,7 @@
     children_count: PyDict,
     target_rev: Revision,
     rev_info: PyObject,
+    multi_thread: bool,
 ) -> PyResult<PyDict> {
     let children_count = children_count
         .items(py)
@@ -42,20 +43,81 @@
         Ok((rev, p1, p2, opt_bytes))
     });
 
-    let mut combine_changeset_copies =
-        CombineChangesetCopies::new(children_count);
+    let path_copies = if !multi_thread {
+        let mut combine_changeset_copies =
+            CombineChangesetCopies::new(children_count);
+
+        for rev_info in revs_info {
+            let (rev, p1, p2, opt_bytes) = rev_info?;
+            let files = match &opt_bytes {
+                Some(bytes) => ChangedFiles::new(bytes.data(py)),
+                // Python None was extracted to Option::None,
+                // meaning there was no copy data.
+                None => ChangedFiles::new_empty(),
+            };
+
+            combine_changeset_copies.add_revision(rev, p1, p2, files)
+        }
+        combine_changeset_copies.finish(target_rev)
+    } else {
+        // Use a bounded channel to provide back-pressure:
+        // if the child thread is slower to process revisions than this thread
+        // is to gather data for them, an unbounded channel would keep
+        // growing and eat memory.
+        //
+        // TODO: tweak the bound?
+        let (rev_info_sender, rev_info_receiver) =
+            crossbeam_channel::bounded::<RevInfo>(1000);
 
-    for rev_info in revs_info {
-        let (rev, p1, p2, opt_bytes) = rev_info?;
-        let files = match &opt_bytes {
-            Some(bytes) => ChangedFiles::new(bytes.data(py)),
-            // value was presumably None, meaning they was no copy data.
-            None => ChangedFiles::new_empty(),
-        };
+        // Start a thread that does CPU-heavy processing in parallel with the
+        // loop below.
+        //
+        // If the parent thread panics, `rev_info_sender` will be dropped and
+        // “disconnected”. `rev_info_receiver` will be notified of this and
+        // exit its own loop.
+        let thread = std::thread::spawn(move || {
+            let mut combine_changeset_copies =
+                CombineChangesetCopies::new(children_count);
+            for (rev, p1, p2, opt_bytes) in rev_info_receiver {
+                let gil = Python::acquire_gil();
+                let py = gil.python();
+                let files = match &opt_bytes {
+                    Some(raw) => ChangedFiles::new(raw.data(py)),
+                    // Python None was extracted to Option::None,
+                    // meaning there was no copy data.
+                    None => ChangedFiles::new_empty(),
+                };
+                combine_changeset_copies.add_revision(rev, p1, p2, files)
+            }
+
+            combine_changeset_copies.finish(target_rev)
+        });
 
-        combine_changeset_copies.add_revision(rev, p1, p2, files)
-    }
-    let path_copies = combine_changeset_copies.finish(target_rev);
+        for rev_info in revs_info {
+            let (rev, p1, p2, opt_bytes) = rev_info?;
+
+            // We’d prefer to avoid the child thread calling into Python code,
+            // but this avoids a potential deadlock on the GIL if it does:
+            py.allow_threads(|| {
+                rev_info_sender.send((rev, p1, p2, opt_bytes)).expect(
+                    "combine_changeset_copies: channel is disconnected",
+                );
+            });
+        }
+        // We’d prefer to avoid the child thread calling into Python code,
+        // but this avoids a potential deadlock on the GIL if it does:
+        py.allow_threads(|| {
+            // Disconnect the channel to signal the child thread to stop:
+            // the `for … in rev_info_receiver` loop will end.
+            drop(rev_info_sender);
+
+            // Wait for the child thread to stop, and propagate any panic.
+            thread.join().unwrap_or_else(|panic_payload| {
+                std::panic::resume_unwind(panic_payload)
+            })
+        })
+    };
+
     let out = PyDict::new(py);
     for (dest, source) in path_copies.into_iter() {
         out.set_item(
@@ -84,7 +146,8 @@
                 revs: PyList,
                 children: PyDict,
                 target_rev: Revision,
-                rev_info: PyObject
+                rev_info: PyObject,
+                multi_thread: bool
             )
         ),
     )?;