mercurial/sslutil.py
changeset 26587 56b2bcea2529
parent 25977 696f6e2be282
child 26622 9e15286609ae
equal deleted inserted replaced
26586:d51c658d3f04 26587:56b2bcea2529
    12 import os
    12 import os
    13 import ssl
    13 import ssl
    14 import sys
    14 import sys
    15 
    15 
    16 from .i18n import _
    16 from .i18n import _
    17 from . import util
    17 from . import error, util
    18 
    18 
    19 _canloaddefaultcerts = False
    19 _canloaddefaultcerts = False
    20 try:
    20 try:
    21     ssl_context = ssl.SSLContext
    21     ssl_context = ssl.SSLContext
    22     _canloaddefaultcerts = util.safehasattr(ssl_context, 'load_default_certs')
    22     _canloaddefaultcerts = util.safehasattr(ssl_context, 'load_default_certs')
    48         sslsocket = sslcontext.wrap_socket(sock, server_hostname=serverhostname)
    48         sslsocket = sslcontext.wrap_socket(sock, server_hostname=serverhostname)
    49         # check if wrap_socket failed silently because socket had been
    49         # check if wrap_socket failed silently because socket had been
    50         # closed
    50         # closed
    51         # - see http://bugs.python.org/issue13721
    51         # - see http://bugs.python.org/issue13721
    52         if not sslsocket.cipher():
    52         if not sslsocket.cipher():
    53             raise util.Abort(_('ssl connection failed'))
    53             raise error.Abort(_('ssl connection failed'))
    54         return sslsocket
    54         return sslsocket
    55 except AttributeError:
    55 except AttributeError:
    56     def wrapsocket(sock, keyfile, certfile, ui, cert_reqs=ssl.CERT_NONE,
    56     def wrapsocket(sock, keyfile, certfile, ui, cert_reqs=ssl.CERT_NONE,
    57                    ca_certs=None, serverhostname=None):
    57                    ca_certs=None, serverhostname=None):
    58         sslsocket = ssl.wrap_socket(sock, keyfile, certfile,
    58         sslsocket = ssl.wrap_socket(sock, keyfile, certfile,
    60                                     ssl_version=ssl.PROTOCOL_TLSv1)
    60                                     ssl_version=ssl.PROTOCOL_TLSv1)
    61         # check if wrap_socket failed silently because socket had been
    61         # check if wrap_socket failed silently because socket had been
    62         # closed
    62         # closed
    63         # - see http://bugs.python.org/issue13721
    63         # - see http://bugs.python.org/issue13721
    64         if not sslsocket.cipher():
    64         if not sslsocket.cipher():
    65             raise util.Abort(_('ssl connection failed'))
    65             raise error.Abort(_('ssl connection failed'))
    66         return sslsocket
    66         return sslsocket
    67 
    67 
    68 def _verifycert(cert, hostname):
    68 def _verifycert(cert, hostname):
    69     '''Verify that cert (in socket.getpeercert() format) matches hostname.
    69     '''Verify that cert (in socket.getpeercert() format) matches hostname.
    70     CRLs is not handled.
    70     CRLs is not handled.
   138     if cacerts == '!':
   138     if cacerts == '!':
   139         pass
   139         pass
   140     elif cacerts:
   140     elif cacerts:
   141         cacerts = util.expandpath(cacerts)
   141         cacerts = util.expandpath(cacerts)
   142         if not os.path.exists(cacerts):
   142         if not os.path.exists(cacerts):
   143             raise util.Abort(_('could not find web.cacerts: %s') % cacerts)
   143             raise error.Abort(_('could not find web.cacerts: %s') % cacerts)
   144     else:
   144     else:
   145         cacerts = _defaultcacerts()
   145         cacerts = _defaultcacerts()
   146         if cacerts and cacerts != '!':
   146         if cacerts and cacerts != '!':
   147             ui.debug('using %s to enable OS X system CA\n' % cacerts)
   147             ui.debug('using %s to enable OS X system CA\n' % cacerts)
   148         ui.setconfig('web', 'cacerts', cacerts, 'defaultcacerts')
   148         ui.setconfig('web', 'cacerts', cacerts, 'defaultcacerts')
   161         host = self.host
   161         host = self.host
   162         cacerts = self.ui.config('web', 'cacerts')
   162         cacerts = self.ui.config('web', 'cacerts')
   163         hostfingerprint = self.ui.config('hostfingerprints', host)
   163         hostfingerprint = self.ui.config('hostfingerprints', host)
   164 
   164 
   165         if not sock.cipher(): # work around http://bugs.python.org/issue13721
   165         if not sock.cipher(): # work around http://bugs.python.org/issue13721
   166             raise util.Abort(_('%s ssl connection error') % host)
   166             raise error.Abort(_('%s ssl connection error') % host)
   167         try:
   167         try:
   168             peercert = sock.getpeercert(True)
   168             peercert = sock.getpeercert(True)
   169             peercert2 = sock.getpeercert()
   169             peercert2 = sock.getpeercert()
   170         except AttributeError:
   170         except AttributeError:
   171             raise util.Abort(_('%s ssl connection error') % host)
   171             raise error.Abort(_('%s ssl connection error') % host)
   172 
   172 
   173         if not peercert:
   173         if not peercert:
   174             raise util.Abort(_('%s certificate error: '
   174             raise error.Abort(_('%s certificate error: '
   175                                'no certificate received') % host)
   175                                'no certificate received') % host)
   176         peerfingerprint = util.sha1(peercert).hexdigest()
   176         peerfingerprint = util.sha1(peercert).hexdigest()
   177         nicefingerprint = ":".join([peerfingerprint[x:x + 2]
   177         nicefingerprint = ":".join([peerfingerprint[x:x + 2]
   178             for x in xrange(0, len(peerfingerprint), 2)])
   178             for x in xrange(0, len(peerfingerprint), 2)])
   179         if hostfingerprint:
   179         if hostfingerprint:
   180             if peerfingerprint.lower() != \
   180             if peerfingerprint.lower() != \
   181                     hostfingerprint.replace(':', '').lower():
   181                     hostfingerprint.replace(':', '').lower():
   182                 raise util.Abort(_('certificate for %s has unexpected '
   182                 raise error.Abort(_('certificate for %s has unexpected '
   183                                    'fingerprint %s') % (host, nicefingerprint),
   183                                    'fingerprint %s') % (host, nicefingerprint),
   184                                  hint=_('check hostfingerprint configuration'))
   184                                  hint=_('check hostfingerprint configuration'))
   185             self.ui.debug('%s certificate matched fingerprint %s\n' %
   185             self.ui.debug('%s certificate matched fingerprint %s\n' %
   186                           (host, nicefingerprint))
   186                           (host, nicefingerprint))
   187         elif cacerts != '!':
   187         elif cacerts != '!':
   188             msg = _verifycert(peercert2, host)
   188             msg = _verifycert(peercert2, host)
   189             if msg:
   189             if msg:
   190                 raise util.Abort(_('%s certificate error: %s') % (host, msg),
   190                 raise error.Abort(_('%s certificate error: %s') % (host, msg),
   191                                  hint=_('configure hostfingerprint %s or use '
   191                                  hint=_('configure hostfingerprint %s or use '
   192                                         '--insecure to connect insecurely') %
   192                                         '--insecure to connect insecurely') %
   193                                       nicefingerprint)
   193                                       nicefingerprint)
   194             self.ui.debug('%s certificate successfully verified\n' % host)
   194             self.ui.debug('%s certificate successfully verified\n' % host)
   195         elif strict:
   195         elif strict:
   196             raise util.Abort(_('%s certificate with fingerprint %s not '
   196             raise error.Abort(_('%s certificate with fingerprint %s not '
   197                                'verified') % (host, nicefingerprint),
   197                                'verified') % (host, nicefingerprint),
   198                              hint=_('check hostfingerprints or web.cacerts '
   198                              hint=_('check hostfingerprints or web.cacerts '
   199                                      'config setting'))
   199                                      'config setting'))
   200         else:
   200         else:
   201             self.ui.warn(_('warning: %s certificate with fingerprint %s not '
   201             self.ui.warn(_('warning: %s certificate with fingerprint %s not '