# HG changeset patch # User Matt Mackall # Date 1279211082 18000 # Node ID d054cc5c77370c345a0c41a002b2ce0aada45389 # Parent 26e0782b83804cdb55a87cb2c91217f009e77b9b protocol: unify unbundle on the server side diff -r 26e0782b8380 -r d054cc5c7737 mercurial/hgweb/hgweb_mod.py --- a/mercurial/hgweb/hgweb_mod.py Wed Jul 14 17:12:18 2010 -0500 +++ b/mercurial/hgweb/hgweb_mod.py Thu Jul 15 11:24:42 2010 -0500 @@ -6,8 +6,8 @@ # This software may be used and distributed according to the terms of the # GNU General Public License version 2 or any later version. -import os, zlib -from mercurial import ui, hg, hook, error, encoding, templater, wireproto +import os, zlib, sys, cStringIO, urllib +from mercurial import ui, hg, hook, error, encoding, templater, wireproto, util from common import get_mtime, ErrorResponse, permhooks from common import HTTP_OK, HTTP_BAD_REQUEST, HTTP_NOT_FOUND, HTTP_SERVER_ERROR from request import wsgirequest @@ -56,6 +56,23 @@ def respond(self, s): self.req.respond(HTTP_OK, HGTYPE, length=len(s)) self.response = s + def getfile(self, fp): + length = int(self.req.env['CONTENT_LENGTH']) + for s in util.filechunkiter(self.req, limit=length): + fp.write(s) + def redirect(self): + self.oldio = sys.stdout, sys.stderr + sys.stderr = sys.stdout = cStringIO.StringIO() + def respondpush(self, ret): + val = sys.stdout.getvalue() + sys.stdout, sys.stderr = self.oldio + self.req.respond(HTTP_OK, HGTYPE) + self.response = '%d\n%s' % (ret, val) + def _client(self): + return 'remote:%s:%s:%s' % ( + self.req.env.get('wsgi.url_scheme') or 'http', + urllib.quote(self.req.env.get('REMOTE_HOST', '')), + urllib.quote(self.req.env.get('REMOTE_USER', ''))) def callproto(repo, req, cmd): p = webproto(req) diff -r 26e0782b8380 -r d054cc5c7737 mercurial/hgweb/protocol.py --- a/mercurial/hgweb/protocol.py Wed Jul 14 17:12:18 2010 -0500 +++ b/mercurial/hgweb/protocol.py Thu Jul 15 11:24:42 2010 -0500 @@ -32,85 +32,3 @@ rsp = ' '.join(caps) req.respond(HTTP_OK, HGTYPE, length=len(rsp)) yield rsp - -def unbundle(repo, req): - - proto = req.env.get('wsgi.url_scheme') or 'http' - their_heads = req.form['heads'][0].split(' ') - - def check_heads(): - heads = map(hex, repo.heads()) - return their_heads == [hex('force')] or their_heads == heads - - # fail early if possible - if not check_heads(): - req.drain() - raise ErrorResponse(HTTP_OK, 'unsynced changes') - - # do not lock repo until all changegroup data is - # streamed. save to temporary file. - - fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-') - fp = os.fdopen(fd, 'wb+') - try: - length = int(req.env['CONTENT_LENGTH']) - for s in util.filechunkiter(req, limit=length): - fp.write(s) - - try: - lock = repo.lock() - try: - if not check_heads(): - raise ErrorResponse(HTTP_OK, 'unsynced changes') - - fp.seek(0) - header = fp.read(6) - if header.startswith('HG') and not header.startswith('HG10'): - raise ValueError('unknown bundle version') - elif header not in changegroupmod.bundletypes: - raise ValueError('unknown bundle compression type') - gen = changegroupmod.unbundle(header, fp) - - # send addchangegroup output to client - - oldio = sys.stdout, sys.stderr - sys.stderr = sys.stdout = cStringIO.StringIO() - - try: - url = 'remote:%s:%s:%s' % ( - proto, - urllib.quote(req.env.get('REMOTE_HOST', '')), - urllib.quote(req.env.get('REMOTE_USER', ''))) - try: - ret = repo.addchangegroup(gen, 'serve', url, lock=lock) - except util.Abort, inst: - sys.stdout.write("abort: %s\n" % inst) - ret = 0 - finally: - val = sys.stdout.getvalue() - sys.stdout, sys.stderr = oldio - req.respond(HTTP_OK, HGTYPE) - return '%d\n%s' % (ret, val), - finally: - lock.release() - except ValueError, inst: - raise ErrorResponse(HTTP_OK, inst) - except (OSError, IOError), inst: - error = getattr(inst, 'strerror', 'Unknown error') - if not isinstance(error, str): - error = 'Error: %s' % str(error) - if inst.errno == errno.ENOENT: - code = HTTP_NOT_FOUND - else: - code = HTTP_SERVER_ERROR - filename = getattr(inst, 'filename', '') - # Don't send our filesystem layout to the client - if filename and filename.startswith(repo.root): - filename = filename[len(repo.root)+1:] - text = '%s: %s' % (error, filename) - else: - text = error.replace(repo.root + os.path.sep, '') - raise ErrorResponse(code, text) - finally: - fp.close() - os.unlink(tempname) diff -r 26e0782b8380 -r d054cc5c7737 mercurial/sshserver.py --- a/mercurial/sshserver.py Wed Jul 14 17:12:18 2010 -0500 +++ b/mercurial/sshserver.py Thu Jul 15 11:24:42 2010 -0500 @@ -72,6 +72,20 @@ self.fout.write(chunk) self.fout.flush() + def getfile(self, fpout): + self.respond('') + count = int(self.fin.readline()) + while count: + fpout.write(self.fin.read(count)) + count = int(self.fin.readline()) + + def redirect(self): + pass + + def respondpush(self, ret): + self.respond('') + self.respond(str(ret)) + def serve_forever(self): try: while self.serve_one(): @@ -127,58 +141,10 @@ return self.respond("") - r = self.repo.addchangegroup(self.fin, 'serve', self.client_url(), + r = self.repo.addchangegroup(self.fin, 'serve', self._client(), lock=self.lock) return str(r) - def client_url(self): + def _client(self): client = os.environ.get('SSH_CLIENT', '').split(' ', 1)[0] return 'remote:ssh:' + client - - def do_unbundle(self): - their_heads = self.getarg('heads').split() - - def check_heads(): - heads = map(hex, self.repo.heads()) - return their_heads == [hex('force')] or their_heads == heads - - # fail early if possible - if not check_heads(): - self.respond(_('unsynced changes')) - return - - self.respond('') - - # write bundle data to temporary file because it can be big - fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-') - fp = os.fdopen(fd, 'wb+') - try: - count = int(self.fin.readline()) - while count: - fp.write(self.fin.read(count)) - count = int(self.fin.readline()) - - was_locked = self.lock is not None - if not was_locked: - self.lock = self.repo.lock() - try: - if not check_heads(): - # someone else committed/pushed/unbundled while we - # were transferring data - self.respond(_('unsynced changes')) - return - self.respond('') - - # push can proceed - - fp.seek(0) - r = self.repo.addchangegroup(fp, 'serve', self.client_url(), - lock=self.lock) - self.respond(str(r)) - finally: - if not was_locked: - self.lock.release() - self.lock = None - finally: - fp.close() - os.unlink(tempname) diff -r 26e0782b8380 -r d054cc5c7737 mercurial/wireproto.py --- a/mercurial/wireproto.py Wed Jul 14 17:12:18 2010 -0500 +++ b/mercurial/wireproto.py Thu Jul 15 11:24:42 2010 -0500 @@ -5,10 +5,11 @@ # This software may be used and distributed according to the terms of the # GNU General Public License version 2 or any later version. +import urllib, tempfile, os from i18n import _ from node import bin, hex -import urllib -import streamclone, repo, error, encoding +import changegroup as changegroupmod +import streamclone, repo, error, encoding, util import pushkey as pushkey_ # client side @@ -198,6 +199,56 @@ except streamclone.StreamException, inst: return str(inst) +def unbundle(repo, proto, heads): + their_heads = heads.split() + + def check_heads(): + heads = map(hex, repo.heads()) + return their_heads == [hex('force')] or their_heads == heads + + # fail early if possible + if not check_heads(): + repo.respond(_('unsynced changes')) + return + + # write bundle data to temporary file because it can be big + fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-') + fp = os.fdopen(fd, 'wb+') + r = 0 + proto.redirect() + try: + proto.getfile(fp) + lock = repo.lock() + try: + if not check_heads(): + # someone else committed/pushed/unbundled while we + # were transferring data + proto.respond(_('unsynced changes')) + return + + # push can proceed + fp.seek(0) + header = fp.read(6) + if header.startswith('HG'): + if not header.startswith('HG10'): + raise ValueError('unknown bundle version') + elif header not in changegroupmod.bundletypes: + raise ValueError('unknown bundle compression type') + gen = changegroupmod.unbundle(header, fp) + + try: + r = repo.addchangegroup(gen, 'serve', proto._client(), + lock=lock) + except util.Abort, inst: + sys.stderr.write("abort: %s\n" % inst) + finally: + lock.release() + proto.respondpush(r) + + finally: + fp.close() + os.unlink(tempname) + commands = { 'between': (between, 'pairs'), 'branchmap': (branchmap, ''), @@ -209,4 +260,5 @@ 'lookup': (lookup, 'key'), 'pushkey': (pushkey, 'namespace key old new'), 'stream_out': (stream, ''), + 'unbundle': (unbundle, 'heads'), }