|
1 #-*- coding: ISO-8859-1 -*- |
|
2 # pysqlite2/test/transactions.py: tests transactions |
|
3 # |
|
4 # Copyright (C) 2005-2007 Gerhard Häring <gh@ghaering.de> |
|
5 # |
|
6 # This file is part of pysqlite. |
|
7 # |
|
8 # This software is provided 'as-is', without any express or implied |
|
9 # warranty. In no event will the authors be held liable for any damages |
|
10 # arising from the use of this software. |
|
11 # |
|
12 # Permission is granted to anyone to use this software for any purpose, |
|
13 # including commercial applications, and to alter it and redistribute it |
|
14 # freely, subject to the following restrictions: |
|
15 # |
|
16 # 1. The origin of this software must not be misrepresented; you must not |
|
17 # claim that you wrote the original software. If you use this software |
|
18 # in a product, an acknowledgment in the product documentation would be |
|
19 # appreciated but is not required. |
|
20 # 2. Altered source versions must be plainly marked as such, and must not be |
|
21 # misrepresented as being the original software. |
|
22 # 3. This notice may not be removed or altered from any source distribution. |
|
23 |
|
24 import sys |
|
25 import os, unittest |
|
26 import sqlite3 as sqlite |
|
27 |
|
28 def get_db_path(): |
|
29 return "sqlite_testdb" |
|
30 |
|
31 class TransactionTests(unittest.TestCase): |
|
32 def setUp(self): |
|
33 try: |
|
34 os.remove(get_db_path()) |
|
35 except OSError: |
|
36 pass |
|
37 |
|
38 self.con1 = sqlite.connect(get_db_path(), timeout=0.1) |
|
39 self.cur1 = self.con1.cursor() |
|
40 |
|
41 self.con2 = sqlite.connect(get_db_path(), timeout=0.1) |
|
42 self.cur2 = self.con2.cursor() |
|
43 |
|
44 def tearDown(self): |
|
45 self.cur1.close() |
|
46 self.con1.close() |
|
47 |
|
48 self.cur2.close() |
|
49 self.con2.close() |
|
50 |
|
51 try: |
|
52 os.unlink(get_db_path()) |
|
53 except OSError: |
|
54 pass |
|
55 |
|
56 def CheckDMLdoesAutoCommitBefore(self): |
|
57 self.cur1.execute("create table test(i)") |
|
58 self.cur1.execute("insert into test(i) values (5)") |
|
59 self.cur1.execute("create table test2(j)") |
|
60 self.cur2.execute("select i from test") |
|
61 res = self.cur2.fetchall() |
|
62 self.failUnlessEqual(len(res), 1) |
|
63 |
|
64 def CheckInsertStartsTransaction(self): |
|
65 self.cur1.execute("create table test(i)") |
|
66 self.cur1.execute("insert into test(i) values (5)") |
|
67 self.cur2.execute("select i from test") |
|
68 res = self.cur2.fetchall() |
|
69 self.failUnlessEqual(len(res), 0) |
|
70 |
|
71 def CheckUpdateStartsTransaction(self): |
|
72 self.cur1.execute("create table test(i)") |
|
73 self.cur1.execute("insert into test(i) values (5)") |
|
74 self.con1.commit() |
|
75 self.cur1.execute("update test set i=6") |
|
76 self.cur2.execute("select i from test") |
|
77 res = self.cur2.fetchone()[0] |
|
78 self.failUnlessEqual(res, 5) |
|
79 |
|
80 def CheckDeleteStartsTransaction(self): |
|
81 self.cur1.execute("create table test(i)") |
|
82 self.cur1.execute("insert into test(i) values (5)") |
|
83 self.con1.commit() |
|
84 self.cur1.execute("delete from test") |
|
85 self.cur2.execute("select i from test") |
|
86 res = self.cur2.fetchall() |
|
87 self.failUnlessEqual(len(res), 1) |
|
88 |
|
89 def CheckReplaceStartsTransaction(self): |
|
90 self.cur1.execute("create table test(i)") |
|
91 self.cur1.execute("insert into test(i) values (5)") |
|
92 self.con1.commit() |
|
93 self.cur1.execute("replace into test(i) values (6)") |
|
94 self.cur2.execute("select i from test") |
|
95 res = self.cur2.fetchall() |
|
96 self.failUnlessEqual(len(res), 1) |
|
97 self.failUnlessEqual(res[0][0], 5) |
|
98 |
|
99 def CheckToggleAutoCommit(self): |
|
100 self.cur1.execute("create table test(i)") |
|
101 self.cur1.execute("insert into test(i) values (5)") |
|
102 self.con1.isolation_level = None |
|
103 self.failUnlessEqual(self.con1.isolation_level, None) |
|
104 self.cur2.execute("select i from test") |
|
105 res = self.cur2.fetchall() |
|
106 self.failUnlessEqual(len(res), 1) |
|
107 |
|
108 self.con1.isolation_level = "DEFERRED" |
|
109 self.failUnlessEqual(self.con1.isolation_level , "DEFERRED") |
|
110 self.cur1.execute("insert into test(i) values (5)") |
|
111 self.cur2.execute("select i from test") |
|
112 res = self.cur2.fetchall() |
|
113 self.failUnlessEqual(len(res), 1) |
|
114 |
|
115 def CheckRaiseTimeout(self): |
|
116 if sqlite.sqlite_version_info < (3, 2, 2): |
|
117 # This will fail (hang) on earlier versions of sqlite. |
|
118 # Determine exact version it was fixed. 3.2.1 hangs. |
|
119 return |
|
120 self.cur1.execute("create table test(i)") |
|
121 self.cur1.execute("insert into test(i) values (5)") |
|
122 try: |
|
123 self.cur2.execute("insert into test(i) values (5)") |
|
124 self.fail("should have raised an OperationalError") |
|
125 except sqlite.OperationalError: |
|
126 pass |
|
127 except: |
|
128 self.fail("should have raised an OperationalError") |
|
129 |
|
130 def CheckLocking(self): |
|
131 """ |
|
132 This tests the improved concurrency with pysqlite 2.3.4. You needed |
|
133 to roll back con2 before you could commit con1. |
|
134 """ |
|
135 if sqlite.sqlite_version_info < (3, 2, 2): |
|
136 # This will fail (hang) on earlier versions of sqlite. |
|
137 # Determine exact version it was fixed. 3.2.1 hangs. |
|
138 return |
|
139 self.cur1.execute("create table test(i)") |
|
140 self.cur1.execute("insert into test(i) values (5)") |
|
141 try: |
|
142 self.cur2.execute("insert into test(i) values (5)") |
|
143 self.fail("should have raised an OperationalError") |
|
144 except sqlite.OperationalError: |
|
145 pass |
|
146 except: |
|
147 self.fail("should have raised an OperationalError") |
|
148 # NO self.con2.rollback() HERE!!! |
|
149 self.con1.commit() |
|
150 |
|
151 class SpecialCommandTests(unittest.TestCase): |
|
152 def setUp(self): |
|
153 self.con = sqlite.connect(":memory:") |
|
154 self.cur = self.con.cursor() |
|
155 |
|
156 def CheckVacuum(self): |
|
157 self.cur.execute("create table test(i)") |
|
158 self.cur.execute("insert into test(i) values (5)") |
|
159 self.cur.execute("vacuum") |
|
160 |
|
161 def CheckDropTable(self): |
|
162 self.cur.execute("create table test(i)") |
|
163 self.cur.execute("insert into test(i) values (5)") |
|
164 self.cur.execute("drop table test") |
|
165 |
|
166 def CheckPragma(self): |
|
167 self.cur.execute("create table test(i)") |
|
168 self.cur.execute("insert into test(i) values (5)") |
|
169 self.cur.execute("pragma count_changes=1") |
|
170 |
|
171 def tearDown(self): |
|
172 self.cur.close() |
|
173 self.con.close() |
|
174 |
|
175 def suite(): |
|
176 default_suite = unittest.makeSuite(TransactionTests, "Check") |
|
177 special_command_suite = unittest.makeSuite(SpecialCommandTests, "Check") |
|
178 return unittest.TestSuite((default_suite, special_command_suite)) |
|
179 |
|
180 def test(): |
|
181 runner = unittest.TextTestRunner() |
|
182 runner.run(suite()) |
|
183 |
|
184 if __name__ == "__main__": |
|
185 test() |