tests: add dummy SMTP daemon for SSL tests
authorYuya Nishihara <yuya@tcha.org>
Fri, 27 May 2016 22:43:47 +0900
changeset 29332 2bb0ddd8267b
parent 29331 1e02d9576194
child 29333 cdef60d9f442
tests: add dummy SMTP daemon for SSL tests Currently it only supports SMTP over SSL since SMTPS should be simpler than handling StartTLS. Since we don't need asynchronous server for our tests, it does TLS handshake in blocking way. But asyncore is required by Python smtpd module.
tests/dummysmtpd.py
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/dummysmtpd.py	Fri May 27 22:43:47 2016 +0900
@@ -0,0 +1,81 @@
+#!/usr/bin/env python
+
+"""dummy SMTP server for use in tests"""
+
+from __future__ import absolute_import
+
+import asyncore
+import optparse
+import smtpd
+import ssl
+import sys
+
+from mercurial import (
+    cmdutil,
+)
+
+def log(msg):
+    sys.stdout.write(msg)
+    sys.stdout.flush()
+
+class dummysmtpserver(smtpd.SMTPServer):
+    def __init__(self, localaddr):
+        smtpd.SMTPServer.__init__(self, localaddr, remoteaddr=None)
+
+    def process_message(self, peer, mailfrom, rcpttos, data):
+        log('%s from=%s to=%s\n' % (peer[0], mailfrom, ', '.join(rcpttos)))
+
+class dummysmtpsecureserver(dummysmtpserver):
+    def __init__(self, localaddr, certfile):
+        dummysmtpserver.__init__(self, localaddr)
+        self._certfile = certfile
+
+    def handle_accept(self):
+        pair = self.accept()
+        if not pair:
+            return
+        conn, addr = pair
+        try:
+            # wrap_socket() would block, but we don't care
+            conn = ssl.wrap_socket(conn, server_side=True,
+                                   certfile=self._certfile,
+                                   ssl_version=ssl.PROTOCOL_TLSv1)
+        except ssl.SSLError:
+            log('%s ssl error\n' % addr[0])
+            conn.close()
+            return
+        smtpd.SMTPChannel(self, conn, addr)
+
+def run():
+    try:
+        asyncore.loop()
+    except KeyboardInterrupt:
+        pass
+
+def main():
+    op = optparse.OptionParser()
+    op.add_option('-d', '--daemon', action='store_true')
+    op.add_option('--daemon-postexec', action='append')
+    op.add_option('-p', '--port', type=int, default=8025)
+    op.add_option('-a', '--address', default='localhost')
+    op.add_option('--pid-file', metavar='FILE')
+    op.add_option('--tls', choices=['none', 'smtps'], default='none')
+    op.add_option('--certificate', metavar='FILE')
+
+    opts, args = op.parse_args()
+    if opts.tls == 'smtps' and not opts.certificate:
+        op.error('--certificate must be specified')
+
+    addr = (opts.address, opts.port)
+    def init():
+        if opts.tls == 'none':
+            dummysmtpserver(addr)
+        else:
+            dummysmtpsecureserver(addr, opts.certificate)
+        log('listening at %s:%d\n' % addr)
+
+    cmdutil.service(vars(opts), initfn=init, runfn=run,
+                    runargs=[sys.executable, __file__] + sys.argv[1:])
+
+if __name__ == '__main__':
+    main()