hgext/remotefilelog/cacheclient.py
author Augie Fackler <augie@google.com>
Thu, 27 Sep 2018 13:03:19 -0400
changeset 40495 3a333a582d7b
permissions -rwxr-xr-x
remotefilelog: import pruned-down remotefilelog extension from hg-experimental This is remotefilelog as of my recent patches for compatibility with current tip of hg, minus support for old versions of Mercurial and some FB-specific features like their treemanifest extension and fetching linkrev data from a patched phabricator. The file extutil.py moved from hgext3rd to remotefilelog. This is not yet ready to be landed, consider it a preview for now. Planned changes include: * replace lz4 with zstd * rename some capabilities, requirements and wireproto commands to mark them as experimental * consolidate bits of shallowutil with related functions (eg readfile) I'm certainly open to other (small) changes, but my rough mission is to land this largely as-is so we can use it as a model of the functionality we need going forward for lazy-fetching of file contents from a server. # no-check-commit because of a few foo_bar functions Differential Revision: https://phab.mercurial-scm.org/D4782

#!/usr/bin/env python
# cacheclient.py - example cache client implementation
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

# The remotefilelog extension can optionally use a caching layer to serve
# file revision requests. This is an example implementation that uses
# the python-memcached library: https://pypi.python.org/pypi/python-memcached/
# A better implementation would make all of the requests non-blocking.
from __future__ import absolute_import

import os
import sys

import memcache

stdin = sys.stdin
stdout = sys.stdout
stderr = sys.stderr

mc = None
keyprefix = None
cachepath = None

# Max number of keys per request
batchsize = 1000

# Max value size per key (in bytes)
maxsize = 512 * 1024

def readfile(path):
    f = open(path, "r")
    try:
        return f.read()
    finally:
        f.close()

def writefile(path, content):
    dirname = os.path.dirname(path)
    if not os.path.exists(dirname):
        os.makedirs(dirname)

    f = open(path, "w")
    try:
        f.write(content)
    finally:
        f.close()

def compress(value):
    # Real world implementations will want to compress values.
    # Insert your favorite compression here, ex:
    # return lz4wrapper.lzcompresshc(value)
    return value

def decompress(value):
    # Real world implementations will want to compress values.
    # Insert your favorite compression here, ex:
    # return lz4wrapper.lz4decompress(value)
    return value

def generateKey(id):
    return keyprefix + id

def generateId(key):
    return key[len(keyprefix):]

def getKeys():
    raw = stdin.readline()[:-1]
    keycount = int(raw)

    keys = []
    for i in range(keycount):
        id = stdin.readline()[:-1]
        keys.append(generateKey(id))

    results = mc.get_multi(keys)

    hits = 0
    for i, key in enumerate(keys):
        value = results.get(key)
        id = generateId(key)
        # On hit, write to disk
        if value:
            # Integer hit indicates a large file
            if isinstance(value, int):
                largekeys = list([key + str(i) for i in range(value)])
                largevalues = mc.get_multi(largekeys)
                if len(largevalues) == value:
                    value = ""
                    for largekey in largekeys:
                        value += largevalues[largekey]
                else:
                    # A chunk is missing, give up
                    stdout.write(id + "\n")
                    stdout.flush()
                    continue
            path = os.path.join(cachepath, id)
            value = decompress(value)
            writefile(path, value)
            hits += 1
        else:
            # On miss, report to caller
            stdout.write(id + "\n")
            stdout.flush()

        if i % 500 == 0:
            stdout.write("_hits_%s_\n" % hits)
            stdout.flush()

    # done signal
    stdout.write("0\n")
    stdout.flush()

def setKeys():
    raw = stdin.readline()[:-1]
    keycount = int(raw)

    values = {}
    for i in range(keycount):
        id = stdin.readline()[:-1]
        path = os.path.join(cachepath, id)

        value = readfile(path)
        value = compress(value)

        key = generateKey(id)
        if len(value) > maxsize:
            # split up large files
            start = 0
            i = 0
            while start < len(value):
                end = min(len(value), start + maxsize)
                values[key + str(i)] = value[start:end]
                start += maxsize
                i += 1

            # Large files are stored as an integer representing how many
            # chunks it's broken into.
            value = i

        values[key] = value

        if len(values) == batchsize:
            mc.set_multi(values)
            values = {}

    if values:
        mc.set_multi(values)

def main(argv=None):
    """
    remotefilelog uses this cacheclient by setting it in the repo config:

    [remotefilelog]
    cacheprocess = cacheclient <ip address:port> <memcache prefix>

    When memcache requests need to be made, it will execute this process
    with the following arguments:

    cacheclient <ip address:port> <memcache prefix><internal prefix> <cachepath>

    Communication happens via stdin and stdout. To make a get request,
    the following is written to stdin:

    get\n
    <key count>\n
    <key1>\n
    <key...>\n
    <keyN>\n

    The results of any cache hits will be written directly to <cachepath>/<key>.
    Any cache misses will be written to stdout in the form <key>\n. Once all
    hits and misses are finished 0\n will be written to stdout to signal
    completion.

    During the request, progress may be reported via stdout with the format
    _hits_###_\n where ### is an integer representing the number of hits so
    far. remotefilelog uses this to display a progress bar.

    A single cacheclient process may be used for multiple requests (though
    not in parallel), so it stays open until it receives exit\n via stdin.

    """
    if argv is None:
        argv = sys.argv

    global cachepath
    global keyprefix
    global mc

    ip = argv[1]
    keyprefix = argv[2]
    cachepath = argv[3]

    mc = memcache.Client([ip], debug=0)

    while True:
        cmd = stdin.readline()[:-1]
        if cmd == "get":
            getKeys()
        elif cmd == "set":
            setKeys()
        elif cmd == "exit":
            return 0
        else:
            stderr.write("Invalid Command %s\n" % cmd)
            return 1

if __name__ == "__main__":
    sys.exit(main())