Source code for pysys.utils.smtpserver

#!/usr/bin/env python
# PySys System Test Framework, Copyright (C) 2006-2018  M.B.Grieve

# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.

# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.

# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

# Contact: moraygrieve@users.sourceforge.net
 
import asyncore, smtpd, threading


[docs]class SimpleSMTPServer(smtpd.SMTPServer):
[docs] def __init__(self, localaddr, remoteaddr, filename='smtpserver.out', logMails=True): smtpd.SMTPServer.__init__(self, localaddr, remoteaddr) try: self.fp = open(filename, 'w') self.logMails = logMails self.count = 0 except Exception: pass
[docs] def process_message(self, peer, mailfrom, rcpttos, data): self.count = self.count + 1 if self.fp: self.fp.write("SimpleSMTPServer: Message count = %d\n" % self.count) if self.logMails: self.fp.write(data) self.fp.flush()
[docs]class SimpleSMTPServerRunner(object):
[docs] def __init__(self): self.exit = 0
[docs] def kickoff(self): while not self.exit: asyncore.loop(timeout=1.0, use_poll=False, count=1)
[docs] def start(self): threading.Thread(target=self.kickoff).start()
[docs] def stop(self): self.exit = 1