util: enable observing of util.bufferedinputpipe
authorGregory Szorc <gregory.szorc@gmail.com>
Sat, 24 Feb 2018 12:24:03 -0800
changeset 36525 3158052720ae
parent 36524 bfe38f787d5b
child 36526 7cc4a9b9732a
util: enable observing of util.bufferedinputpipe Our file object proxy is useful. But it doesn't capture all I/O. The "os" module offers low-level interfaces to various system calls. For example, os.read() exposes read(2) to read from a file descriptor. bufferedinputpipe is special in a few ways. First, it acts as a proxy of sorts around our [potentially proxied] file object. In addition, it uses os.read() to satisfy all I/O. This means that our observer doesn't see notifications for reads on this type. This is preventing us from properly instrumenting reads on ssh peers. This commit teaches bufferedinputpipe to be aware of our observed file objects. We do this by introducing a class variation that notifies our observer of os.read() events. Since read() and readline() bypass os.read(), we also teach this instance to notify the observer for buffered variations of these reads as well. We don't report them as actual read() and readline() calls because these methods are never called on the actual file object but rather a buffered version of it. We introduce bufferedinputpipe.__new__ to swap in the new class if the passed file object is a fileobjectproxy. This makes hooking up the observer automatic. And it is a zero cost abstraction for I/O operations on non-proxied file objects. Differential Revision: https://phab.mercurial-scm.org/D2404
mercurial/util.py
--- a/mercurial/util.py	Sat Feb 24 12:22:20 2018 -0800
+++ b/mercurial/util.py	Sat Feb 24 12:24:03 2018 -0800
@@ -373,6 +373,13 @@
     This class lives in the 'util' module because it makes use of the 'os'
     module from the python stdlib.
     """
+    def __new__(cls, fh):
+        # If we receive a fileobjectproxy, we need to use a variation of this
+        # class that notifies observers about activity.
+        if isinstance(fh, fileobjectproxy):
+            cls = observedbufferedinputpipe
+
+        return super(bufferedinputpipe, cls).__new__(cls)
 
     def __init__(self, input):
         self._input = input
@@ -453,6 +460,8 @@
             self._lenbuf += len(data)
             self._buffer.append(data)
 
+        return data
+
 def mmapread(fp):
     try:
         fd = getattr(fp, 'fileno', lambda: fp)()
@@ -505,6 +514,8 @@
 
     def __getattribute__(self, name):
         ours = {
+            r'_observer',
+
             # IOBase
             r'close',
             # closed if a property
@@ -639,6 +650,46 @@
         return object.__getattribute__(self, r'_observedcall')(
             r'read1', *args, **kwargs)
 
+class observedbufferedinputpipe(bufferedinputpipe):
+    """A variation of bufferedinputpipe that is aware of fileobjectproxy.
+
+    ``bufferedinputpipe`` makes low-level calls to ``os.read()`` that
+    bypass ``fileobjectproxy``. Because of this, we need to make
+    ``bufferedinputpipe`` aware of these operations.
+
+    This variation of ``bufferedinputpipe`` can notify observers about
+    ``os.read()`` events. It also re-publishes other events, such as
+    ``read()`` and ``readline()``.
+    """
+    def _fillbuffer(self):
+        res = super(observedbufferedinputpipe, self)._fillbuffer()
+
+        fn = getattr(self._input._observer, r'osread', None)
+        if fn:
+            fn(res, _chunksize)
+
+        return res
+
+    # We use different observer methods because the operation isn't
+    # performed on the actual file object but on us.
+    def read(self, size):
+        res = super(observedbufferedinputpipe, self).read(size)
+
+        fn = getattr(self._input._observer, r'bufferedread', None)
+        if fn:
+            fn(res, size)
+
+        return res
+
+    def readline(self, *args, **kwargs):
+        res = super(observedbufferedinputpipe, self).readline(*args, **kwargs)
+
+        fn = getattr(self._input._observer, r'bufferedreadline', None)
+        if fn:
+            fn(res)
+
+        return res
+
 DATA_ESCAPE_MAP = {pycompat.bytechr(i): br'\x%02x' % i for i in range(256)}
 DATA_ESCAPE_MAP.update({
     b'\\': b'\\\\',
@@ -702,6 +753,16 @@
 
         self.fh.write('%s> flush() -> %r\n' % (self.name, res))
 
+    # For observedbufferedinputpipe.
+    def bufferedread(self, res, size):
+        self.fh.write('%s> bufferedread(%d) -> %d' % (
+            self.name, size, len(res)))
+        self._writedata(res)
+
+    def bufferedreadline(self, res):
+        self.fh.write('%s> bufferedreadline() -> %d' % (self.name, len(res)))
+        self._writedata(res)
+
 def makeloggingfileobject(logh, fh, name, reads=True, writes=True,
                           logdata=False):
     """Turn a file object into a logging file object."""