protocol: unify unbundle on the server side
authorMatt Mackall <mpm@selenic.com>
Thu, 15 Jul 2010 11:24:42 -0500
changeset 11593 d054cc5c7737
parent 11592 26e0782b8380
child 11594 67863f9d805f
protocol: unify unbundle on the server side
mercurial/hgweb/hgweb_mod.py
mercurial/hgweb/protocol.py
mercurial/sshserver.py
mercurial/wireproto.py
--- a/mercurial/hgweb/hgweb_mod.py	Wed Jul 14 17:12:18 2010 -0500
+++ b/mercurial/hgweb/hgweb_mod.py	Thu Jul 15 11:24:42 2010 -0500
@@ -6,8 +6,8 @@
 # This software may be used and distributed according to the terms of the
 # GNU General Public License version 2 or any later version.
 
-import os, zlib
-from mercurial import ui, hg, hook, error, encoding, templater, wireproto
+import os, zlib, sys, cStringIO, urllib
+from mercurial import ui, hg, hook, error, encoding, templater, wireproto, util
 from common import get_mtime, ErrorResponse, permhooks
 from common import HTTP_OK, HTTP_BAD_REQUEST, HTTP_NOT_FOUND, HTTP_SERVER_ERROR
 from request import wsgirequest
@@ -56,6 +56,23 @@
     def respond(self, s):
         self.req.respond(HTTP_OK, HGTYPE, length=len(s))
         self.response = s
+    def getfile(self, fp):
+        length = int(self.req.env['CONTENT_LENGTH'])
+        for s in util.filechunkiter(self.req, limit=length):
+            fp.write(s)
+    def redirect(self):
+        self.oldio = sys.stdout, sys.stderr
+        sys.stderr = sys.stdout = cStringIO.StringIO()
+    def respondpush(self, ret):
+        val = sys.stdout.getvalue()
+        sys.stdout, sys.stderr = self.oldio
+        self.req.respond(HTTP_OK, HGTYPE)
+        self.response = '%d\n%s' % (ret, val)
+    def _client(self):
+        return 'remote:%s:%s:%s' % (
+            self.req.env.get('wsgi.url_scheme') or 'http',
+            urllib.quote(self.req.env.get('REMOTE_HOST', '')),
+            urllib.quote(self.req.env.get('REMOTE_USER', '')))
 
 def callproto(repo, req, cmd):
     p = webproto(req)
--- a/mercurial/hgweb/protocol.py	Wed Jul 14 17:12:18 2010 -0500
+++ b/mercurial/hgweb/protocol.py	Thu Jul 15 11:24:42 2010 -0500
@@ -32,85 +32,3 @@
     rsp = ' '.join(caps)
     req.respond(HTTP_OK, HGTYPE, length=len(rsp))
     yield rsp
-
-def unbundle(repo, req):
-
-    proto = req.env.get('wsgi.url_scheme') or 'http'
-    their_heads = req.form['heads'][0].split(' ')
-
-    def check_heads():
-        heads = map(hex, repo.heads())
-        return their_heads == [hex('force')] or their_heads == heads
-
-    # fail early if possible
-    if not check_heads():
-        req.drain()
-        raise ErrorResponse(HTTP_OK, 'unsynced changes')
-
-    # do not lock repo until all changegroup data is
-    # streamed. save to temporary file.
-
-    fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
-    fp = os.fdopen(fd, 'wb+')
-    try:
-        length = int(req.env['CONTENT_LENGTH'])
-        for s in util.filechunkiter(req, limit=length):
-            fp.write(s)
-
-        try:
-            lock = repo.lock()
-            try:
-                if not check_heads():
-                    raise ErrorResponse(HTTP_OK, 'unsynced changes')
-
-                fp.seek(0)
-                header = fp.read(6)
-                if header.startswith('HG') and not header.startswith('HG10'):
-                    raise ValueError('unknown bundle version')
-                elif header not in changegroupmod.bundletypes:
-                    raise ValueError('unknown bundle compression type')
-                gen = changegroupmod.unbundle(header, fp)
-
-                # send addchangegroup output to client
-
-                oldio = sys.stdout, sys.stderr
-                sys.stderr = sys.stdout = cStringIO.StringIO()
-
-                try:
-                    url = 'remote:%s:%s:%s' % (
-                          proto,
-                          urllib.quote(req.env.get('REMOTE_HOST', '')),
-                          urllib.quote(req.env.get('REMOTE_USER', '')))
-                    try:
-                        ret = repo.addchangegroup(gen, 'serve', url, lock=lock)
-                    except util.Abort, inst:
-                        sys.stdout.write("abort: %s\n" % inst)
-                        ret = 0
-                finally:
-                    val = sys.stdout.getvalue()
-                    sys.stdout, sys.stderr = oldio
-                req.respond(HTTP_OK, HGTYPE)
-                return '%d\n%s' % (ret, val),
-            finally:
-                lock.release()
-        except ValueError, inst:
-            raise ErrorResponse(HTTP_OK, inst)
-        except (OSError, IOError), inst:
-            error = getattr(inst, 'strerror', 'Unknown error')
-            if not isinstance(error, str):
-                error = 'Error: %s' % str(error)
-            if inst.errno == errno.ENOENT:
-                code = HTTP_NOT_FOUND
-            else:
-                code = HTTP_SERVER_ERROR
-            filename = getattr(inst, 'filename', '')
-            # Don't send our filesystem layout to the client
-            if filename and filename.startswith(repo.root):
-                filename = filename[len(repo.root)+1:]
-                text = '%s: %s' % (error, filename)
-            else:
-                text = error.replace(repo.root + os.path.sep, '')
-            raise ErrorResponse(code, text)
-    finally:
-        fp.close()
-        os.unlink(tempname)
--- a/mercurial/sshserver.py	Wed Jul 14 17:12:18 2010 -0500
+++ b/mercurial/sshserver.py	Thu Jul 15 11:24:42 2010 -0500
@@ -72,6 +72,20 @@
             self.fout.write(chunk)
         self.fout.flush()
 
+    def getfile(self, fpout):
+        self.respond('')
+        count = int(self.fin.readline())
+        while count:
+            fpout.write(self.fin.read(count))
+            count = int(self.fin.readline())
+
+    def redirect(self):
+        pass
+
+    def respondpush(self, ret):
+        self.respond('')
+        self.respond(str(ret))
+
     def serve_forever(self):
         try:
             while self.serve_one():
@@ -127,58 +141,10 @@
             return
 
         self.respond("")
-        r = self.repo.addchangegroup(self.fin, 'serve', self.client_url(),
+        r = self.repo.addchangegroup(self.fin, 'serve', self._client(),
                                      lock=self.lock)
         return str(r)
 
-    def client_url(self):
+    def _client(self):
         client = os.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
         return 'remote:ssh:' + client
-
-    def do_unbundle(self):
-        their_heads = self.getarg('heads').split()
-
-        def check_heads():
-            heads = map(hex, self.repo.heads())
-            return their_heads == [hex('force')] or their_heads == heads
-
-        # fail early if possible
-        if not check_heads():
-            self.respond(_('unsynced changes'))
-            return
-
-        self.respond('')
-
-        # write bundle data to temporary file because it can be big
-        fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
-        fp = os.fdopen(fd, 'wb+')
-        try:
-            count = int(self.fin.readline())
-            while count:
-                fp.write(self.fin.read(count))
-                count = int(self.fin.readline())
-
-            was_locked = self.lock is not None
-            if not was_locked:
-                self.lock = self.repo.lock()
-            try:
-                if not check_heads():
-                    # someone else committed/pushed/unbundled while we
-                    # were transferring data
-                    self.respond(_('unsynced changes'))
-                    return
-                self.respond('')
-
-                # push can proceed
-
-                fp.seek(0)
-                r = self.repo.addchangegroup(fp, 'serve', self.client_url(),
-                                             lock=self.lock)
-                self.respond(str(r))
-            finally:
-                if not was_locked:
-                    self.lock.release()
-                    self.lock = None
-        finally:
-            fp.close()
-            os.unlink(tempname)
--- a/mercurial/wireproto.py	Wed Jul 14 17:12:18 2010 -0500
+++ b/mercurial/wireproto.py	Thu Jul 15 11:24:42 2010 -0500
@@ -5,10 +5,11 @@
 # This software may be used and distributed according to the terms of the
 # GNU General Public License version 2 or any later version.
 
+import urllib, tempfile, os
 from i18n import _
 from node import bin, hex
-import urllib
-import streamclone, repo, error, encoding
+import changegroup as changegroupmod
+import streamclone, repo, error, encoding, util
 import pushkey as pushkey_
 
 # client side
@@ -198,6 +199,56 @@
     except streamclone.StreamException, inst:
         return str(inst)
 
+def unbundle(repo, proto, heads):
+    their_heads = heads.split()
+
+    def check_heads():
+        heads = map(hex, repo.heads())
+        return their_heads == [hex('force')] or their_heads == heads
+
+    # fail early if possible
+    if not check_heads():
+        repo.respond(_('unsynced changes'))
+        return
+
+    # write bundle data to temporary file because it can be big
+    fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
+    fp = os.fdopen(fd, 'wb+')
+    r = 0
+    proto.redirect()
+    try:
+        proto.getfile(fp)
+        lock = repo.lock()
+        try:
+            if not check_heads():
+                # someone else committed/pushed/unbundled while we
+                # were transferring data
+                proto.respond(_('unsynced changes'))
+                return
+
+            # push can proceed
+            fp.seek(0)
+            header = fp.read(6)
+            if header.startswith('HG'):
+                if not header.startswith('HG10'):
+                    raise ValueError('unknown bundle version')
+                elif header not in changegroupmod.bundletypes:
+                    raise ValueError('unknown bundle compression type')
+            gen = changegroupmod.unbundle(header, fp)
+
+            try:
+                r = repo.addchangegroup(gen, 'serve', proto._client(),
+                                        lock=lock)
+            except util.Abort, inst:
+                sys.stderr.write("abort: %s\n" % inst)
+        finally:
+            lock.release()
+            proto.respondpush(r)
+
+    finally:
+        fp.close()
+        os.unlink(tempname)
+
 commands = {
     'between': (between, 'pairs'),
     'branchmap': (branchmap, ''),
@@ -209,4 +260,5 @@
     'lookup': (lookup, 'key'),
     'pushkey': (pushkey, 'namespace key old new'),
     'stream_out': (stream, ''),
+    'unbundle': (unbundle, 'heads'),
 }