Source code for pysys.utils.allocport

#!/usr/bin/env python
# PySys System Test Framework, Copyright (C) 2006-2018  M.B.Grieve

# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.

# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.

# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

# Contact: moraygrieve@users.sourceforge.net

import collections, random, subprocess, sys
from pysys import process_lock
from pysys.constants import *

# LRU queue of server TCP ports for allocation to tests which need to
# start TCP servers. Initialized to None since it might not actually be used.
# Properly initialize only on demand.
tcpServerPortPool = None

[docs]def getEphemeralTCPPortRange(): """Returns the range of TCP ports the operating system uses to allocate ephemeral ports from i.e. the ports allocated for the client side of a client-server connection. Returned as a tuple, (ephemeral_low, ephemeral_high) or raises exception on error. """ # Find the smallest and largest ephemeral port if PLATFORM == 'linux': f = open('/proc/sys/net/ipv4/ip_local_port_range') s = f.readline().split() ephemeral_low = int(s[0]) ephemeral_high = int(s[1]) del f elif PLATFORM == 'sunos': def runNdd(driver, parameter): p = subprocess.Popen(['/usr/sbin/ndd', driver, parameter], stdout=subprocess.PIPE, stderr=subprocess.PIPE) return int(p.communicate()[0].strip()) ephemeral_low = runNdd('/dev/tcp', 'tcp_smallest_anon_port') ephemeral_high = runNdd('/dev/tcp', 'tcp_largest_anon_port') elif PLATFORM == 'darwin': def runSysctl(parameter): p = subprocess.Popen(['/usr/sbin/sysctl', '-n', parameter], stdout=subprocess.PIPE, stderr=subprocess.PIPE) return int(p.communicate()[0].strip()) ephemeral_low = runSysctl('net.inet.ip.portrange.first') ephemeral_high = runSysctl('net.inet.ip.portrange.last') elif PLATFORM == 'win32': ephemeral_low = 1025 ephemeral_high = 5000 # The default if sys.version_info[0] == 2: import _winreg as winreg else: import winreg h = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, r'SYSTEM\CurrentControlSet\Services\Tcpip\Parameters') try: ephemeral_high = winreg.QueryValueEx(h, 'MaxUserPort')[0] except Exception: # Accept the default if there isn't a value in the registry pass finally: winreg.CloseKey(h) del h else: raise SystemError("No way of determining ephemeral port range on platform %s" % sys.platform) return (ephemeral_low, ephemeral_high)
[docs]def initializePortPool(): """Initialize the pool of ports we can allocate TCP server ports from i.e. ports to which processes can bind to without clashes with other processes """ global tcpServerPortPool ephemeral_low, ephemeral_high = getEphemeralTCPPortRange() # Allocate server ports from all non-privileged, non-ephemeral ports tcpServerPortPool = list(range(1024, ephemeral_low)) + list(range(ephemeral_high,65536)) # Randomize the port set to reduce the chance of clashes between # simultaneous runs on the same machine random.shuffle(tcpServerPortPool) # Convert to an LRU queue of ports tcpServerPortPool = collections.deque(tcpServerPortPool)
[docs]def portIsInUse(port): # Try to bind to it to see if anyone else is using it with process_lock: s = None try: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # Set SO_LINGER since we don't want any accidentally # connecting clients to cause the socket to hang # around if OSFAMILY == 'windows': s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, 0) # Set non-blocking since we want to fail fast rather # than block s.setblocking(0) # Bind to empty host i.e wildcard interface try: s.bind(("", port)) except Exception: # If we get any exception assume it is because # the port is in use s.close() return True # Listen may not be necessary, but on unix it seems to # help do a more complete shutdown if listen is called s.listen(1) try: s.shutdown(socket.SHUT_RDWR) except Exception: # Do nothing - on windows shutdown sometimes # fails even after listen pass s.close() return False except Exception as e: # Don't expect this but just in case sys.stderr.write('Exception from port allocator: %s\n'%e) if s != None: s.close() return True
[docs]def allocateTCPPort(): while True: port = tcpServerPortPool.popleft() if portIsInUse(port): # Toss the port back at the end of the queue tcpServerPortPool.append(port) else: return port
[docs]class TCPPortOwner(object):
[docs] def __init__(self): self.port = allocateTCPPort()
[docs] def cleanup(self): tcpServerPortPool.append(self.port)
# Initialize the TCP port pool initializePortPool()