diff options
Diffstat (limited to 'lib/python2.7/site-packages/south/creator/changes.py')
-rw-r--r-- | lib/python2.7/site-packages/south/creator/changes.py | 506 |
1 files changed, 506 insertions, 0 deletions
diff --git a/lib/python2.7/site-packages/south/creator/changes.py b/lib/python2.7/site-packages/south/creator/changes.py new file mode 100644 index 0000000..6cdbd19 --- /dev/null +++ b/lib/python2.7/site-packages/south/creator/changes.py @@ -0,0 +1,506 @@ +""" +Contains things to detect changes - either using options passed in on the +commandline, or by using autodetection, etc. +""" + +from __future__ import print_function + +from django.db import models +from django.contrib.contenttypes.generic import GenericRelation +from django.utils.datastructures import SortedDict + +from south.creator.freezer import remove_useless_attributes, freeze_apps, model_key +from south.utils import auto_through +from south.utils.py3 import string_types + +class BaseChanges(object): + """ + Base changes class. + """ + def suggest_name(self): + return '' + + def split_model_def(self, model, model_def): + """ + Given a model and its model def (a dict of field: triple), returns three + items: the real fields dict, the Meta dict, and the M2M fields dict. + """ + real_fields = SortedDict() + meta = SortedDict() + m2m_fields = SortedDict() + for name, triple in model_def.items(): + if name == "Meta": + meta = triple + elif isinstance(model._meta.get_field_by_name(name)[0], models.ManyToManyField): + m2m_fields[name] = triple + else: + real_fields[name] = triple + return real_fields, meta, m2m_fields + + def current_model_from_key(self, key): + app_label, model_name = key.split(".") + return models.get_model(app_label, model_name) + + def current_field_from_key(self, key, fieldname): + app_label, model_name = key.split(".") + # Special, for the magical field from order_with_respect_to + if fieldname == "_order": + field = models.IntegerField() + field.name = "_order" + field.attname = "_order" + field.column = "_order" + field.default = 0 + return field + # Otherwise, normal. + return models.get_model(app_label, model_name)._meta.get_field_by_name(fieldname)[0] + + +class AutoChanges(BaseChanges): + """ + Detects changes by 'diffing' two sets of frozen model definitions. + """ + + # Field types we don't generate add/remove field changes for. + IGNORED_FIELD_TYPES = [ + GenericRelation, + ] + + def __init__(self, migrations, old_defs, old_orm, new_defs): + self.migrations = migrations + self.old_defs = old_defs + self.old_orm = old_orm + self.new_defs = new_defs + + def suggest_name(self): + parts = ["auto"] + for change_name, params in self.get_changes(): + if change_name == "AddModel": + parts.append("add_%s" % params['model']._meta.object_name.lower()) + elif change_name == "DeleteModel": + parts.append("del_%s" % params['model']._meta.object_name.lower()) + elif change_name == "AddField": + parts.append("add_field_%s_%s" % ( + params['model']._meta.object_name.lower(), + params['field'].name, + )) + elif change_name == "DeleteField": + parts.append("del_field_%s_%s" % ( + params['model']._meta.object_name.lower(), + params['field'].name, + )) + elif change_name == "ChangeField": + parts.append("chg_field_%s_%s" % ( + params['model']._meta.object_name.lower(), + params['new_field'].name, + )) + elif change_name == "AddUnique": + parts.append("add_unique_%s_%s" % ( + params['model']._meta.object_name.lower(), + "_".join([x.name for x in params['fields']]), + )) + elif change_name == "DeleteUnique": + parts.append("del_unique_%s_%s" % ( + params['model']._meta.object_name.lower(), + "_".join([x.name for x in params['fields']]), + )) + elif change_name == "AddIndex": + parts.append("add_index_%s_%s" % ( + params['model']._meta.object_name.lower(), + "_".join([x.name for x in params['fields']]), + )) + elif change_name == "DeleteIndex": + parts.append("del_index_%s_%s" % ( + params['model']._meta.object_name.lower(), + "_".join([x.name for x in params['fields']]), + )) + return ("__".join(parts))[:70] + + def get_changes(self): + """ + Returns the difference between the old and new sets of models as a 5-tuple: + added_models, deleted_models, added_fields, deleted_fields, changed_fields + """ + + deleted_models = set() + + # See if anything's vanished + for key in self.old_defs: + if key not in self.new_defs: + # We shouldn't delete it if it was managed=False + old_fields, old_meta, old_m2ms = self.split_model_def(self.old_orm[key], self.old_defs[key]) + if old_meta.get("managed", "True") != "False": + # Alright, delete it. + yield ("DeleteModel", { + "model": self.old_orm[key], + "model_def": old_fields, + }) + # Also make sure we delete any M2Ms it had. + for fieldname in old_m2ms: + # Only delete its stuff if it wasn't a through=. + field = self.old_orm[key + ":" + fieldname] + if auto_through(field): + yield ("DeleteM2M", {"model": self.old_orm[key], "field": field}) + # And any index/uniqueness constraints it had + for attr, operation in (("unique_together", "DeleteUnique"), ("index_together", "DeleteIndex")): + together = eval(old_meta.get(attr, "[]")) + if together: + # If it's only a single tuple, make it into the longer one + if isinstance(together[0], string_types): + together = [together] + # For each combination, make an action for it + for fields in together: + yield (operation, { + "model": self.old_orm[key], + "fields": [self.old_orm[key]._meta.get_field_by_name(x)[0] for x in fields], + }) + # We always add it in here so we ignore it later + deleted_models.add(key) + + # Or appeared + for key in self.new_defs: + if key not in self.old_defs: + # We shouldn't add it if it's managed=False + new_fields, new_meta, new_m2ms = self.split_model_def(self.current_model_from_key(key), self.new_defs[key]) + if new_meta.get("managed", "True") != "False": + yield ("AddModel", { + "model": self.current_model_from_key(key), + "model_def": new_fields, + }) + # Also make sure we add any M2Ms it has. + for fieldname in new_m2ms: + # Only create its stuff if it wasn't a through=. + field = self.current_field_from_key(key, fieldname) + if auto_through(field): + yield ("AddM2M", {"model": self.current_model_from_key(key), "field": field}) + # And any index/uniqueness constraints it has + for attr, operation in (("unique_together", "AddUnique"), ("index_together", "AddIndex")): + together = eval(new_meta.get(attr, "[]")) + if together: + # If it's only a single tuple, make it into the longer one + if isinstance(together[0], string_types): + together = [together] + # For each combination, make an action for it + for fields in together: + yield (operation, { + "model": self.current_model_from_key(key), + "fields": [self.current_model_from_key(key)._meta.get_field_by_name(x)[0] for x in fields], + }) + + # Now, for every model that's stayed the same, check its fields. + for key in self.old_defs: + if key not in deleted_models: + + old_fields, old_meta, old_m2ms = self.split_model_def(self.old_orm[key], self.old_defs[key]) + new_fields, new_meta, new_m2ms = self.split_model_def(self.current_model_from_key(key), self.new_defs[key]) + + # Do nothing for models which are now not managed. + if new_meta.get("managed", "True") == "False": + continue + + # Find fields that have vanished. + for fieldname in old_fields: + if fieldname not in new_fields: + # Don't do it for any fields we're ignoring + field = self.old_orm[key + ":" + fieldname] + field_allowed = True + for field_type in self.IGNORED_FIELD_TYPES: + if isinstance(field, field_type): + field_allowed = False + if field_allowed: + # Looks alright. + yield ("DeleteField", { + "model": self.old_orm[key], + "field": field, + "field_def": old_fields[fieldname], + }) + + # And ones that have appeared + for fieldname in new_fields: + if fieldname not in old_fields: + # Don't do it for any fields we're ignoring + field = self.current_field_from_key(key, fieldname) + field_allowed = True + for field_type in self.IGNORED_FIELD_TYPES: + if isinstance(field, field_type): + field_allowed = False + if field_allowed: + # Looks alright. + yield ("AddField", { + "model": self.current_model_from_key(key), + "field": field, + "field_def": new_fields[fieldname], + }) + + # Find M2Ms that have vanished + for fieldname in old_m2ms: + if fieldname not in new_m2ms: + # Only delete its stuff if it wasn't a through=. + field = self.old_orm[key + ":" + fieldname] + if auto_through(field): + yield ("DeleteM2M", {"model": self.old_orm[key], "field": field}) + + # Find M2Ms that have appeared + for fieldname in new_m2ms: + if fieldname not in old_m2ms: + # Only create its stuff if it wasn't a through=. + field = self.current_field_from_key(key, fieldname) + if auto_through(field): + yield ("AddM2M", {"model": self.current_model_from_key(key), "field": field}) + + # For the ones that exist in both models, see if they were changed + for fieldname in set(old_fields).intersection(set(new_fields)): + # Non-index changes + if self.different_attributes( + remove_useless_attributes(old_fields[fieldname], True, True), + remove_useless_attributes(new_fields[fieldname], True, True)): + yield ("ChangeField", { + "model": self.current_model_from_key(key), + "old_field": self.old_orm[key + ":" + fieldname], + "new_field": self.current_field_from_key(key, fieldname), + "old_def": old_fields[fieldname], + "new_def": new_fields[fieldname], + }) + # Index changes + old_field = self.old_orm[key + ":" + fieldname] + new_field = self.current_field_from_key(key, fieldname) + if not old_field.db_index and new_field.db_index: + # They've added an index. + yield ("AddIndex", { + "model": self.current_model_from_key(key), + "fields": [new_field], + }) + if old_field.db_index and not new_field.db_index: + # They've removed an index. + yield ("DeleteIndex", { + "model": self.old_orm[key], + "fields": [old_field], + }) + # See if their uniques have changed + if old_field.unique != new_field.unique: + # Make sure we look at the one explicitly given to see what happened + if new_field.unique: + yield ("AddUnique", { + "model": self.current_model_from_key(key), + "fields": [new_field], + }) + else: + yield ("DeleteUnique", { + "model": self.old_orm[key], + "fields": [old_field], + }) + + # See if there's any M2Ms that have changed. + for fieldname in set(old_m2ms).intersection(set(new_m2ms)): + old_field = self.old_orm[key + ":" + fieldname] + new_field = self.current_field_from_key(key, fieldname) + # Have they _added_ a through= ? + if auto_through(old_field) and not auto_through(new_field): + yield ("DeleteM2M", {"model": self.old_orm[key], "field": old_field}) + # Have they _removed_ a through= ? + if not auto_through(old_field) and auto_through(new_field): + yield ("AddM2M", {"model": self.current_model_from_key(key), "field": new_field}) + + ## See if the {index,unique}_togethers have changed + for attr, add_operation, del_operation in (("unique_together", "AddUnique", "DeleteUnique"), ("index_together", "AddIndex", "DeleteIndex")): + # First, normalise them into lists of sets. + old_together = eval(old_meta.get(attr, "[]")) + new_together = eval(new_meta.get(attr, "[]")) + if old_together and isinstance(old_together[0], string_types): + old_together = [old_together] + if new_together and isinstance(new_together[0], string_types): + new_together = [new_together] + old_together = frozenset(tuple(o) for o in old_together) + new_together = frozenset(tuple(n) for n in new_together) + # See if any appeared or disappeared + disappeared = old_together.difference(new_together) + appeared = new_together.difference(old_together) + for item in disappeared: + yield (del_operation, { + "model": self.old_orm[key], + "fields": [self.old_orm[key + ":" + x] for x in item], + }) + for item in appeared: + yield (add_operation, { + "model": self.current_model_from_key(key), + "fields": [self.current_field_from_key(key, x) for x in item], + }) + + @classmethod + def is_triple(cls, triple): + "Returns whether the argument is a triple." + return isinstance(triple, (list, tuple)) and len(triple) == 3 and \ + isinstance(triple[0], string_types) and \ + isinstance(triple[1], (list, tuple)) and \ + isinstance(triple[2], dict) + + @classmethod + def different_attributes(cls, old, new): + """ + Backwards-compat comparison that ignores orm. on the RHS and not the left + and which knows django.db.models.fields.CharField = models.CharField. + Has a whole load of tests in tests/autodetection.py. + """ + + # If they're not triples, just do normal comparison + if not cls.is_triple(old) or not cls.is_triple(new): + return old != new + + # Expand them out into parts + old_field, old_pos, old_kwd = old + new_field, new_pos, new_kwd = new + + # Copy the positional and keyword arguments so we can compare them and pop off things + old_pos, new_pos = old_pos[:], new_pos[:] + old_kwd = dict(old_kwd.items()) + new_kwd = dict(new_kwd.items()) + + # Remove comparison of the existence of 'unique', that's done elsewhere. + # TODO: Make this work for custom fields where unique= means something else? + if "unique" in old_kwd: + del old_kwd['unique'] + if "unique" in new_kwd: + del new_kwd['unique'] + + # If the first bit is different, check it's not by dj.db.models... + if old_field != new_field: + if old_field.startswith("models.") and (new_field.startswith("django.db.models") \ + or new_field.startswith("django.contrib.gis")): + if old_field.split(".")[-1] != new_field.split(".")[-1]: + return True + else: + # Remove those fields from the final comparison + old_field = new_field = "" + + # If there's a positional argument in the first, and a 'to' in the second, + # see if they're actually comparable. + if (old_pos and "to" in new_kwd) and ("orm" in new_kwd['to'] and "orm" not in old_pos[0]): + # Do special comparison to fix #153 + try: + if old_pos[0] != new_kwd['to'].split("'")[1].split(".")[1]: + return True + except IndexError: + pass # Fall back to next comparison + # Remove those attrs from the final comparison + old_pos = old_pos[1:] + del new_kwd['to'] + + return old_field != new_field or old_pos != new_pos or old_kwd != new_kwd + + +class ManualChanges(BaseChanges): + """ + Detects changes by reading the command line. + """ + + def __init__(self, migrations, added_models, added_fields, added_indexes): + self.migrations = migrations + self.added_models = added_models + self.added_fields = added_fields + self.added_indexes = added_indexes + + def suggest_name(self): + bits = [] + for model_name in self.added_models: + bits.append('add_model_%s' % model_name) + for field_name in self.added_fields: + bits.append('add_field_%s' % field_name) + for index_name in self.added_indexes: + bits.append('add_index_%s' % index_name) + return '_'.join(bits).replace('.', '_') + + def get_changes(self): + # Get the model defs so we can use them for the yield later + model_defs = freeze_apps([self.migrations.app_label()]) + # Make the model changes + for model_name in self.added_models: + model = models.get_model(self.migrations.app_label(), model_name) + real_fields, meta, m2m_fields = self.split_model_def(model, model_defs[model_key(model)]) + yield ("AddModel", { + "model": model, + "model_def": real_fields, + }) + # And the field changes + for field_desc in self.added_fields: + try: + model_name, field_name = field_desc.split(".") + except (TypeError, ValueError): + raise ValueError("%r is not a valid field description." % field_desc) + model = models.get_model(self.migrations.app_label(), model_name) + real_fields, meta, m2m_fields = self.split_model_def(model, model_defs[model_key(model)]) + yield ("AddField", { + "model": model, + "field": model._meta.get_field_by_name(field_name)[0], + "field_def": real_fields[field_name], + }) + # And the indexes + for field_desc in self.added_indexes: + try: + model_name, field_name = field_desc.split(".") + except (TypeError, ValueError): + print("%r is not a valid field description." % field_desc) + model = models.get_model(self.migrations.app_label(), model_name) + yield ("AddIndex", { + "model": model, + "fields": [model._meta.get_field_by_name(field_name)[0]], + }) + + +class InitialChanges(BaseChanges): + """ + Creates all models; handles --initial. + """ + def suggest_name(self): + return 'initial' + + def __init__(self, migrations): + self.migrations = migrations + + def get_changes(self): + # Get the frozen models for this app + model_defs = freeze_apps([self.migrations.app_label()]) + + for model in models.get_models(models.get_app(self.migrations.app_label())): + + # Don't do anything for unmanaged, abstract or proxy models + if model._meta.abstract or getattr(model._meta, "proxy", False) or not getattr(model._meta, "managed", True): + continue + + real_fields, meta, m2m_fields = self.split_model_def(model, model_defs[model_key(model)]) + + # Firstly, add the main table and fields + yield ("AddModel", { + "model": model, + "model_def": real_fields, + }) + + # Then, add any indexing/uniqueness that's around + if meta: + for attr, operation in (("unique_together", "AddUnique"), ("index_together", "AddIndex")): + together = eval(meta.get(attr, "[]")) + if together: + # If it's only a single tuple, make it into the longer one + if isinstance(together[0], string_types): + together = [together] + # For each combination, make an action for it + for fields in together: + yield (operation, { + "model": model, + "fields": [model._meta.get_field_by_name(x)[0] for x in fields], + }) + + # Finally, see if there's some M2M action + for name, triple in m2m_fields.items(): + field = model._meta.get_field_by_name(name)[0] + # But only if it's not through=foo (#120) + if field.rel.through: + try: + # Django 1.1 and below + through_model = field.rel.through_model + except AttributeError: + # Django 1.2 + through_model = field.rel.through + if (not field.rel.through) or getattr(through_model._meta, "auto_created", False): + yield ("AddM2M", { + "model": model, + "field": field, + }) |