diff options
Diffstat (limited to 'lib/python2.7/site-packages/south/db/sql_server/pyodbc.py')
-rw-r--r-- | lib/python2.7/site-packages/south/db/sql_server/pyodbc.py | 444 |
1 files changed, 0 insertions, 444 deletions
diff --git a/lib/python2.7/site-packages/south/db/sql_server/pyodbc.py b/lib/python2.7/site-packages/south/db/sql_server/pyodbc.py deleted file mode 100644 index b725ec0..0000000 --- a/lib/python2.7/site-packages/south/db/sql_server/pyodbc.py +++ /dev/null @@ -1,444 +0,0 @@ -from datetime import date, datetime, time -from warnings import warn -from django.db import models -from django.db.models import fields -from south.db import generic -from south.db.generic import delete_column_constraints, invalidate_table_constraints, copy_column_constraints -from south.exceptions import ConstraintDropped -from south.utils.py3 import string_types -try: - from django.utils.encoding import smart_text # Django >= 1.5 -except ImportError: - from django.utils.encoding import smart_unicode as smart_text # Django < 1.5 -from django.core.management.color import no_style - -class DatabaseOperations(generic.DatabaseOperations): - """ - django-pyodbc (sql_server.pyodbc) implementation of database operations. - """ - - backend_name = "pyodbc" - - add_column_string = 'ALTER TABLE %s ADD %s;' - alter_string_set_type = 'ALTER COLUMN %(column)s %(type)s' - alter_string_set_null = 'ALTER COLUMN %(column)s %(type)s NULL' - alter_string_drop_null = 'ALTER COLUMN %(column)s %(type)s NOT NULL' - - allows_combined_alters = False - - drop_index_string = 'DROP INDEX %(index_name)s ON %(table_name)s' - drop_constraint_string = 'ALTER TABLE %(table_name)s DROP CONSTRAINT %(constraint_name)s' - delete_column_string = 'ALTER TABLE %s DROP COLUMN %s' - - #create_check_constraint_sql = "ALTER TABLE %(table)s " + \ - # generic.DatabaseOperations.add_check_constraint_fragment - create_foreign_key_sql = "ALTER TABLE %(table)s ADD CONSTRAINT %(constraint)s " + \ - "FOREIGN KEY (%(column)s) REFERENCES %(target)s" - create_unique_sql = "ALTER TABLE %(table)s ADD CONSTRAINT %(constraint)s UNIQUE (%(columns)s)" - - - default_schema_name = "dbo" - - has_booleans = False - - - @delete_column_constraints - def delete_column(self, table_name, name): - q_table_name, q_name = (self.quote_name(table_name), self.quote_name(name)) - - # Zap the constraints - for const in self._find_constraints_for_column(table_name,name): - params = {'table_name':q_table_name, 'constraint_name': const} - sql = self.drop_constraint_string % params - self.execute(sql, []) - - # Zap the indexes - for ind in self._find_indexes_for_column(table_name,name): - params = {'table_name':q_table_name, 'index_name': ind} - sql = self.drop_index_string % params - self.execute(sql, []) - - # Zap default if exists - drop_default = self.drop_column_default_sql(table_name, name) - if drop_default: - sql = "ALTER TABLE [%s] %s" % (table_name, drop_default) - self.execute(sql, []) - - # Finally zap the column itself - self.execute(self.delete_column_string % (q_table_name, q_name), []) - - def _find_indexes_for_column(self, table_name, name): - "Find the indexes that apply to a column, needed when deleting" - - sql = """ - SELECT si.name, si.id, sik.colid, sc.name - FROM dbo.sysindexes si WITH (NOLOCK) - INNER JOIN dbo.sysindexkeys sik WITH (NOLOCK) - ON sik.id = si.id - AND sik.indid = si.indid - INNER JOIN dbo.syscolumns sc WITH (NOLOCK) - ON si.id = sc.id - AND sik.colid = sc.colid - WHERE si.indid !=0 - AND si.id = OBJECT_ID('%s') - AND sc.name = '%s' - """ - idx = self.execute(sql % (table_name, name), []) - return [i[0] for i in idx] - - - def _find_constraints_for_column(self, table_name, name, just_names=True): - """ - Find the constraints that apply to a column, needed when deleting. Defaults not included. - This is more general than the parent _constraints_affecting_columns, as on MSSQL this - includes PK and FK constraints. - """ - - sql = """ - SELECT CC.[CONSTRAINT_NAME] - ,TC.[CONSTRAINT_TYPE] - ,CHK.[CHECK_CLAUSE] - ,RFD.TABLE_SCHEMA - ,RFD.TABLE_NAME - ,RFD.COLUMN_NAME - -- used for normalized names - ,CC.TABLE_NAME - ,CC.COLUMN_NAME - FROM [INFORMATION_SCHEMA].[TABLE_CONSTRAINTS] TC - JOIN INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE CC - ON TC.CONSTRAINT_CATALOG = CC.CONSTRAINT_CATALOG - AND TC.CONSTRAINT_SCHEMA = CC.CONSTRAINT_SCHEMA - AND TC.CONSTRAINT_NAME = CC.CONSTRAINT_NAME - LEFT JOIN INFORMATION_SCHEMA.CHECK_CONSTRAINTS CHK - ON CHK.CONSTRAINT_CATALOG = CC.CONSTRAINT_CATALOG - AND CHK.CONSTRAINT_SCHEMA = CC.CONSTRAINT_SCHEMA - AND CHK.CONSTRAINT_NAME = CC.CONSTRAINT_NAME - AND 'CHECK' = TC.CONSTRAINT_TYPE - LEFT JOIN INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS REF - ON REF.CONSTRAINT_CATALOG = CC.CONSTRAINT_CATALOG - AND REF.CONSTRAINT_SCHEMA = CC.CONSTRAINT_SCHEMA - AND REF.CONSTRAINT_NAME = CC.CONSTRAINT_NAME - AND 'FOREIGN KEY' = TC.CONSTRAINT_TYPE - LEFT JOIN INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE RFD - ON RFD.CONSTRAINT_CATALOG = REF.UNIQUE_CONSTRAINT_CATALOG - AND RFD.CONSTRAINT_SCHEMA = REF.UNIQUE_CONSTRAINT_SCHEMA - AND RFD.CONSTRAINT_NAME = REF.UNIQUE_CONSTRAINT_NAME - WHERE CC.CONSTRAINT_CATALOG = CC.TABLE_CATALOG - AND CC.CONSTRAINT_SCHEMA = CC.TABLE_SCHEMA - AND CC.TABLE_CATALOG = %s - AND CC.TABLE_SCHEMA = %s - AND CC.TABLE_NAME = %s - AND CC.COLUMN_NAME = %s - """ - db_name = self._get_setting('name') - schema_name = self._get_schema_name() - table = self.execute(sql, [db_name, schema_name, table_name, name]) - - if just_names: - return [r[0] for r in table] - - all = {} - for r in table: - cons_name, type = r[:2] - if type=='PRIMARY KEY' or type=='UNIQUE': - cons = all.setdefault(cons_name, (type,[])) - sql = ''' - SELECT COLUMN_NAME - FROM INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE RFD - WHERE RFD.CONSTRAINT_CATALOG = %s - AND RFD.CONSTRAINT_SCHEMA = %s - AND RFD.TABLE_NAME = %s - AND RFD.CONSTRAINT_NAME = %s - ''' - columns = self.execute(sql, [db_name, schema_name, table_name, cons_name]) - cons[1].extend(col for col, in columns) - elif type=='CHECK': - cons = (type, r[2]) - elif type=='FOREIGN KEY': - if cons_name in all: - raise NotImplementedError("Multiple-column foreign keys are not supported") - else: - cons = (type, r[3:6]) - else: - raise NotImplementedError("Don't know how to handle constraints of type "+ type) - all[cons_name] = cons - return all - - @invalidate_table_constraints - def alter_column(self, table_name, name, field, explicit_name=True, ignore_constraints=False): - """ - Alters the given column name so it will match the given field. - Note that conversion between the two by the database must be possible. - Will not automatically add _id by default; to have this behavour, pass - explicit_name=False. - - @param table_name: The name of the table to add the column to - @param name: The name of the column to alter - @param field: The new field definition to use - """ - self._fix_field_definition(field) - - if not ignore_constraints: - qn = self.quote_name - sch = qn(self._get_schema_name()) - tab = qn(table_name) - table = ".".join([sch, tab]) - try: - self.delete_foreign_key(table_name, name) - except ValueError: - # no FK constraint on this field. That's OK. - pass - constraints = self._find_constraints_for_column(table_name, name, False) - for constraint in constraints.keys(): - params = dict(table_name = table, - constraint_name = qn(constraint)) - sql = self.drop_constraint_string % params - self.execute(sql, []) - - ret_val = super(DatabaseOperations, self).alter_column(table_name, name, field, explicit_name, ignore_constraints=True) - - if not ignore_constraints: - for cname, (ctype,args) in constraints.items(): - params = dict(table = table, - constraint = qn(cname)) - if ctype=='UNIQUE': - params['columns'] = ", ".join(map(qn,args)) - sql = self.create_unique_sql % params - elif ctype=='PRIMARY KEY': - params['columns'] = ", ".join(map(qn,args)) - sql = self.create_primary_key_string % params - elif ctype=='FOREIGN KEY': - continue - # Foreign keys taken care of below - #target = "%s.%s(%s)" % tuple(map(qn,args)) - #params.update(column = qn(name), target = target) - #sql = self.create_foreign_key_sql % params - elif ctype=='CHECK': - warn(ConstraintDropped("CHECK "+ args, table_name, name)) - continue - #TODO: Some check constraints should be restored; but not before the generic - # backend restores them. - #params['check'] = args - #sql = self.create_check_constraint_sql % params - else: - raise NotImplementedError("Don't know how to handle constraints of type "+ type) - self.execute(sql, []) - # Create foreign key if necessary - if field.rel and self.supports_foreign_keys: - self.execute( - self.foreign_key_sql( - table_name, - field.column, - field.rel.to._meta.db_table, - field.rel.to._meta.get_field(field.rel.field_name).column - ) - ) - model = self.mock_model("FakeModelForIndexCreation", table_name) - for stmt in self._get_connection().creation.sql_indexes_for_field(model, field, no_style()): - self.execute(stmt) - - - return ret_val - - def _alter_set_defaults(self, field, name, params, sqls): - "Subcommand of alter_column that sets default values (overrideable)" - # Historically, we used to set defaults here. - # But since South 0.8, we don't ever set defaults on alter-column -- we only - # use database-level defaults as scaffolding when adding columns. - # However, we still sometimes need to remove defaults in alter-column. - table_name = self.quote_name(params['table_name']) - drop_default = self.drop_column_default_sql(table_name, name) - if drop_default: - sqls.append((drop_default, [])) - - def _value_to_unquoted_literal(self, field, value): - # Start with the field's own translation - conn = self._get_connection() - value = field.get_db_prep_save(value, connection=conn) - # This is still a Python object -- nobody expects to need a literal. - if isinstance(value, string_types): - return smart_text(value) - elif isinstance(value, (date,time,datetime)): - return value.isoformat() - else: - #TODO: Anybody else needs special translations? - return str(value) - def _default_value_workaround(self, value): - if isinstance(value, (date,time,datetime)): - return value.isoformat() - else: - return super(DatabaseOperations, self)._default_value_workaround(value) - - def _quote_string(self, s): - return "'" + s.replace("'","''") + "'" - - - def drop_column_default_sql(self, table_name, name, q_name=None): - "MSSQL specific drop default, which is a pain" - - sql = """ - SELECT object_name(cdefault) - FROM syscolumns - WHERE id = object_id('%s') - AND name = '%s' - """ - cons = self.execute(sql % (table_name, name), []) - if cons and cons[0] and cons[0][0]: - return "DROP CONSTRAINT %s" % cons[0][0] - return None - - def _fix_field_definition(self, field): - if isinstance(field, (fields.BooleanField, fields.NullBooleanField)): - if field.default == True: - field.default = 1 - if field.default == False: - field.default = 0 - - # This is copied from South's generic add_column, with two modifications: - # 1) The sql-server-specific call to _fix_field_definition - # 2) Removing a default, when needed, by calling drop_default and not the more general alter_column - @invalidate_table_constraints - def add_column(self, table_name, name, field, keep_default=False): - """ - Adds the column 'name' to the table 'table_name'. - Uses the 'field' paramater, a django.db.models.fields.Field instance, - to generate the necessary sql - - @param table_name: The name of the table to add the column to - @param name: The name of the column to add - @param field: The field to use - """ - self._fix_field_definition(field) - sql = self.column_sql(table_name, name, field) - if sql: - params = ( - self.quote_name(table_name), - sql, - ) - sql = self.add_column_string % params - self.execute(sql) - - # Now, drop the default if we need to - if not keep_default and field.default is not None: - field.default = fields.NOT_PROVIDED - #self.alter_column(table_name, name, field, explicit_name=False, ignore_constraints=True) - self.drop_default(table_name, name, field) - - @invalidate_table_constraints - def drop_default(self, table_name, name, field): - fragment = self.drop_column_default_sql(table_name, name) - if fragment: - table_name = self.quote_name(table_name) - sql = " ".join(["ALTER TABLE", table_name, fragment]) - self.execute(sql) - - - @invalidate_table_constraints - def create_table(self, table_name, field_defs): - # Tweak stuff as needed - for _, f in field_defs: - self._fix_field_definition(f) - - # Run - super(DatabaseOperations, self).create_table(table_name, field_defs) - - def _find_referencing_fks(self, table_name): - "MSSQL does not support cascading FKs when dropping tables, we need to implement." - - # FK -- Foreign Keys - # UCTU -- Unique Constraints Table Usage - # FKTU -- Foreign Key Table Usage - # (last two are both really CONSTRAINT_TABLE_USAGE, different join conditions) - sql = """ - SELECT FKTU.TABLE_SCHEMA as REFING_TABLE_SCHEMA, - FKTU.TABLE_NAME as REFING_TABLE_NAME, - FK.[CONSTRAINT_NAME] as FK_NAME - FROM [INFORMATION_SCHEMA].[REFERENTIAL_CONSTRAINTS] FK - JOIN [INFORMATION_SCHEMA].[CONSTRAINT_TABLE_USAGE] UCTU - ON FK.UNIQUE_CONSTRAINT_CATALOG = UCTU.CONSTRAINT_CATALOG and - FK.UNIQUE_CONSTRAINT_NAME = UCTU.CONSTRAINT_NAME and - FK.UNIQUE_CONSTRAINT_SCHEMA = UCTU.CONSTRAINT_SCHEMA - JOIN [INFORMATION_SCHEMA].[CONSTRAINT_TABLE_USAGE] FKTU - ON FK.CONSTRAINT_CATALOG = FKTU.CONSTRAINT_CATALOG and - FK.CONSTRAINT_NAME = FKTU.CONSTRAINT_NAME and - FK.CONSTRAINT_SCHEMA = FKTU.CONSTRAINT_SCHEMA - WHERE FK.CONSTRAINT_CATALOG = %s - AND UCTU.TABLE_SCHEMA = %s -- REFD_TABLE_SCHEMA - AND UCTU.TABLE_NAME = %s -- REFD_TABLE_NAME - """ - db_name = self._get_setting('name') - schema_name = self._get_schema_name() - return self.execute(sql, [db_name, schema_name, table_name]) - - @invalidate_table_constraints - def delete_table(self, table_name, cascade=True): - """ - Deletes the table 'table_name'. - """ - if cascade: - refing = self._find_referencing_fks(table_name) - for schmea, table, constraint in refing: - table = ".".join(map (self.quote_name, [schmea, table])) - params = dict(table_name = table, - constraint_name = self.quote_name(constraint)) - sql = self.drop_constraint_string % params - self.execute(sql, []) - cascade = False - super(DatabaseOperations, self).delete_table(table_name, cascade) - - @copy_column_constraints - @delete_column_constraints - def rename_column(self, table_name, old, new): - """ - Renames the column of 'table_name' from 'old' to 'new'. - WARNING - This isn't transactional on MSSQL! - """ - if old == new: - # No Operation - return - # Examples on the MS site show the table name not being quoted... - params = (table_name, self.quote_name(old), self.quote_name(new)) - self.execute("EXEC sp_rename '%s.%s', %s, 'COLUMN'" % params) - - @invalidate_table_constraints - def rename_table(self, old_table_name, table_name): - """ - Renames the table 'old_table_name' to 'table_name'. - WARNING - This isn't transactional on MSSQL! - """ - if old_table_name == table_name: - # No Operation - return - params = (self.quote_name(old_table_name), self.quote_name(table_name)) - self.execute('EXEC sp_rename %s, %s' % params) - - def _db_type_for_alter_column(self, field): - return self._db_positive_type_for_alter_column(DatabaseOperations, field) - - def _alter_add_column_mods(self, field, name, params, sqls): - return self._alter_add_positive_check(DatabaseOperations, field, name, params, sqls) - - @invalidate_table_constraints - def delete_foreign_key(self, table_name, column): - super(DatabaseOperations, self).delete_foreign_key(table_name, column) - # A FK also implies a non-unique index - find_index_sql = """ - SELECT i.name -- s.name, t.name, c.name - FROM sys.tables t - INNER JOIN sys.schemas s ON t.schema_id = s.schema_id - INNER JOIN sys.indexes i ON i.object_id = t.object_id - INNER JOIN sys.index_columns ic ON ic.object_id = t.object_id - AND ic.index_id = i.index_id - INNER JOIN sys.columns c ON c.object_id = t.object_id - AND ic.column_id = c.column_id - WHERE i.is_unique=0 AND i.is_primary_key=0 AND i.is_unique_constraint=0 - AND s.name = %s - AND t.name = %s - AND c.name = %s - """ - schema = self._get_schema_name() - indexes = self.execute(find_index_sql, [schema, table_name, column]) - qn = self.quote_name - for index in (i[0] for i in indexes if i[0]): # "if i[0]" added because an empty name may return - self.execute("DROP INDEX %s on %s.%s" % (qn(index), qn(schema), qn(table_name) )) - |