############################################################################# # # Copyright (c) 2004-2009 Zope Corporation and Contributors. # All Rights Reserved. # # This software is subject to the provisions of the Zope Public License, # Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution. # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS # FOR A PARTICULAR PURPOSE. # ############################################################################## """Various test-support utility functions $Id: testing.py 116197 2010-09-04 00:02:53Z gary $ """ import BaseHTTPServer import errno import logging import os import pkg_resources import random import re import shutil import socket import subprocess import sys import tempfile import textwrap import threading import time import urllib2 import zc.buildout.buildout import zc.buildout.easy_install from zc.buildout.rmtree import rmtree fsync = getattr(os, 'fsync', lambda fileno: None) is_win32 = sys.platform == 'win32' setuptools_location = pkg_resources.working_set.find( pkg_resources.Requirement.parse('setuptools')).location def cat(dir, *names): path = os.path.join(dir, *names) if (not os.path.exists(path) and is_win32 and os.path.exists(path+'-script.py') ): path = path+'-script.py' print open(path).read(), def ls(dir, *subs): if subs: dir = os.path.join(dir, *subs) names = os.listdir(dir) names.sort() for name in names: if os.path.isdir(os.path.join(dir, name)): print 'd ', elif os.path.islink(os.path.join(dir, name)): print 'l ', else: print '- ', print name def mkdir(*path): os.mkdir(os.path.join(*path)) def remove(*path): path = os.path.join(*path) if os.path.isdir(path): shutil.rmtree(path) else: os.remove(path) def rmdir(*path): shutil.rmtree(os.path.join(*path)) def write(dir, *args): path = os.path.join(dir, *(args[:-1])) f = open(path, 'w') f.write(args[-1]) f.flush() fsync(f.fileno()) f.close() ## FIXME - check for other platforms MUST_CLOSE_FDS = not sys.platform.startswith('win') def system(command, input=''): env = dict(os.environ) env['COLUMNS'] = '80' p = subprocess.Popen(command, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=MUST_CLOSE_FDS, env=env, ) i, o, e = (p.stdin, p.stdout, p.stderr) if input: i.write(input) i.close() result = o.read() + e.read() o.close() e.close() return result def call_py(interpreter, cmd, flags=None): if sys.platform == 'win32': args = ['"%s"' % arg for arg in (interpreter, flags, cmd) if arg] args.insert(-1, '"-c"') return system('"%s"' % ' '.join(args)) else: cmd = repr(cmd) return system( ' '.join(arg for arg in (interpreter, flags, '-c', cmd) if arg)) def get(url): return urllib2.urlopen(url).read() def _runsetup(setup, executable, *args): if os.path.isdir(setup): setup = os.path.join(setup, 'setup.py') d = os.path.dirname(setup) args = [zc.buildout.easy_install._safe_arg(arg) for arg in args] args.insert(0, '-q') env = dict(os.environ) if executable == sys.executable: env['PYTHONPATH'] = setuptools_location # else pass an executable that has setuptools! See testselectingpython.py. args.append(env) here = os.getcwd() try: os.chdir(d) os.spawnle(os.P_WAIT, executable, zc.buildout.easy_install._safe_arg(executable), setup, *args) if os.path.exists('build'): rmtree('build') finally: os.chdir(here) def sdist(setup, dest): _runsetup(setup, sys.executable, 'sdist', '-d', dest, '--formats=zip') def bdist_egg(setup, executable, dest): _runsetup(setup, executable, 'bdist_egg', '-d', dest) def sys_install(setup, dest): _runsetup(setup, sys.executable, 'install', '--install-purelib', dest, '--record', os.path.join(dest, '__added_files__'), '--single-version-externally-managed') def find_python(version): e = os.environ.get('PYTHON%s' % version) if e is not None: return e if is_win32: e = '\Python%s%s\python.exe' % tuple(version.split('.')) if os.path.exists(e): return e else: cmd = 'python%s -c "import sys; print sys.executable"' % version p = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=MUST_CLOSE_FDS) i, o = (p.stdin, p.stdout) i.close() e = o.read().strip() o.close() if os.path.exists(e): return e cmd = 'python -c "import sys; print \'%s.%s\' % sys.version_info[:2]"' p = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=MUST_CLOSE_FDS) i, o = (p.stdin, p.stdout) i.close() e = o.read().strip() o.close() if e == version: cmd = 'python -c "import sys; print sys.executable"' p = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=MUST_CLOSE_FDS) i, o = (p.stdin, p.stdout) i.close() e = o.read().strip() o.close() if os.path.exists(e): return e raise ValueError( "Couldn't figure out the executable for Python %(version)s.\n" "Set the environment variable PYTHON%(version)s to the location\n" "of the Python %(version)s executable before running the tests." % {'version': version}) def wait_until(label, func, *args, **kw): if 'timeout' in kw: kw = dict(kw) timeout = kw.pop('timeout') else: timeout = 30 deadline = time.time()+timeout while time.time() < deadline: if func(*args, **kw): return time.sleep(0.01) raise ValueError('Timed out waiting for: '+label) def get_installer_values(): """Get the current values for the easy_install module. This is necessary because instantiating a Buildout will force the Buildout's values on the installer. Returns a dict of names-values suitable for set_installer_values.""" names = ('default_versions', 'download_cache', 'install_from_cache', 'prefer_final', 'include_site_packages', 'allowed_eggs_from_site_packages', 'use_dependency_links', 'allow_picked_versions', 'always_unzip' ) values = {} for name in names: values[name] = getattr(zc.buildout.easy_install, name)() return values def set_installer_values(values): """Set the given values on the installer.""" for name, value in values.items(): getattr(zc.buildout.easy_install, name)(value) def make_buildout(executable=None): """Make a buildout that uses this version of zc.buildout.""" # Create a basic buildout.cfg to avoid a warning from buildout. open('buildout.cfg', 'w').write( "[buildout]\nparts =\n" ) # Get state of installer defaults so we can reinstate them (instantiating # a Buildout will force the Buildout's defaults on the installer). installer_values = get_installer_values() # Use the buildout bootstrap command to create a buildout config = [ ('buildout', 'log-level', 'WARNING'), # trick bootstrap into putting the buildout develop egg # in the eggs dir. ('buildout', 'develop-eggs-directory', 'eggs'), ] if executable is not None: config.append(('buildout', 'executable', executable)) zc.buildout.buildout.Buildout( 'buildout.cfg', config, user_defaults=False, ).bootstrap([]) # Create the develop-eggs dir, which didn't get created the usual # way due to the trick above: os.mkdir('develop-eggs') # Reinstate the default values of the installer. set_installer_values(installer_values) def buildoutSetUp(test): test.globs['__tear_downs'] = __tear_downs = [] test.globs['register_teardown'] = register_teardown = __tear_downs.append installer_values = get_installer_values() register_teardown( lambda: set_installer_values(installer_values) ) here = os.getcwd() register_teardown(lambda: os.chdir(here)) handlers_before_set_up = logging.getLogger().handlers[:] def restore_root_logger_handlers(): root_logger = logging.getLogger() for handler in root_logger.handlers[:]: root_logger.removeHandler(handler) for handler in handlers_before_set_up: root_logger.addHandler(handler) register_teardown(restore_root_logger_handlers) base = tempfile.mkdtemp('buildoutSetUp') base = os.path.realpath(base) register_teardown(lambda base=base: rmtree(base)) old_home = os.environ.get('HOME') os.environ['HOME'] = os.path.join(base, 'bbbBadHome') def restore_home(): if old_home is None: del os.environ['HOME'] else: os.environ['HOME'] = old_home register_teardown(restore_home) base = os.path.join(base, '_TEST_') os.mkdir(base) tmp = tempfile.mkdtemp('buildouttests') register_teardown(lambda: rmtree(tmp)) zc.buildout.easy_install.default_index_url = 'file://'+tmp os.environ['buildout-testing-index-url'] = ( zc.buildout.easy_install.default_index_url) os.environ.pop('PYTHONPATH', None) def tmpdir(name): path = os.path.join(base, name) mkdir(path) return path sample = tmpdir('sample-buildout') os.chdir(sample) make_buildout() def start_server(path): port, thread = _start_server(path, name=path) url = 'http://localhost:%s/' % port register_teardown(lambda: stop_server(url, thread)) return url def make_py(initialization=''): """Returns paths to new executable and to its site-packages. """ buildout = tmpdir('executable_buildout') site_packages_dir = os.path.join(buildout, 'site-packages') mkdir(site_packages_dir) old_wd = os.getcwd() os.chdir(buildout) make_buildout() # Normally we don't process .pth files in extra-paths. We want to # in this case so that we can test with setuptools system installs # (--single-version-externally-managed), which use .pth files. initialization = ( ('import sys\n' 'import site\n' 'known_paths = set(sys.path)\n' 'site_packages_dir = %r\n' 'site.addsitedir(site_packages_dir, known_paths)\n' ) % (site_packages_dir,)) + initialization initialization = '\n'.join( ' ' + line for line in initialization.split('\n')) install_develop( 'zc.recipe.egg', os.path.join(buildout, 'develop-eggs')) install_develop( 'z3c.recipe.scripts', os.path.join(buildout, 'develop-eggs')) write('buildout.cfg', textwrap.dedent('''\ [buildout] parts = py include-site-packages = false exec-sitecustomize = false [py] recipe = z3c.recipe.scripts interpreter = py initialization = %(initialization)s extra-paths = %(site-packages)s eggs = setuptools ''') % { 'initialization': initialization, 'site-packages': site_packages_dir}) system(os.path.join(buildout, 'bin', 'buildout')) os.chdir(old_wd) return ( os.path.join(buildout, 'bin', 'py'), site_packages_dir) test.globs.update(dict( sample_buildout = sample, ls = ls, cat = cat, mkdir = mkdir, rmdir = rmdir, remove = remove, tmpdir = tmpdir, write = write, system = system, call_py = call_py, get = get, cd = (lambda *path: os.chdir(os.path.join(*path))), join = os.path.join, sdist = sdist, bdist_egg = bdist_egg, start_server = start_server, buildout = os.path.join(sample, 'bin', 'buildout'), wait_until = wait_until, make_py = make_py )) def buildoutTearDown(test): for f in test.globs['__tear_downs']: f() class Server(BaseHTTPServer.HTTPServer): def __init__(self, tree, *args): BaseHTTPServer.HTTPServer.__init__(self, *args) self.tree = os.path.abspath(tree) __run = True def serve_forever(self): while self.__run: self.handle_request() def handle_error(self, *_): self.__run = False class Handler(BaseHTTPServer.BaseHTTPRequestHandler): Server.__log = False def __init__(self, request, address, server): self.__server = server self.tree = server.tree BaseHTTPServer.BaseHTTPRequestHandler.__init__( self, request, address, server) def do_GET(self): if '__stop__' in self.path: raise SystemExit if self.path == '/enable_server_logging': self.__server.__log = True self.send_response(200) return if self.path == '/disable_server_logging': self.__server.__log = False self.send_response(200) return path = os.path.abspath(os.path.join(self.tree, *self.path.split('/'))) if not ( ((path == self.tree) or path.startswith(self.tree+os.path.sep)) and os.path.exists(path) ): self.send_response(404, 'Not Found') #self.send_response(200) out = '
Not Found' #out = '\n'.join(self.tree, self.path, path) self.send_header('Content-Length', str(len(out))) self.send_header('Content-Type', 'text/html') self.end_headers() self.wfile.write(out) return self.send_response(200) if os.path.isdir(path): out = ['\n'] names = os.listdir(path) names.sort() for name in names: if os.path.isdir(os.path.join(path, name)): name += '/' out.append('%s