diff options
Diffstat (limited to 'Windows/dateutil/test/_common.py')
-rw-r--r-- | Windows/dateutil/test/_common.py | 275 |
1 files changed, 275 insertions, 0 deletions
diff --git a/Windows/dateutil/test/_common.py b/Windows/dateutil/test/_common.py new file mode 100644 index 00000000..264dfbda --- /dev/null +++ b/Windows/dateutil/test/_common.py @@ -0,0 +1,275 @@ +from __future__ import unicode_literals +import os +import time +import subprocess +import warnings +import tempfile +import pickle + + +class WarningTestMixin(object): + # Based on https://stackoverflow.com/a/12935176/467366 + class _AssertWarnsContext(warnings.catch_warnings): + def __init__(self, expected_warnings, parent, **kwargs): + super(WarningTestMixin._AssertWarnsContext, self).__init__(**kwargs) + + self.parent = parent + try: + self.expected_warnings = list(expected_warnings) + except TypeError: + self.expected_warnings = [expected_warnings] + + self._warning_log = [] + + def __enter__(self, *args, **kwargs): + rv = super(WarningTestMixin._AssertWarnsContext, self).__enter__(*args, **kwargs) + + if self._showwarning is not self._module.showwarning: + super_showwarning = self._module.showwarning + else: + super_showwarning = None + + def showwarning(*args, **kwargs): + if super_showwarning is not None: + super_showwarning(*args, **kwargs) + + self._warning_log.append(warnings.WarningMessage(*args, **kwargs)) + + self._module.showwarning = showwarning + return rv + + def __exit__(self, *args, **kwargs): + super(WarningTestMixin._AssertWarnsContext, self).__exit__(self, *args, **kwargs) + + self.parent.assertTrue(any(issubclass(item.category, warning) + for warning in self.expected_warnings + for item in self._warning_log)) + + def assertWarns(self, warning, callable=None, *args, **kwargs): + warnings.simplefilter('always') + context = self.__class__._AssertWarnsContext(warning, self) + if callable is None: + return context + else: + with context: + callable(*args, **kwargs) + + +class PicklableMixin(object): + def _get_nobj_bytes(self, obj, dump_kwargs, load_kwargs): + """ + Pickle and unpickle an object using ``pickle.dumps`` / ``pickle.loads`` + """ + pkl = pickle.dumps(obj, **dump_kwargs) + return pickle.loads(pkl, **load_kwargs) + + def _get_nobj_file(self, obj, dump_kwargs, load_kwargs): + """ + Pickle and unpickle an object using ``pickle.dump`` / ``pickle.load`` on + a temporary file. + """ + with tempfile.TemporaryFile('w+b') as pkl: + pickle.dump(obj, pkl, **dump_kwargs) + pkl.seek(0) # Reset the file to the beginning to read it + nobj = pickle.load(pkl, **load_kwargs) + + return nobj + + def assertPicklable(self, obj, singleton=False, asfile=False, + dump_kwargs=None, load_kwargs=None): + """ + Assert that an object can be pickled and unpickled. This assertion + assumes that the desired behavior is that the unpickled object compares + equal to the original object, but is not the same object. + """ + get_nobj = self._get_nobj_file if asfile else self._get_nobj_bytes + dump_kwargs = dump_kwargs or {} + load_kwargs = load_kwargs or {} + + nobj = get_nobj(obj, dump_kwargs, load_kwargs) + if not singleton: + self.assertIsNot(obj, nobj) + self.assertEqual(obj, nobj) + + +class TZContextBase(object): + """ + Base class for a context manager which allows changing of time zones. + + Subclasses may define a guard variable to either block or or allow time + zone changes by redefining ``_guard_var_name`` and ``_guard_allows_change``. + The default is that the guard variable must be affirmatively set. + + Subclasses must define ``get_current_tz`` and ``set_current_tz``. + """ + _guard_var_name = "DATEUTIL_MAY_CHANGE_TZ" + _guard_allows_change = True + + def __init__(self, tzval): + self.tzval = tzval + self._old_tz = None + + @classmethod + def tz_change_allowed(cls): + """ + Class method used to query whether or not this class allows time zone + changes. + """ + guard = bool(os.environ.get(cls._guard_var_name, False)) + + # _guard_allows_change gives the "default" behavior - if True, the + # guard is overcoming a block. If false, the guard is causing a block. + # Whether tz_change is allowed is therefore the XNOR of the two. + return guard == cls._guard_allows_change + + @classmethod + def tz_change_disallowed_message(cls): + """ Generate instructions on how to allow tz changes """ + msg = ('Changing time zone not allowed. Set {envar} to {gval} ' + 'if you would like to allow this behavior') + + return msg.format(envar=cls._guard_var_name, + gval=cls._guard_allows_change) + + def __enter__(self): + if not self.tz_change_allowed(): + raise ValueError(self.tz_change_disallowed_message()) + + self._old_tz = self.get_current_tz() + self.set_current_tz(self.tzval) + + def __exit__(self, type, value, traceback): + if self._old_tz is not None: + self.set_current_tz(self._old_tz) + + self._old_tz = None + + def get_current_tz(self): + raise NotImplementedError + + def set_current_tz(self): + raise NotImplementedError + + +class TZEnvContext(TZContextBase): + """ + Context manager that temporarily sets the `TZ` variable (for use on + *nix-like systems). Because the effect is local to the shell anyway, this + will apply *unless* a guard is set. + + If you do not want the TZ environment variable set, you may set the + ``DATEUTIL_MAY_NOT_CHANGE_TZ_VAR`` variable to a truthy value. + """ + _guard_var_name = "DATEUTIL_MAY_NOT_CHANGE_TZ_VAR" + _guard_allows_change = False + + def get_current_tz(self): + return os.environ.get('TZ', UnsetTz) + + def set_current_tz(self, tzval): + if tzval is UnsetTz and 'TZ' in os.environ: + del os.environ['TZ'] + else: + os.environ['TZ'] = tzval + + time.tzset() + + +class TZWinContext(TZContextBase): + """ + Context manager for changing local time zone on Windows. + + Because the effect of this is system-wide and global, it may have + unintended side effect. Set the ``DATEUTIL_MAY_CHANGE_TZ`` environment + variable to a truthy value before using this context manager. + """ + def get_current_tz(self): + p = subprocess.Popen(['tzutil', '/g'], stdout=subprocess.PIPE) + + ctzname, err = p.communicate() + ctzname = ctzname.decode() # Popen returns + + if p.returncode: + raise OSError('Failed to get current time zone: ' + err) + + return ctzname + + def set_current_tz(self, tzname): + p = subprocess.Popen('tzutil /s "' + tzname + '"') + + out, err = p.communicate() + + if p.returncode: + raise OSError('Failed to set current time zone: ' + + (err or 'Unknown error.')) + + +### +# Utility classes +class NotAValueClass(object): + """ + A class analogous to NaN that has operations defined for any type. + """ + def _op(self, other): + return self # Operation with NotAValue returns NotAValue + + def _cmp(self, other): + return False + + __add__ = __radd__ = _op + __sub__ = __rsub__ = _op + __mul__ = __rmul__ = _op + __div__ = __rdiv__ = _op + __truediv__ = __rtruediv__ = _op + __floordiv__ = __rfloordiv__ = _op + + __lt__ = __rlt__ = _op + __gt__ = __rgt__ = _op + __eq__ = __req__ = _op + __le__ = __rle__ = _op + __ge__ = __rge__ = _op + + +NotAValue = NotAValueClass() + + +class ComparesEqualClass(object): + """ + A class that is always equal to whatever you compare it to. + """ + + def __eq__(self, other): + return True + + def __ne__(self, other): + return False + + def __le__(self, other): + return True + + def __ge__(self, other): + return True + + def __lt__(self, other): + return False + + def __gt__(self, other): + return False + + __req__ = __eq__ + __rne__ = __ne__ + __rle__ = __le__ + __rge__ = __ge__ + __rlt__ = __lt__ + __rgt__ = __gt__ + + +ComparesEqual = ComparesEqualClass() + + +class UnsetTzClass(object): + """ Sentinel class for unset time zone variable """ + pass + + +UnsetTz = UnsetTzClass() |