diff options
Diffstat (limited to 'lib/python2.7/site-packages/django/db/backends/creation.py')
-rw-r--r-- | lib/python2.7/site-packages/django/db/backends/creation.py | 489 |
1 files changed, 0 insertions, 489 deletions
diff --git a/lib/python2.7/site-packages/django/db/backends/creation.py b/lib/python2.7/site-packages/django/db/backends/creation.py deleted file mode 100644 index bae439b..0000000 --- a/lib/python2.7/site-packages/django/db/backends/creation.py +++ /dev/null @@ -1,489 +0,0 @@ -import hashlib -import sys -import time -import warnings - -from django.conf import settings -from django.db.utils import load_backend -from django.utils.encoding import force_bytes -from django.utils.six.moves import input - -from .util import truncate_name - -# The prefix to put on the default database name when creating -# the test database. -TEST_DATABASE_PREFIX = 'test_' - - -class BaseDatabaseCreation(object): - """ - This class encapsulates all backend-specific differences that pertain to - database *creation*, such as the column types to use for particular Django - Fields, the SQL used to create and destroy tables, and the creation and - destruction of test databases. - """ - data_types = {} - - def __init__(self, connection): - self.connection = connection - - def _digest(self, *args): - """ - Generates a 32-bit digest of a set of arguments that can be used to - shorten identifying names. - """ - h = hashlib.md5() - for arg in args: - h.update(force_bytes(arg)) - return h.hexdigest()[:8] - - def sql_create_model(self, model, style, known_models=set()): - """ - Returns the SQL required to create a single model, as a tuple of: - (list_of_sql, pending_references_dict) - """ - opts = model._meta - if not opts.managed or opts.proxy or opts.swapped: - return [], {} - final_output = [] - table_output = [] - pending_references = {} - qn = self.connection.ops.quote_name - for f in opts.local_fields: - col_type = f.db_type(connection=self.connection) - tablespace = f.db_tablespace or opts.db_tablespace - if col_type is None: - # Skip ManyToManyFields, because they're not represented as - # database columns in this table. - continue - # Make the definition (e.g. 'foo VARCHAR(30)') for this field. - field_output = [style.SQL_FIELD(qn(f.column)), - style.SQL_COLTYPE(col_type)] - # Oracle treats the empty string ('') as null, so coerce the null - # option whenever '' is a possible value. - null = f.null - if (f.empty_strings_allowed and not f.primary_key and - self.connection.features.interprets_empty_strings_as_nulls): - null = True - if not null: - field_output.append(style.SQL_KEYWORD('NOT NULL')) - if f.primary_key: - field_output.append(style.SQL_KEYWORD('PRIMARY KEY')) - elif f.unique: - field_output.append(style.SQL_KEYWORD('UNIQUE')) - if tablespace and f.unique: - # We must specify the index tablespace inline, because we - # won't be generating a CREATE INDEX statement for this field. - tablespace_sql = self.connection.ops.tablespace_sql( - tablespace, inline=True) - if tablespace_sql: - field_output.append(tablespace_sql) - if f.rel and f.db_constraint: - ref_output, pending = self.sql_for_inline_foreign_key_references( - model, f, known_models, style) - if pending: - pending_references.setdefault(f.rel.to, []).append( - (model, f)) - else: - field_output.extend(ref_output) - table_output.append(' '.join(field_output)) - for field_constraints in opts.unique_together: - table_output.append(style.SQL_KEYWORD('UNIQUE') + ' (%s)' % - ", ".join( - [style.SQL_FIELD(qn(opts.get_field(f).column)) - for f in field_constraints])) - - full_statement = [style.SQL_KEYWORD('CREATE TABLE') + ' ' + - style.SQL_TABLE(qn(opts.db_table)) + ' ('] - for i, line in enumerate(table_output): # Combine and add commas. - full_statement.append( - ' %s%s' % (line, ',' if i < len(table_output) - 1 else '')) - full_statement.append(')') - if opts.db_tablespace: - tablespace_sql = self.connection.ops.tablespace_sql( - opts.db_tablespace) - if tablespace_sql: - full_statement.append(tablespace_sql) - full_statement.append(';') - final_output.append('\n'.join(full_statement)) - - if opts.has_auto_field: - # Add any extra SQL needed to support auto-incrementing primary - # keys. - auto_column = opts.auto_field.db_column or opts.auto_field.name - autoinc_sql = self.connection.ops.autoinc_sql(opts.db_table, - auto_column) - if autoinc_sql: - for stmt in autoinc_sql: - final_output.append(stmt) - - return final_output, pending_references - - def sql_for_inline_foreign_key_references(self, model, field, known_models, style): - """ - Return the SQL snippet defining the foreign key reference for a field. - """ - qn = self.connection.ops.quote_name - rel_to = field.rel.to - if rel_to in known_models or rel_to == model: - output = [style.SQL_KEYWORD('REFERENCES') + ' ' + - style.SQL_TABLE(qn(rel_to._meta.db_table)) + ' (' + - style.SQL_FIELD(qn(rel_to._meta.get_field( - field.rel.field_name).column)) + ')' + - self.connection.ops.deferrable_sql() - ] - pending = False - else: - # We haven't yet created the table to which this field - # is related, so save it for later. - output = [] - pending = True - - return output, pending - - def sql_for_pending_references(self, model, style, pending_references): - """ - Returns any ALTER TABLE statements to add constraints after the fact. - """ - opts = model._meta - if not opts.managed or opts.swapped: - return [] - qn = self.connection.ops.quote_name - final_output = [] - if model in pending_references: - for rel_class, f in pending_references[model]: - rel_opts = rel_class._meta - r_table = rel_opts.db_table - r_col = f.column - table = opts.db_table - col = opts.get_field(f.rel.field_name).column - # For MySQL, r_name must be unique in the first 64 characters. - # So we are careful with character usage here. - r_name = '%s_refs_%s_%s' % ( - r_col, col, self._digest(r_table, table)) - final_output.append(style.SQL_KEYWORD('ALTER TABLE') + - ' %s ADD CONSTRAINT %s FOREIGN KEY (%s) REFERENCES %s (%s)%s;' % - (qn(r_table), qn(truncate_name( - r_name, self.connection.ops.max_name_length())), - qn(r_col), qn(table), qn(col), - self.connection.ops.deferrable_sql())) - del pending_references[model] - return final_output - - def sql_indexes_for_model(self, model, style): - """ - Returns the CREATE INDEX SQL statements for a single model. - """ - if not model._meta.managed or model._meta.proxy or model._meta.swapped: - return [] - output = [] - for f in model._meta.local_fields: - output.extend(self.sql_indexes_for_field(model, f, style)) - for fs in model._meta.index_together: - fields = [model._meta.get_field_by_name(f)[0] for f in fs] - output.extend(self.sql_indexes_for_fields(model, fields, style)) - return output - - def sql_indexes_for_field(self, model, f, style): - """ - Return the CREATE INDEX SQL statements for a single model field. - """ - if f.db_index and not f.unique: - return self.sql_indexes_for_fields(model, [f], style) - else: - return [] - - def sql_indexes_for_fields(self, model, fields, style): - if len(fields) == 1 and fields[0].db_tablespace: - tablespace_sql = self.connection.ops.tablespace_sql(fields[0].db_tablespace) - elif model._meta.db_tablespace: - tablespace_sql = self.connection.ops.tablespace_sql(model._meta.db_tablespace) - else: - tablespace_sql = "" - if tablespace_sql: - tablespace_sql = " " + tablespace_sql - - field_names = [] - qn = self.connection.ops.quote_name - for f in fields: - field_names.append(style.SQL_FIELD(qn(f.column))) - - index_name = "%s_%s" % (model._meta.db_table, self._digest([f.name for f in fields])) - - return [ - style.SQL_KEYWORD("CREATE INDEX") + " " + - style.SQL_TABLE(qn(truncate_name(index_name, self.connection.ops.max_name_length()))) + " " + - style.SQL_KEYWORD("ON") + " " + - style.SQL_TABLE(qn(model._meta.db_table)) + " " + - "(%s)" % style.SQL_FIELD(", ".join(field_names)) + - "%s;" % tablespace_sql, - ] - - def sql_destroy_model(self, model, references_to_delete, style): - """ - Return the DROP TABLE and restraint dropping statements for a single - model. - """ - if not model._meta.managed or model._meta.proxy or model._meta.swapped: - return [] - # Drop the table now - qn = self.connection.ops.quote_name - output = ['%s %s;' % (style.SQL_KEYWORD('DROP TABLE'), - style.SQL_TABLE(qn(model._meta.db_table)))] - if model in references_to_delete: - output.extend(self.sql_remove_table_constraints( - model, references_to_delete, style)) - if model._meta.has_auto_field: - ds = self.connection.ops.drop_sequence_sql(model._meta.db_table) - if ds: - output.append(ds) - return output - - def sql_remove_table_constraints(self, model, references_to_delete, style): - if not model._meta.managed or model._meta.proxy or model._meta.swapped: - return [] - output = [] - qn = self.connection.ops.quote_name - for rel_class, f in references_to_delete[model]: - table = rel_class._meta.db_table - col = f.column - r_table = model._meta.db_table - r_col = model._meta.get_field(f.rel.field_name).column - r_name = '%s_refs_%s_%s' % ( - col, r_col, self._digest(table, r_table)) - output.append('%s %s %s %s;' % \ - (style.SQL_KEYWORD('ALTER TABLE'), - style.SQL_TABLE(qn(table)), - style.SQL_KEYWORD(self.connection.ops.drop_foreignkey_sql()), - style.SQL_FIELD(qn(truncate_name( - r_name, self.connection.ops.max_name_length()))))) - del references_to_delete[model] - return output - - def sql_destroy_indexes_for_model(self, model, style): - """ - Returns the DROP INDEX SQL statements for a single model. - """ - if not model._meta.managed or model._meta.proxy or model._meta.swapped: - return [] - output = [] - for f in model._meta.local_fields: - output.extend(self.sql_destroy_indexes_for_field(model, f, style)) - for fs in model._meta.index_together: - fields = [model._meta.get_field_by_name(f)[0] for f in fs] - output.extend(self.sql_destroy_indexes_for_fields(model, fields, style)) - return output - - def sql_destroy_indexes_for_field(self, model, f, style): - """ - Return the DROP INDEX SQL statements for a single model field. - """ - if f.db_index and not f.unique: - return self.sql_destroy_indexes_for_fields(model, [f], style) - else: - return [] - - def sql_destroy_indexes_for_fields(self, model, fields, style): - if len(fields) == 1 and fields[0].db_tablespace: - tablespace_sql = self.connection.ops.tablespace_sql(fields[0].db_tablespace) - elif model._meta.db_tablespace: - tablespace_sql = self.connection.ops.tablespace_sql(model._meta.db_tablespace) - else: - tablespace_sql = "" - if tablespace_sql: - tablespace_sql = " " + tablespace_sql - - field_names = [] - qn = self.connection.ops.quote_name - for f in fields: - field_names.append(style.SQL_FIELD(qn(f.column))) - - index_name = "%s_%s" % (model._meta.db_table, self._digest([f.name for f in fields])) - - return [ - style.SQL_KEYWORD("DROP INDEX") + " " + - style.SQL_TABLE(qn(truncate_name(index_name, self.connection.ops.max_name_length()))) + " " + - ";", - ] - - def create_test_db(self, verbosity=1, autoclobber=False): - """ - Creates a test database, prompting the user for confirmation if the - database already exists. Returns the name of the test database created. - """ - # Don't import django.core.management if it isn't needed. - from django.core.management import call_command - - test_database_name = self._get_test_db_name() - - if verbosity >= 1: - test_db_repr = '' - if verbosity >= 2: - test_db_repr = " ('%s')" % test_database_name - print("Creating test database for alias '%s'%s..." % ( - self.connection.alias, test_db_repr)) - - self._create_test_db(verbosity, autoclobber) - - self.connection.close() - settings.DATABASES[self.connection.alias]["NAME"] = test_database_name - self.connection.settings_dict["NAME"] = test_database_name - - # Report syncdb messages at one level lower than that requested. - # This ensures we don't get flooded with messages during testing - # (unless you really ask to be flooded) - call_command('syncdb', - verbosity=max(verbosity - 1, 0), - interactive=False, - database=self.connection.alias, - load_initial_data=False) - - # We need to then do a flush to ensure that any data installed by - # custom SQL has been removed. The only test data should come from - # test fixtures, or autogenerated from post_syncdb triggers. - # This has the side effect of loading initial data (which was - # intentionally skipped in the syncdb). - call_command('flush', - verbosity=max(verbosity - 1, 0), - interactive=False, - database=self.connection.alias) - - from django.core.cache import get_cache - from django.core.cache.backends.db import BaseDatabaseCache - for cache_alias in settings.CACHES: - cache = get_cache(cache_alias) - if isinstance(cache, BaseDatabaseCache): - call_command('createcachetable', cache._table, - database=self.connection.alias) - - # Get a cursor (even though we don't need one yet). This has - # the side effect of initializing the test database. - self.connection.cursor() - - return test_database_name - - def _get_test_db_name(self): - """ - Internal implementation - returns the name of the test DB that will be - created. Only useful when called from create_test_db() and - _create_test_db() and when no external munging is done with the 'NAME' - or 'TEST_NAME' settings. - """ - if self.connection.settings_dict['TEST_NAME']: - return self.connection.settings_dict['TEST_NAME'] - return TEST_DATABASE_PREFIX + self.connection.settings_dict['NAME'] - - def _create_test_db(self, verbosity, autoclobber): - """ - Internal implementation - creates the test db tables. - """ - suffix = self.sql_table_creation_suffix() - - test_database_name = self._get_test_db_name() - - qn = self.connection.ops.quote_name - - # Create the test database and connect to it. - cursor = self.connection.cursor() - try: - cursor.execute( - "CREATE DATABASE %s %s" % (qn(test_database_name), suffix)) - except Exception as e: - sys.stderr.write( - "Got an error creating the test database: %s\n" % e) - if not autoclobber: - confirm = input( - "Type 'yes' if you would like to try deleting the test " - "database '%s', or 'no' to cancel: " % test_database_name) - if autoclobber or confirm == 'yes': - try: - if verbosity >= 1: - print("Destroying old test database '%s'..." - % self.connection.alias) - cursor.execute( - "DROP DATABASE %s" % qn(test_database_name)) - cursor.execute( - "CREATE DATABASE %s %s" % (qn(test_database_name), - suffix)) - except Exception as e: - sys.stderr.write( - "Got an error recreating the test database: %s\n" % e) - sys.exit(2) - else: - print("Tests cancelled.") - sys.exit(1) - - return test_database_name - - def destroy_test_db(self, old_database_name, verbosity=1): - """ - Destroy a test database, prompting the user for confirmation if the - database already exists. - """ - self.connection.close() - test_database_name = self.connection.settings_dict['NAME'] - if verbosity >= 1: - test_db_repr = '' - if verbosity >= 2: - test_db_repr = " ('%s')" % test_database_name - print("Destroying test database for alias '%s'%s..." % ( - self.connection.alias, test_db_repr)) - - # Temporarily use a new connection and a copy of the settings dict. - # This prevents the production database from being exposed to potential - # child threads while (or after) the test database is destroyed. - # Refs #10868 and #17786. - settings_dict = self.connection.settings_dict.copy() - settings_dict['NAME'] = old_database_name - backend = load_backend(settings_dict['ENGINE']) - new_connection = backend.DatabaseWrapper( - settings_dict, - alias='__destroy_test_db__', - allow_thread_sharing=False) - new_connection.creation._destroy_test_db(test_database_name, verbosity) - - def _destroy_test_db(self, test_database_name, verbosity): - """ - Internal implementation - remove the test db tables. - """ - # Remove the test database to clean up after - # ourselves. Connect to the previous database (not the test database) - # to do so, because it's not allowed to delete a database while being - # connected to it. - cursor = self.connection.cursor() - # Wait to avoid "database is being accessed by other users" errors. - time.sleep(1) - cursor.execute("DROP DATABASE %s" - % self.connection.ops.quote_name(test_database_name)) - self.connection.close() - - def set_autocommit(self): - """ - Make sure a connection is in autocommit mode. - Deprecated, not used - anymore by Django code. Kept for compatibility with user code that - might use it. - """ - warnings.warn( - "set_autocommit was moved from BaseDatabaseCreation to " - "BaseDatabaseWrapper.", PendingDeprecationWarning, stacklevel=2) - return self.connection.set_autocommit(True) - - def sql_table_creation_suffix(self): - """ - SQL to append to the end of the test table creation statements. - """ - return '' - - def test_db_signature(self): - """ - Returns a tuple with elements of self.connection.settings_dict (a - DATABASES setting value) that uniquely identify a database - accordingly to the RDBMS particularities. - """ - settings_dict = self.connection.settings_dict - return ( - settings_dict['HOST'], - settings_dict['PORT'], - settings_dict['ENGINE'], - settings_dict['NAME'] - ) |