summaryrefslogtreecommitdiff
path: root/lib/python2.7/site-packages/django/db/backends/oracle/introspection.py
blob: 3ea3a08572ef958725e3310ca2e9cab0647bfa89 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
from django.db.backends import BaseDatabaseIntrospection, FieldInfo
from django.utils.encoding import force_text
import cx_Oracle
import re

# Matches a "CONSTRAINT ... FOREIGN KEY (...) REFERENCES ... (...)" clause whose
# identifiers are quoted with backticks, capturing the local column, the
# referenced table and the referenced column.
# NOTE(review): nothing visible in this file references this pattern, and the
# backtick quoting looks like a leftover from another backend -- confirm
# before relying on it or removing it.
foreign_key_re = re.compile(r"\sCONSTRAINT `[^`]*` FOREIGN KEY \(`([^`]*)`\) REFERENCES `([^`]*)` \(`([^`]*)`\)")

class DatabaseIntrospection(BaseDatabaseIntrospection):
    # Maps cx_Oracle type objects to the names of Django Field types.
    # NUMBER defaults to 'DecimalField' here; get_field_type() below refines
    # that choice using the column's precision and scale.
    data_types_reverse = {
        cx_Oracle.BLOB: 'BinaryField',
        cx_Oracle.CLOB: 'TextField',
        cx_Oracle.DATETIME: 'DateField',
        cx_Oracle.FIXED_CHAR: 'CharField',
        cx_Oracle.NCLOB: 'TextField',
        cx_Oracle.NUMBER: 'DecimalField',
        cx_Oracle.STRING: 'CharField',
        cx_Oracle.TIMESTAMP: 'DateTimeField',
    }

    # NATIVE_FLOAT may be missing from the installed cx_Oracle module; in that
    # case the mapping is simply left without an entry for it.
    try:
        data_types_reverse[cx_Oracle.NATIVE_FLOAT] = 'FloatField'
    except AttributeError:
        pass

    # Same best-effort registration for UNICODE, which may also be missing
    # from the installed cx_Oracle module.
    try:
        data_types_reverse[cx_Oracle.UNICODE] = 'CharField'
    except AttributeError:
        pass

    def get_field_type(self, data_type, description):
        """
        Returns the Django field type name for a column.

        NUMBER columns are refined using the precision and scale found at
        description[4:6]; every other case is delegated to the base class.
        """
        if data_type == cx_Oracle.NUMBER:
            precision, scale = description[4:6]
            # A scale of -127 is reported as a floating point column.
            if scale == -127:
                return 'FloatField'
            # A scale of 0 means an integer; precision picks the flavour.
            if scale == 0:
                if precision > 11:
                    return 'BigIntegerField'
                if precision == 1:
                    return 'BooleanField'
                return 'IntegerField'

        # Any other NUMBER scale, and every non-NUMBER type, uses the
        # default lookup.
        return super(DatabaseIntrospection, self).get_field_type(data_type, description)

    def get_table_list(self, cursor):
        "Returns the lower-cased names of the tables listed in USER_TABLES."
        cursor.execute("SELECT TABLE_NAME FROM USER_TABLES")
        table_names = []
        for record in cursor.fetchall():
            # The table name is the only selected column.
            table_names.append(record[0].lower())
        return table_names

    def get_table_description(self, cursor, table_name):
        """
        Returns a list of FieldInfo entries built from cursor.description
        after selecting from the given table, with each column name
        lower-cased.
        """
        quoted_table = self.connection.ops.quote_name(table_name)
        # ROWNUM < 2 limits the query to at most one row; only the cursor's
        # column metadata is used afterwards.
        cursor.execute("SELECT * FROM %s WHERE ROWNUM < 2" % quoted_table)
        fields = []
        for column in cursor.description:
            # cx_Oracle always returns a 'str' on both Python 2 and 3.
            column_name = force_text(column[0])
            # cx_Oracle, for some reason, doubles percent signs; formatting
            # with an empty mapping collapses '%%' back to '%'.
            column_name = column_name % {}
            field_values = (column_name.lower(),) + column[1:]
            fields.append(FieldInfo(*field_values))
        return fields

    def table_name_converter(self, name):
        "Lower-cases the name so table names compare case-insensitively under Oracle."
        lowered = name.lower()
        return lowered

    def _name_to_index(self, cursor, table_name):
        """
        Returns a dictionary of {field_name: field_index} for the given table.
        Indexes are 0-based.
        """
        return dict([(d[0], i) for i, d in enumerate(self.get_table_description(cursor, table_name))])

    def get_relations(self, cursor, table_name):
        """
        Returns a dictionary of {field_index: (field_index_other_table, other_table)}
        describing the relationships of the given table. Indexes are 0-based
        and the other table's name is lower-cased.
        """
        # The table name is upper-cased before being matched against
        # user_constraints.table_name.
        upper_name = table_name.upper()
        # column_id is shifted by one in the query to give 0-based indexes.
        sql = """
    SELECT ta.column_id - 1, tb.table_name, tb.column_id - 1
    FROM   user_constraints, USER_CONS_COLUMNS ca, USER_CONS_COLUMNS cb,
           user_tab_cols ta, user_tab_cols tb
    WHERE  user_constraints.table_name = %s AND
           ta.table_name = user_constraints.table_name AND
           ta.column_name = ca.column_name AND
           ca.table_name = ta.table_name AND
           user_constraints.constraint_name = ca.constraint_name AND
           user_constraints.r_constraint_name = cb.constraint_name AND
           cb.table_name = tb.table_name AND
           cb.column_name = tb.column_name AND
           ca.position = cb.position"""
        cursor.execute(sql, [upper_name])

        # Row layout: (local index, other table name, other index).
        return dict(
            (record[0], (record[2], record[1].lower()))
            for record in cursor.fetchall()
        )

    def get_key_columns(self, cursor, table_name):
        """
        Returns a list of (column_name, referenced_table_name,
        referenced_column_name) tuples, all lower-cased, for every foreign
        key column of the given table.
        """
        # Each referencing column is paired with the referenced column at the
        # same position within the constraint. Without the position match a
        # composite foreign key would yield the cross product of its columns
        # instead of one row per column pair.
        cursor.execute("""
            SELECT ccol.column_name, rcol.table_name AS referenced_table, rcol.column_name AS referenced_column
            FROM user_constraints c
            JOIN user_cons_columns ccol
              ON ccol.constraint_name = c.constraint_name
            JOIN user_cons_columns rcol
              ON rcol.constraint_name = c.r_constraint_name
             AND rcol.position = ccol.position
            WHERE c.table_name = %s AND c.constraint_type = 'R'""", [table_name.upper()])
        return [tuple(cell.lower() for cell in row)
                for row in cursor.fetchall()]

    def get_indexes(self, cursor, table_name):
        """
        Returns a dictionary of
        {column_name: {'primary_key': bool, 'unique': bool}}
        for the indexed columns of the given table.

        Only single-column indexes are reported: the query keeps the first
        column of an index and skips any index that has a second column.
        """
        # The (+) outer joins keep index columns that have no matching
        # primary key constraint or unique index; those come back as 0 flags.
        sql = """
    SELECT LOWER(uic1.column_name) AS column_name,
           CASE user_constraints.constraint_type
               WHEN 'P' THEN 1 ELSE 0
           END AS is_primary_key,
           CASE user_indexes.uniqueness
               WHEN 'UNIQUE' THEN 1 ELSE 0
           END AS is_unique
    FROM   user_constraints, user_indexes, user_ind_columns uic1
    WHERE  user_constraints.constraint_type (+) = 'P'
      AND  user_constraints.index_name (+) = uic1.index_name
      AND  user_indexes.uniqueness (+) = 'UNIQUE'
      AND  user_indexes.index_name (+) = uic1.index_name
      AND  uic1.table_name = UPPER(%s)
      AND  uic1.column_position = 1
      AND  NOT EXISTS (
              SELECT 1
              FROM   user_ind_columns uic2
              WHERE  uic2.index_name = uic1.index_name
                AND  uic2.column_position = 2
           )
        """
        cursor.execute(sql, [table_name])
        # Row layout: (column name, is_primary_key flag, is_unique flag).
        return dict(
            (record[0], {'primary_key': bool(record[1]),
                         'unique': bool(record[2])})
            for record in cursor.fetchall()
        )