-# Copyright (c) 2004-2009 Zope Corporation and Contributors.
-# All Rights Reserved.
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
-"""Various test-support utility functions
-$Id: 116197 2010-09-04 00:02:53Z gary $
-import BaseHTTPServer
-import errno
-import logging
-import os
-import pkg_resources
-import random
-import re
-import shutil
-import socket
-import subprocess
-import sys
-import tempfile
-import textwrap
-import threading
-import time
-import urllib2
-import zc.buildout.buildout
-import zc.buildout.easy_install
-from zc.buildout.rmtree import rmtree
-fsync = getattr(os, 'fsync', lambda fileno: None)
-is_win32 = sys.platform == 'win32'
-setuptools_location = pkg_resources.working_set.find(
- pkg_resources.Requirement.parse('setuptools')).location
-def cat(dir, *names):
- path = os.path.join(dir, *names)
- if (not os.path.exists(path)
- and is_win32
- and os.path.exists(path+'')
- ):
- path = path+''
- print open(path).read(),
-def ls(dir, *subs):
- if subs:
- dir = os.path.join(dir, *subs)
- names = os.listdir(dir)
- names.sort()
- for name in names:
- if os.path.isdir(os.path.join(dir, name)):
- print 'd ',
- elif os.path.islink(os.path.join(dir, name)):
- print 'l ',
- else:
- print '- ',
- print name
-def mkdir(*path):
- os.mkdir(os.path.join(*path))
-def remove(*path):
- path = os.path.join(*path)
- if os.path.isdir(path):
- shutil.rmtree(path)
- else:
- os.remove(path)
-def rmdir(*path):
- shutil.rmtree(os.path.join(*path))
-def write(dir, *args):
- path = os.path.join(dir, *(args[:-1]))
- f = open(path, 'w')
- f.write(args[-1])
- f.flush()
- fsync(f.fileno())
- f.close()
-## FIXME - check for other platforms
-MUST_CLOSE_FDS = not sys.platform.startswith('win')
-def system(command, input=''):
- env = dict(os.environ)
- env['COLUMNS'] = '80'
- p = subprocess.Popen(command,
- shell=True,
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- close_fds=MUST_CLOSE_FDS,
- env=env,
- )
- i, o, e = (p.stdin, p.stdout, p.stderr)
- if input:
- i.write(input)
- i.close()
- result = +
- o.close()
- e.close()
- return result
-def call_py(interpreter, cmd, flags=None):
- if sys.platform == 'win32':
- args = ['"%s"' % arg for arg in (interpreter, flags, cmd) if arg]
- args.insert(-1, '"-c"')
- return system('"%s"' % ' '.join(args))
- else:
- cmd = repr(cmd)
- return system(
- ' '.join(arg for arg in (interpreter, flags, '-c', cmd) if arg))
-def get(url):
- return urllib2.urlopen(url).read()
-def _runsetup(setup, executable, *args):
- if os.path.isdir(setup):
- setup = os.path.join(setup, '')
- d = os.path.dirname(setup)
- args = [zc.buildout.easy_install._safe_arg(arg)
- for arg in args]
- args.insert(0, '-q')
- env = dict(os.environ)
- if executable == sys.executable:
- env['PYTHONPATH'] = setuptools_location
- # else pass an executable that has setuptools! See
- args.append(env)
- here = os.getcwd()
- try:
- os.chdir(d)
- os.spawnle(os.P_WAIT, executable,
- zc.buildout.easy_install._safe_arg(executable),
- setup, *args)
- if os.path.exists('build'):
- rmtree('build')
- finally:
- os.chdir(here)
-def sdist(setup, dest):
- _runsetup(setup, sys.executable, 'sdist', '-d', dest, '--formats=zip')
-def bdist_egg(setup, executable, dest):
- _runsetup(setup, executable, 'bdist_egg', '-d', dest)
-def sys_install(setup, dest):
- _runsetup(setup, sys.executable, 'install', '--install-purelib', dest,
- '--record', os.path.join(dest, '__added_files__'),
- '--single-version-externally-managed')
-def find_python(version):
- e = os.environ.get('PYTHON%s' % version)
- if e is not None:
- return e
- if is_win32:
- e = '\Python%s%s\python.exe' % tuple(version.split('.'))
- if os.path.exists(e):
- return e
- else:
- cmd = 'python%s -c "import sys; print sys.executable"' % version
- p = subprocess.Popen(cmd,
- shell=True,
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT,
- close_fds=MUST_CLOSE_FDS)
- i, o = (p.stdin, p.stdout)
- i.close()
- e =
- o.close()
- if os.path.exists(e):
- return e
- cmd = 'python -c "import sys; print \'%s.%s\' % sys.version_info[:2]"'
- p = subprocess.Popen(cmd,
- shell=True,
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT,
- close_fds=MUST_CLOSE_FDS)
- i, o = (p.stdin, p.stdout)
- i.close()
- e =
- o.close()
- if e == version:
- cmd = 'python -c "import sys; print sys.executable"'
- p = subprocess.Popen(cmd,
- shell=True,
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT,
- close_fds=MUST_CLOSE_FDS)
- i, o = (p.stdin, p.stdout)
- i.close()
- e =
- o.close()
- if os.path.exists(e):
- return e
- raise ValueError(
- "Couldn't figure out the executable for Python %(version)s.\n"
- "Set the environment variable PYTHON%(version)s to the location\n"
- "of the Python %(version)s executable before running the tests."
- % {'version': version})
-def wait_until(label, func, *args, **kw):
- if 'timeout' in kw:
- kw = dict(kw)
- timeout = kw.pop('timeout')
- else:
- timeout = 30
- deadline = time.time()+timeout
- while time.time() < deadline:
- if func(*args, **kw):
- return
- time.sleep(0.01)
- raise ValueError('Timed out waiting for: '+label)
-def get_installer_values():
- """Get the current values for the easy_install module.
- This is necessary because instantiating a Buildout will force the
- Buildout's values on the installer.
- Returns a dict of names-values suitable for set_installer_values."""
- names = ('default_versions', 'download_cache', 'install_from_cache',
- 'prefer_final', 'include_site_packages',
- 'allowed_eggs_from_site_packages', 'use_dependency_links',
- 'allow_picked_versions', 'always_unzip'
- )
- values = {}
- for name in names:
- values[name] = getattr(zc.buildout.easy_install, name)()
- return values
-def set_installer_values(values):
- """Set the given values on the installer."""
- for name, value in values.items():
- getattr(zc.buildout.easy_install, name)(value)
-def make_buildout(executable=None):
- """Make a buildout that uses this version of zc.buildout."""
- # Create a basic buildout.cfg to avoid a warning from buildout.
- open('buildout.cfg', 'w').write(
- "[buildout]\nparts =\n"
- )
- # Get state of installer defaults so we can reinstate them (instantiating
- # a Buildout will force the Buildout's defaults on the installer).
- installer_values = get_installer_values()
- # Use the buildout bootstrap command to create a buildout
- config = [
- ('buildout', 'log-level', 'WARNING'),
- # trick bootstrap into putting the buildout develop egg
- # in the eggs dir.
- ('buildout', 'develop-eggs-directory', 'eggs'),
- ]
- if executable is not None:
- config.append(('buildout', 'executable', executable))
- zc.buildout.buildout.Buildout(
- 'buildout.cfg', config,
- user_defaults=False,
- ).bootstrap([])
- # Create the develop-eggs dir, which didn't get created the usual
- # way due to the trick above:
- os.mkdir('develop-eggs')
- # Reinstate the default values of the installer.
- set_installer_values(installer_values)
-def buildoutSetUp(test):
- test.globs['__tear_downs'] = __tear_downs = []
- test.globs['register_teardown'] = register_teardown = __tear_downs.append
- installer_values = get_installer_values()
- register_teardown(
- lambda: set_installer_values(installer_values)
- )
- here = os.getcwd()
- register_teardown(lambda: os.chdir(here))
- handlers_before_set_up = logging.getLogger().handlers[:]
- def restore_root_logger_handlers():
- root_logger = logging.getLogger()
- for handler in root_logger.handlers[:]:
- root_logger.removeHandler(handler)
- for handler in handlers_before_set_up:
- root_logger.addHandler(handler)
- register_teardown(restore_root_logger_handlers)
- base = tempfile.mkdtemp('buildoutSetUp')
- base = os.path.realpath(base)
- register_teardown(lambda base=base: rmtree(base))
- old_home = os.environ.get('HOME')
- os.environ['HOME'] = os.path.join(base, 'bbbBadHome')
- def restore_home():
- if old_home is None:
- del os.environ['HOME']
- else:
- os.environ['HOME'] = old_home
- register_teardown(restore_home)
- base = os.path.join(base, '_TEST_')
- os.mkdir(base)
- tmp = tempfile.mkdtemp('buildouttests')
- register_teardown(lambda: rmtree(tmp))
- zc.buildout.easy_install.default_index_url = 'file://'+tmp
- os.environ['buildout-testing-index-url'] = (
- zc.buildout.easy_install.default_index_url)
- os.environ.pop('PYTHONPATH', None)
- def tmpdir(name):
- path = os.path.join(base, name)
- mkdir(path)
- return path
- sample = tmpdir('sample-buildout')
- os.chdir(sample)
- make_buildout()
- def start_server(path):
- port, thread = _start_server(path, name=path)
- url = 'http://localhost:%s/' % port
- register_teardown(lambda: stop_server(url, thread))
- return url
- def make_py(initialization=''):
- """Returns paths to new executable and to its site-packages.
- """
- buildout = tmpdir('executable_buildout')
- site_packages_dir = os.path.join(buildout, 'site-packages')
- mkdir(site_packages_dir)
- old_wd = os.getcwd()
- os.chdir(buildout)
- make_buildout()
- # Normally we don't process .pth files in extra-paths. We want to
- # in this case so that we can test with setuptools system installs
- # (--single-version-externally-managed), which use .pth files.
- initialization = (
- ('import sys\n'
- 'import site\n'
- 'known_paths = set(sys.path)\n'
- 'site_packages_dir = %r\n'
- 'site.addsitedir(site_packages_dir, known_paths)\n'
- ) % (site_packages_dir,)) + initialization
- initialization = '\n'.join(
- ' ' + line for line in initialization.split('\n'))
- install_develop(
- 'zc.recipe.egg', os.path.join(buildout, 'develop-eggs'))
- install_develop(
- 'z3c.recipe.scripts', os.path.join(buildout, 'develop-eggs'))
- write('buildout.cfg', textwrap.dedent('''\
- [buildout]
- parts = py
- include-site-packages = false
- exec-sitecustomize = false
- [py]
- recipe = z3c.recipe.scripts
- interpreter = py
- initialization =
- %(initialization)s
- extra-paths = %(site-packages)s
- eggs = setuptools
- ''') % {
- 'initialization': initialization,
- 'site-packages': site_packages_dir})
- system(os.path.join(buildout, 'bin', 'buildout'))
- os.chdir(old_wd)
- return (
- os.path.join(buildout, 'bin', 'py'), site_packages_dir)
- test.globs.update(dict(
- sample_buildout = sample,
- ls = ls,
- cat = cat,
- mkdir = mkdir,
- rmdir = rmdir,
- remove = remove,
- tmpdir = tmpdir,
- write = write,
- system = system,
- call_py = call_py,
- get = get,
- cd = (lambda *path: os.chdir(os.path.join(*path))),
- join = os.path.join,
- sdist = sdist,
- bdist_egg = bdist_egg,
- start_server = start_server,
- buildout = os.path.join(sample, 'bin', 'buildout'),
- wait_until = wait_until,
- make_py = make_py
- ))
-def buildoutTearDown(test):
- for f in test.globs['__tear_downs']:
- f()
-class Server(BaseHTTPServer.HTTPServer):
- def __init__(self, tree, *args):
- BaseHTTPServer.HTTPServer.__init__(self, *args)
- self.tree = os.path.abspath(tree)
- __run = True
- def serve_forever(self):
- while self.__run:
- self.handle_request()
- def handle_error(self, *_):
- self.__run = False
-class Handler(BaseHTTPServer.BaseHTTPRequestHandler):
- Server.__log = False
- def __init__(self, request, address, server):
- self.__server = server
- self.tree = server.tree
- BaseHTTPServer.BaseHTTPRequestHandler.__init__(
- self, request, address, server)
- def do_GET(self):
- if '__stop__' in self.path:
- raise SystemExit
- if self.path == '/enable_server_logging':
- self.__server.__log = True
- self.send_response(200)
- return
- if self.path == '/disable_server_logging':
- self.__server.__log = False
- self.send_response(200)
- return
- path = os.path.abspath(os.path.join(self.tree, *self.path.split('/')))
- if not (
- ((path == self.tree) or path.startswith(self.tree+os.path.sep))
- and
- os.path.exists(path)
- ):
- self.send_response(404, 'Not Found')
- #self.send_response(200)
- out = '<html><body>Not Found</body></html>'
- #out = '\n'.join(self.tree, self.path, path)
- self.send_header('Content-Length', str(len(out)))
- self.send_header('Content-Type', 'text/html')
- self.end_headers()
- self.wfile.write(out)
- return
- self.send_response(200)
- if os.path.isdir(path):
- out = ['<html><body>\n']
- names = os.listdir(path)
- names.sort()
- for name in names:
- if os.path.isdir(os.path.join(path, name)):
- name += '/'
- out.append('<a href="%s">%s</a><br>\n' % (name, name))
- out.append('</body></html>\n')
- out = ''.join(out)
- self.send_header('Content-Length', str(len(out)))
- self.send_header('Content-Type', 'text/html')
- else:
- out = open(path, 'rb').read()
- self.send_header('Content-Length', len(out))
- if path.endswith('.egg'):
- self.send_header('Content-Type', 'application/zip')
- elif path.endswith('.gz'):
- self.send_header('Content-Type', 'application/x-gzip')
- elif path.endswith('.zip'):
- self.send_header('Content-Type', 'application/x-gzip')
- else:
- self.send_header('Content-Type', 'text/html')
- self.end_headers()
- self.wfile.write(out)
- def log_request(self, code):
- if self.__server.__log:
- print '%s %s %s' % (self.command, code, self.path)
-def _run(tree, port):
- server_address = ('localhost', port)
- httpd = Server(tree, server_address, Handler)
- httpd.serve_forever()
-def get_port():
- for i in range(10):
- port = random.randrange(20000, 30000)
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- try:
- try:
- s.connect(('localhost', port))
- except socket.error:
- return port
- finally:
- s.close()
- raise RuntimeError, "Can't find port"
-def _start_server(tree, name=''):
- port = get_port()
- thread = threading.Thread(target=_run, args=(tree, port), name=name)
- thread.setDaemon(True)
- thread.start()
- wait(port, up=True)
- return port, thread
-def start_server(tree):
- return _start_server(tree)[0]
-def stop_server(url, thread=None):
- try:
- urllib2.urlopen(url+'__stop__')
- except Exception:
- pass
- if thread is not None:
- thread.join() # wait for thread to stop
-def wait(port, up):
- addr = 'localhost', port
- for i in range(120):
- time.sleep(0.25)
- try:
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s.connect(addr)
- s.close()
- if up:
- break
- except socket.error, e:
- if e[0] not in (errno.ECONNREFUSED, errno.ECONNRESET):
- raise
- s.close()
- if not up:
- break
- else:
- if up:
- raise
- else:
- raise SystemError("Couln't stop server")
-def install(project, destination):
- if not isinstance(destination, basestring):
- destination = os.path.join(destination.globs['sample_buildout'],
- 'eggs')
- dist = pkg_resources.working_set.find(
- pkg_resources.Requirement.parse(project))
- if dist.location.endswith('.egg'):
- destination = os.path.join(destination,
- os.path.basename(dist.location),
- )
- if os.path.isdir(dist.location):
- shutil.copytree(dist.location, destination)
- else:
- shutil.copyfile(dist.location, destination)
- else:
- # copy link
- open(os.path.join(destination, project+'.egg-link'), 'w'
- ).write(dist.location)
-def install_develop(project, destination):
- if not isinstance(destination, basestring):
- destination = os.path.join(destination.globs['sample_buildout'],
- 'develop-eggs')
- dist = pkg_resources.working_set.find(
- pkg_resources.Requirement.parse(project))
- open(os.path.join(destination, project+'.egg-link'), 'w'
- ).write(dist.location)
-def _normalize_path(match):
- path =
- if os.path.sep == '\\':
- path = path.replace('\\\\', '/')
- if path.startswith('\\'):
- path = path[1:]
- return '/' + path.replace(os.path.sep, '/')
-if sys.platform == 'win32':
- sep = r'[\\/]' # Windows uses both sometimes.
- sep = re.escape(os.path.sep)
-normalize_path = (
- re.compile(
- r'''[^'" \t\n\r!]+%(sep)s_[Tt][Ee][Ss][Tt]_%(sep)s([^"' \t\n\r]+)'''
- % dict(sep=sep)),
- _normalize_path,
- )
-normalize_endings = re.compile('\r\n'), '\n'
-normalize_script = (
- re.compile('(\n?)- ([a-zA-Z_.-]+)\n- \\2.exe\n'),
- '\\1- \\2\n')
-normalize_egg_py = (
- re.compile('-py\d[.]\d(-\S+)?.egg'),
- '-pyN.N.egg',
- )