summaryrefslogtreecommitdiff
path: root/Windows/dateutil/test/_common.py
diff options
context:
space:
mode:
Diffstat (limited to 'Windows/dateutil/test/_common.py')
-rw-r--r--Windows/dateutil/test/_common.py275
1 files changed, 275 insertions, 0 deletions
diff --git a/Windows/dateutil/test/_common.py b/Windows/dateutil/test/_common.py
new file mode 100644
index 00000000..264dfbda
--- /dev/null
+++ b/Windows/dateutil/test/_common.py
@@ -0,0 +1,275 @@
+from __future__ import unicode_literals
+import os
+import time
+import subprocess
+import warnings
+import tempfile
+import pickle
+
+
+class WarningTestMixin(object):
+ # Based on https://stackoverflow.com/a/12935176/467366
+ class _AssertWarnsContext(warnings.catch_warnings):
+ def __init__(self, expected_warnings, parent, **kwargs):
+ super(WarningTestMixin._AssertWarnsContext, self).__init__(**kwargs)
+
+ self.parent = parent
+ try:
+ self.expected_warnings = list(expected_warnings)
+ except TypeError:
+ self.expected_warnings = [expected_warnings]
+
+ self._warning_log = []
+
+ def __enter__(self, *args, **kwargs):
+ rv = super(WarningTestMixin._AssertWarnsContext, self).__enter__(*args, **kwargs)
+
+ if self._showwarning is not self._module.showwarning:
+ super_showwarning = self._module.showwarning
+ else:
+ super_showwarning = None
+
+ def showwarning(*args, **kwargs):
+ if super_showwarning is not None:
+ super_showwarning(*args, **kwargs)
+
+ self._warning_log.append(warnings.WarningMessage(*args, **kwargs))
+
+ self._module.showwarning = showwarning
+ return rv
+
+ def __exit__(self, *args, **kwargs):
+ super(WarningTestMixin._AssertWarnsContext, self).__exit__(self, *args, **kwargs)
+
+ self.parent.assertTrue(any(issubclass(item.category, warning)
+ for warning in self.expected_warnings
+ for item in self._warning_log))
+
+ def assertWarns(self, warning, callable=None, *args, **kwargs):
+ warnings.simplefilter('always')
+ context = self.__class__._AssertWarnsContext(warning, self)
+ if callable is None:
+ return context
+ else:
+ with context:
+ callable(*args, **kwargs)
+
+
+class PicklableMixin(object):
+ def _get_nobj_bytes(self, obj, dump_kwargs, load_kwargs):
+ """
+ Pickle and unpickle an object using ``pickle.dumps`` / ``pickle.loads``
+ """
+ pkl = pickle.dumps(obj, **dump_kwargs)
+ return pickle.loads(pkl, **load_kwargs)
+
+ def _get_nobj_file(self, obj, dump_kwargs, load_kwargs):
+ """
+ Pickle and unpickle an object using ``pickle.dump`` / ``pickle.load`` on
+ a temporary file.
+ """
+ with tempfile.TemporaryFile('w+b') as pkl:
+ pickle.dump(obj, pkl, **dump_kwargs)
+ pkl.seek(0) # Reset the file to the beginning to read it
+ nobj = pickle.load(pkl, **load_kwargs)
+
+ return nobj
+
+ def assertPicklable(self, obj, singleton=False, asfile=False,
+ dump_kwargs=None, load_kwargs=None):
+ """
+ Assert that an object can be pickled and unpickled. This assertion
+ assumes that the desired behavior is that the unpickled object compares
+ equal to the original object, but is not the same object.
+ """
+ get_nobj = self._get_nobj_file if asfile else self._get_nobj_bytes
+ dump_kwargs = dump_kwargs or {}
+ load_kwargs = load_kwargs or {}
+
+ nobj = get_nobj(obj, dump_kwargs, load_kwargs)
+ if not singleton:
+ self.assertIsNot(obj, nobj)
+ self.assertEqual(obj, nobj)
+
+
+class TZContextBase(object):
+ """
+ Base class for a context manager which allows changing of time zones.
+
+ Subclasses may define a guard variable to either block or or allow time
+ zone changes by redefining ``_guard_var_name`` and ``_guard_allows_change``.
+ The default is that the guard variable must be affirmatively set.
+
+ Subclasses must define ``get_current_tz`` and ``set_current_tz``.
+ """
+ _guard_var_name = "DATEUTIL_MAY_CHANGE_TZ"
+ _guard_allows_change = True
+
+ def __init__(self, tzval):
+ self.tzval = tzval
+ self._old_tz = None
+
+ @classmethod
+ def tz_change_allowed(cls):
+ """
+ Class method used to query whether or not this class allows time zone
+ changes.
+ """
+ guard = bool(os.environ.get(cls._guard_var_name, False))
+
+ # _guard_allows_change gives the "default" behavior - if True, the
+ # guard is overcoming a block. If false, the guard is causing a block.
+ # Whether tz_change is allowed is therefore the XNOR of the two.
+ return guard == cls._guard_allows_change
+
+ @classmethod
+ def tz_change_disallowed_message(cls):
+ """ Generate instructions on how to allow tz changes """
+ msg = ('Changing time zone not allowed. Set {envar} to {gval} '
+ 'if you would like to allow this behavior')
+
+ return msg.format(envar=cls._guard_var_name,
+ gval=cls._guard_allows_change)
+
+ def __enter__(self):
+ if not self.tz_change_allowed():
+ raise ValueError(self.tz_change_disallowed_message())
+
+ self._old_tz = self.get_current_tz()
+ self.set_current_tz(self.tzval)
+
+ def __exit__(self, type, value, traceback):
+ if self._old_tz is not None:
+ self.set_current_tz(self._old_tz)
+
+ self._old_tz = None
+
+ def get_current_tz(self):
+ raise NotImplementedError
+
+ def set_current_tz(self):
+ raise NotImplementedError
+
+
+class TZEnvContext(TZContextBase):
+ """
+ Context manager that temporarily sets the `TZ` variable (for use on
+ *nix-like systems). Because the effect is local to the shell anyway, this
+ will apply *unless* a guard is set.
+
+ If you do not want the TZ environment variable set, you may set the
+ ``DATEUTIL_MAY_NOT_CHANGE_TZ_VAR`` variable to a truthy value.
+ """
+ _guard_var_name = "DATEUTIL_MAY_NOT_CHANGE_TZ_VAR"
+ _guard_allows_change = False
+
+ def get_current_tz(self):
+ return os.environ.get('TZ', UnsetTz)
+
+ def set_current_tz(self, tzval):
+ if tzval is UnsetTz and 'TZ' in os.environ:
+ del os.environ['TZ']
+ else:
+ os.environ['TZ'] = tzval
+
+ time.tzset()
+
+
+class TZWinContext(TZContextBase):
+ """
+ Context manager for changing local time zone on Windows.
+
+ Because the effect of this is system-wide and global, it may have
+ unintended side effect. Set the ``DATEUTIL_MAY_CHANGE_TZ`` environment
+ variable to a truthy value before using this context manager.
+ """
+ def get_current_tz(self):
+ p = subprocess.Popen(['tzutil', '/g'], stdout=subprocess.PIPE)
+
+ ctzname, err = p.communicate()
+ ctzname = ctzname.decode() # Popen returns
+
+ if p.returncode:
+ raise OSError('Failed to get current time zone: ' + err)
+
+ return ctzname
+
+ def set_current_tz(self, tzname):
+ p = subprocess.Popen('tzutil /s "' + tzname + '"')
+
+ out, err = p.communicate()
+
+ if p.returncode:
+ raise OSError('Failed to set current time zone: ' +
+ (err or 'Unknown error.'))
+
+
+###
+# Utility classes
+class NotAValueClass(object):
+ """
+ A class analogous to NaN that has operations defined for any type.
+ """
+ def _op(self, other):
+ return self # Operation with NotAValue returns NotAValue
+
+ def _cmp(self, other):
+ return False
+
+ __add__ = __radd__ = _op
+ __sub__ = __rsub__ = _op
+ __mul__ = __rmul__ = _op
+ __div__ = __rdiv__ = _op
+ __truediv__ = __rtruediv__ = _op
+ __floordiv__ = __rfloordiv__ = _op
+
+ __lt__ = __rlt__ = _op
+ __gt__ = __rgt__ = _op
+ __eq__ = __req__ = _op
+ __le__ = __rle__ = _op
+ __ge__ = __rge__ = _op
+
+
+NotAValue = NotAValueClass()
+
+
+class ComparesEqualClass(object):
+ """
+ A class that is always equal to whatever you compare it to.
+ """
+
+ def __eq__(self, other):
+ return True
+
+ def __ne__(self, other):
+ return False
+
+ def __le__(self, other):
+ return True
+
+ def __ge__(self, other):
+ return True
+
+ def __lt__(self, other):
+ return False
+
+ def __gt__(self, other):
+ return False
+
+ __req__ = __eq__
+ __rne__ = __ne__
+ __rle__ = __le__
+ __rge__ = __ge__
+ __rlt__ = __lt__
+ __rgt__ = __gt__
+
+
+ComparesEqual = ComparesEqualClass()
+
+
+class UnsetTzClass(object):
+ """ Sentinel class for unset time zone variable """
+ pass
+
+
+UnsetTz = UnsetTzClass()