worker: avoid reading 1 byte at a time from the OS pipe
author	Arseniy Alekseyev <aalekseyev@janestreet.com>
date	Fri, 06 Jan 2023 15:17:14 +0000
changeset 49899 3eef8baf6b92
parent 49898 024e0580b853
child 49900 d2e80d27c524
worker: avoid reading 1 byte at a time from the OS pipe

Apparently `pickle.load` does a lot of small reads, many of them literally 1 byte, so it benefits greatly from buffering. This change enables buffering, at the cost of a more complicated interaction with the `selector` API.

On one repository with ~400k files this reduces the time by about 30s, from ~60s to ~30s. The difference is so large because the actual updating work is parallelized, while these small reads bottleneck the central hg process.
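The sketch below (not part of the patch) illustrates the pattern the new code relies on: register a buffered reader with the selector, and after select() reports a pipe readable, drain every pickled object already sitting in the reader's buffer by briefly switching the fd to non-blocking and calling peek(), so that select() is only called again once the buffer is empty. Helper names such as drain_pickles and _peek_nonblock are illustrative only, and the example is POSIX-only (it uses os.fork), under the assumption that whole pickles are written in one burst by the child.

    import os
    import pickle
    import selectors

    def _peek_nonblock(f):
        # Temporarily make the fd non-blocking so peek() returns whatever is
        # already buffered (possibly b'') instead of waiting for more data.
        os.set_blocking(f.fileno(), False)
        try:
            return f.peek()
        finally:
            os.set_blocking(f.fileno(), True)

    def drain_pickles(f):
        # Illustrative helper: load one object, then keep loading while the
        # buffered reader still holds data, so we only return to select()
        # with an empty buffer.
        yield pickle.load(f)
        while len(_peek_nonblock(f)) > 0:
            yield pickle.load(f)

    rfd, wfd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child: write a few pickled results in one burst, then exit.
        os.close(rfd)
        with os.fdopen(wfd, 'wb') as w:
            for i in range(3):
                pickle.dump(('result', i), w)
        os._exit(0)

    os.close(wfd)
    sel = selectors.DefaultSelector()
    # Buffered reader, unlike the old unbuffered os.fdopen(rfd, 'rb', 0).
    sel.register(os.fdopen(rfd, 'rb', 4096), selectors.EVENT_READ)
    done = False
    while not done:
        for key, _events in sel.select():
            try:
                for res in drain_pickles(key.fileobj):
                    print(res)
            except EOFError:
                # Writer closed its end: unregister and stop.
                sel.unregister(key.fileobj)
                key.fileobj.close()
                done = True
    os.waitpid(pid, 0)

Without the drain step, data left in the reader's buffer would not make the fd look readable to the selector, so dispatch of already-received results would be delayed until more bytes arrived on the raw pipe.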
mercurial/worker.py
--- a/mercurial/worker.py	Tue Jan 10 12:55:49 2023 -0500
+++ b/mercurial/worker.py	Fri Jan 06 15:17:14 2023 +0000
@@ -61,45 +61,6 @@
     return threading.current_thread() == threading.main_thread()
 
 
-class _blockingreader:
-    """Wrap unbuffered stream such that pickle.load() works with it.
-
-    pickle.load() expects that calls to read() and readinto() read as many
-    bytes as requested. On EOF, it is fine to read fewer bytes. In this case,
-    pickle.load() raises an EOFError.
-    """
-
-    def __init__(self, wrapped):
-        self._wrapped = wrapped
-
-    def readline(self):
-        return self._wrapped.readline()
-
-    def readinto(self, buf):
-        pos = 0
-        size = len(buf)
-
-        with memoryview(buf) as view:
-            while pos < size:
-                with view[pos:] as subview:
-                    ret = self._wrapped.readinto(subview)
-                if not ret:
-                    break
-                pos += ret
-
-        return pos
-
-    # issue multiple reads until size is fulfilled (or EOF is encountered)
-    def read(self, size=-1):
-        if size < 0:
-            return self._wrapped.readall()
-
-        buf = bytearray(size)
-        n_read = self.readinto(buf)
-        del buf[n_read:]
-        return bytes(buf)
-
-
 if pycompat.isposix or pycompat.iswindows:
     _STARTUP_COST = 0.01
     # The Windows worker is thread based. If tasks are CPU bound, threads
@@ -276,11 +237,26 @@
     selector = selectors.DefaultSelector()
     for rfd, wfd in pipes:
         os.close(wfd)
-        # The stream has to be unbuffered. Otherwise, if all data is read from
-        # the raw file into the buffer, the selector thinks that the FD is not
-        # ready to read while pickle.load() could read from the buffer. This
-        # would delay the processing of readable items.
-        selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)
+        # Buffering is needed for performance, but it also presents a problem:
+        # selector doesn't take the buffered data into account,
+        # so we have to arrange it so that the buffers are empty when select is called
+        # (see [peek_nonblock])
+        selector.register(os.fdopen(rfd, 'rb', 4096), selectors.EVENT_READ)
+
+    def peek_nonblock(f):
+        os.set_blocking(f.fileno(), False)
+        res = f.peek()
+        os.set_blocking(f.fileno(), True)
+        return res
+
+    def load_all(f):
+        # The pytype error likely goes away on a modern version of
+        # pytype having a modern typeshed snapshot.
+        # pytype: disable=wrong-arg-types
+        yield pickle.load(f)
+        while len(peek_nonblock(f)) > 0:
+            yield pickle.load(f)
+        # pytype: enable=wrong-arg-types
 
     def cleanup():
         signal.signal(signal.SIGINT, oldhandler)
@@ -294,15 +270,11 @@
         while openpipes > 0:
             for key, events in selector.select():
                 try:
-                    # The pytype error likely goes away on a modern version of
-                    # pytype having a modern typeshed snapshot.
-                    # pytype: disable=wrong-arg-types
-                    res = pickle.load(_blockingreader(key.fileobj))
-                    # pytype: enable=wrong-arg-types
-                    if hasretval and res[0]:
-                        retval.update(res[1])
-                    else:
-                        yield res
+                    for res in load_all(key.fileobj):
+                        if hasretval and res[0]:
+                            retval.update(res[1])
+                        else:
+                            yield res
                 except EOFError:
                     selector.unregister(key.fileobj)
                     # pytype: disable=attribute-error