--- a/rust/hg-cpython/src/copy_tracing.rs Tue Jan 05 21:02:00 2021 +0100
+++ b/rust/hg-cpython/src/copy_tracing.rs Wed Jan 06 14:09:01 2021 +0100
@@ -22,6 +22,7 @@
children_count: PyDict,
target_rev: Revision,
rev_info: PyObject,
+ multi_thread: bool,
) -> PyResult<PyDict> {
let children_count = children_count
.items(py)
@@ -42,20 +43,81 @@
Ok((rev, p1, p2, opt_bytes))
});
- let mut combine_changeset_copies =
- CombineChangesetCopies::new(children_count);
+ let path_copies = if !multi_thread {
+ let mut combine_changeset_copies =
+ CombineChangesetCopies::new(children_count);
+
+ for rev_info in revs_info {
+ let (rev, p1, p2, opt_bytes) = rev_info?;
+ let files = match &opt_bytes {
+ Some(bytes) => ChangedFiles::new(bytes.data(py)),
+ // Python None was extracted to Option::None,
+ // meaning there was no copy data.
+ None => ChangedFiles::new_empty(),
+ };
+
+ combine_changeset_copies.add_revision(rev, p1, p2, files)
+ }
+ combine_changeset_copies.finish(target_rev)
+ } else {
+ // Use a bounded channel to provide back-pressure:
+ // if the child thread is slower to process revisions than this thread
+ // is to gather data for them, an unbounded channel would keep
+ // growing and eat memory.
+ //
+ // TODO: tweak the bound?
+ let (rev_info_sender, rev_info_receiver) =
+ crossbeam_channel::bounded::<RevInfo>(1000);
- for rev_info in revs_info {
- let (rev, p1, p2, opt_bytes) = rev_info?;
- let files = match &opt_bytes {
- Some(bytes) => ChangedFiles::new(bytes.data(py)),
- // value was presumably None, meaning they was no copy data.
- None => ChangedFiles::new_empty(),
- };
+ // Start a thread that does CPU-heavy processing in parallel with the
+ // loop below.
+ //
+ // If the parent thread panics, `rev_info_sender` will be dropped and
+ // “disconnected”. `rev_info_receiver` will be notified of this and
+ // exit its own loop.
+ let thread = std::thread::spawn(move || {
+ let mut combine_changeset_copies =
+ CombineChangesetCopies::new(children_count);
+ for (rev, p1, p2, opt_bytes) in rev_info_receiver {
+ let gil = Python::acquire_gil();
+ let py = gil.python();
+ let files = match &opt_bytes {
+ Some(raw) => ChangedFiles::new(raw.data(py)),
+ // Python None was extracted to Option::None,
+ // meaning there was no copy data.
+ None => ChangedFiles::new_empty(),
+ };
+ combine_changeset_copies.add_revision(rev, p1, p2, files)
+ }
+
+ combine_changeset_copies.finish(target_rev)
+ });
- combine_changeset_copies.add_revision(rev, p1, p2, files)
- }
- let path_copies = combine_changeset_copies.finish(target_rev);
+ for rev_info in revs_info {
+ let (rev, p1, p2, opt_bytes) = rev_info?;
+
+ // We’d prefer to avoid the child thread calling into Python code,
+ // but this avoids a potential deadlock on the GIL if it does:
+ py.allow_threads(|| {
+ rev_info_sender.send((rev, p1, p2, opt_bytes)).expect(
+ "combine_changeset_copies: channel is disconnected",
+ );
+ });
+ }
+ // We’d prefer to avoid the child thread calling into Python code,
+ // but this avoids a potential deadlock on the GIL if it does:
+ py.allow_threads(|| {
+ // Disconnect the channel to signal the child thread to stop:
+ // the `for … in rev_info_receiver` loop will end.
+ drop(rev_info_sender);
+
+ // Wait for the child thread to stop, and propagate any panic.
+ thread.join().unwrap_or_else(|panic_payload| {
+ std::panic::resume_unwind(panic_payload)
+ })
+ })
+ };
+
let out = PyDict::new(py);
for (dest, source) in path_copies.into_iter() {
out.set_item(
@@ -84,7 +146,8 @@
revs: PyList,
children: PyDict,
target_rev: Revision,
- rev_info: PyObject
+ rev_info: PyObject,
+ multi_thread: bool
)
),
)?;