|
1 #-*- coding: ISO-8859-1 -*- |
|
2 # pysqlite2/test/transactions.py: tests transactions |
|
3 # |
|
4 # Copyright (C) 2005 Gerhard Häring <gh@ghaering.de> |
|
5 # |
|
6 # This file is part of pysqlite. |
|
7 # |
|
8 # This software is provided 'as-is', without any express or implied |
|
9 # warranty. In no event will the authors be held liable for any damages |
|
10 # arising from the use of this software. |
|
11 # |
|
12 # Permission is granted to anyone to use this software for any purpose, |
|
13 # including commercial applications, and to alter it and redistribute it |
|
14 # freely, subject to the following restrictions: |
|
15 # |
|
16 # 1. The origin of this software must not be misrepresented; you must not |
|
17 # claim that you wrote the original software. If you use this software |
|
18 # in a product, an acknowledgment in the product documentation would be |
|
19 # appreciated but is not required. |
|
20 # 2. Altered source versions must be plainly marked as such, and must not be |
|
21 # misrepresented as being the original software. |
|
22 # 3. This notice may not be removed or altered from any source distribution. |
|
23 |
|
24 import os, unittest |
|
25 import sqlite3 as sqlite |
|
26 |
|
27 def get_db_path(): |
|
28 return "sqlite_testdb" |
|
29 |
|
30 class TransactionTests(unittest.TestCase): |
|
31 def setUp(self): |
|
32 try: |
|
33 os.remove(get_db_path()) |
|
34 except: |
|
35 pass |
|
36 |
|
37 self.con1 = sqlite.connect(get_db_path(), timeout=0.1) |
|
38 self.cur1 = self.con1.cursor() |
|
39 |
|
40 self.con2 = sqlite.connect(get_db_path(), timeout=0.1) |
|
41 self.cur2 = self.con2.cursor() |
|
42 |
|
43 def tearDown(self): |
|
44 self.cur1.close() |
|
45 self.con1.close() |
|
46 |
|
47 self.cur2.close() |
|
48 self.con2.close() |
|
49 |
|
50 os.unlink(get_db_path()) |
|
51 |
|
52 def CheckDMLdoesAutoCommitBefore(self): |
|
53 self.cur1.execute("create table test(i)") |
|
54 self.cur1.execute("insert into test(i) values (5)") |
|
55 self.cur1.execute("create table test2(j)") |
|
56 self.cur2.execute("select i from test") |
|
57 res = self.cur2.fetchall() |
|
58 self.failUnlessEqual(len(res), 1) |
|
59 |
|
60 def CheckInsertStartsTransaction(self): |
|
61 self.cur1.execute("create table test(i)") |
|
62 self.cur1.execute("insert into test(i) values (5)") |
|
63 self.cur2.execute("select i from test") |
|
64 res = self.cur2.fetchall() |
|
65 self.failUnlessEqual(len(res), 0) |
|
66 |
|
67 def CheckUpdateStartsTransaction(self): |
|
68 self.cur1.execute("create table test(i)") |
|
69 self.cur1.execute("insert into test(i) values (5)") |
|
70 self.con1.commit() |
|
71 self.cur1.execute("update test set i=6") |
|
72 self.cur2.execute("select i from test") |
|
73 res = self.cur2.fetchone()[0] |
|
74 self.failUnlessEqual(res, 5) |
|
75 |
|
76 def CheckDeleteStartsTransaction(self): |
|
77 self.cur1.execute("create table test(i)") |
|
78 self.cur1.execute("insert into test(i) values (5)") |
|
79 self.con1.commit() |
|
80 self.cur1.execute("delete from test") |
|
81 self.cur2.execute("select i from test") |
|
82 res = self.cur2.fetchall() |
|
83 self.failUnlessEqual(len(res), 1) |
|
84 |
|
85 def CheckReplaceStartsTransaction(self): |
|
86 self.cur1.execute("create table test(i)") |
|
87 self.cur1.execute("insert into test(i) values (5)") |
|
88 self.con1.commit() |
|
89 self.cur1.execute("replace into test(i) values (6)") |
|
90 self.cur2.execute("select i from test") |
|
91 res = self.cur2.fetchall() |
|
92 self.failUnlessEqual(len(res), 1) |
|
93 self.failUnlessEqual(res[0][0], 5) |
|
94 |
|
95 def CheckToggleAutoCommit(self): |
|
96 self.cur1.execute("create table test(i)") |
|
97 self.cur1.execute("insert into test(i) values (5)") |
|
98 self.con1.isolation_level = None |
|
99 self.failUnlessEqual(self.con1.isolation_level, None) |
|
100 self.cur2.execute("select i from test") |
|
101 res = self.cur2.fetchall() |
|
102 self.failUnlessEqual(len(res), 1) |
|
103 |
|
104 self.con1.isolation_level = "DEFERRED" |
|
105 self.failUnlessEqual(self.con1.isolation_level , "DEFERRED") |
|
106 self.cur1.execute("insert into test(i) values (5)") |
|
107 self.cur2.execute("select i from test") |
|
108 res = self.cur2.fetchall() |
|
109 self.failUnlessEqual(len(res), 1) |
|
110 |
|
111 def CheckRaiseTimeout(self): |
|
112 self.cur1.execute("create table test(i)") |
|
113 self.cur1.execute("insert into test(i) values (5)") |
|
114 try: |
|
115 self.cur2.execute("insert into test(i) values (5)") |
|
116 self.fail("should have raised an OperationalError") |
|
117 except sqlite.OperationalError: |
|
118 pass |
|
119 except: |
|
120 self.fail("should have raised an OperationalError") |
|
121 |
|
122 class SpecialCommandTests(unittest.TestCase): |
|
123 def setUp(self): |
|
124 self.con = sqlite.connect(":memory:") |
|
125 self.cur = self.con.cursor() |
|
126 |
|
127 def CheckVacuum(self): |
|
128 self.cur.execute("create table test(i)") |
|
129 self.cur.execute("insert into test(i) values (5)") |
|
130 self.cur.execute("vacuum") |
|
131 |
|
132 def CheckDropTable(self): |
|
133 self.cur.execute("create table test(i)") |
|
134 self.cur.execute("insert into test(i) values (5)") |
|
135 self.cur.execute("drop table test") |
|
136 |
|
137 def CheckPragma(self): |
|
138 self.cur.execute("create table test(i)") |
|
139 self.cur.execute("insert into test(i) values (5)") |
|
140 self.cur.execute("pragma count_changes=1") |
|
141 |
|
142 def tearDown(self): |
|
143 self.cur.close() |
|
144 self.con.close() |
|
145 |
|
146 def suite(): |
|
147 default_suite = unittest.makeSuite(TransactionTests, "Check") |
|
148 special_command_suite = unittest.makeSuite(SpecialCommandTests, "Check") |
|
149 return unittest.TestSuite((default_suite, special_command_suite)) |
|
150 |
|
151 def test(): |
|
152 runner = unittest.TextTestRunner() |
|
153 runner.run(suite()) |
|
154 |
|
155 if __name__ == "__main__": |
|
156 test() |