#-*- coding: ISO-8859-1 -*-
# pysqlite2/test/dbapi.py: tests for DB-API compliance
#
# Copyright (C) 2004-2007 Gerhard Häring <gh@ghaering.de>
#
# This file is part of pysqlite.
#
# This software is provided 'as-is', without any express or implied
# warranty.  In no event will the authors be held liable for any damages
# arising from the use of this software.
#
# Permission is granted to anyone to use this software for any purpose,
# including commercial applications, and to alter it and redistribute it
# freely, subject to the following restrictions:
#
# 1. The origin of this software must not be misrepresented; you must not
#    claim that you wrote the original software. If you use this software
#    in a product, an acknowledgment in the product documentation would be
#    appreciated but is not required.
# 2. Altered source versions must be plainly marked as such, and must not be
#    misrepresented as being the original software.
# 3. This notice may not be removed or altered from any source distribution.

import sqlite3 as sqlite
import sys
import threading
import unittest


class ModuleTests(unittest.TestCase):
    """Module-level DB-API 2.0 checks: global attributes and exception hierarchy."""

    # Root class that Warning and Error are checked against.  Python 2 calls
    # it StandardError; Python 3 removed that name, so fall back to Exception.
    try:
        _ROOT_EXCEPTION = StandardError
    except NameError:
        _ROOT_EXCEPTION = Exception

    def _check_subclass(self, child, parent):
        """Fail unless the class *child* derives from the class *parent*."""
        self.assertTrue(issubclass(child, parent),
                        "%s is not a subclass of %s"
                        % (child.__name__, parent.__name__))

    def CheckAPILevel(self):
        self.assertEqual(sqlite.apilevel, "2.0",
                         "apilevel is %s, should be 2.0" % sqlite.apilevel)

    def CheckThreadSafety(self):
        self.assertEqual(sqlite.threadsafety, 1,
                         "threadsafety is %d, should be 1" % sqlite.threadsafety)

    def CheckParamStyle(self):
        self.assertEqual(sqlite.paramstyle, "qmark",
                         "paramstyle is '%s', should be 'qmark'" %
                         sqlite.paramstyle)

    def CheckWarning(self):
        self._check_subclass(sqlite.Warning, self._ROOT_EXCEPTION)

    def CheckError(self):
        self._check_subclass(sqlite.Error, self._ROOT_EXCEPTION)

    def CheckInterfaceError(self):
        self._check_subclass(sqlite.InterfaceError, sqlite.Error)

    def CheckDatabaseError(self):
        self._check_subclass(sqlite.DatabaseError, sqlite.Error)

    def CheckDataError(self):
        self._check_subclass(sqlite.DataError, sqlite.DatabaseError)

    def CheckOperationalError(self):
        self._check_subclass(sqlite.OperationalError, sqlite.DatabaseError)

    def CheckIntegrityError(self):
        self._check_subclass(sqlite.IntegrityError, sqlite.DatabaseError)

    def CheckInternalError(self):
        self._check_subclass(sqlite.InternalError, sqlite.DatabaseError)

    def CheckProgrammingError(self):
        self._check_subclass(sqlite.ProgrammingError, sqlite.DatabaseError)

    def CheckNotSupportedError(self):
        self._check_subclass(sqlite.NotSupportedError, sqlite.DatabaseError)


class ConnectionTests(unittest.TestCase):
    """Checks the Connection object: commit, rollback, cursor, close and
    the exception classes exposed as connection attributes."""

    # Names of the exception classes compared in CheckExceptions.
    _EXCEPTION_NAMES = (
        "Warning", "Error", "InterfaceError", "DatabaseError", "DataError",
        "OperationalError", "IntegrityError", "InternalError",
        "ProgrammingError", "NotSupportedError",
    )

    def setUp(self):
        # Fresh in-memory database with one row inserted but not committed.
        self.cx = sqlite.connect(":memory:")
        cu = self.cx.cursor()
        cu.execute("create table test(id integer primary key, name text)")
        cu.execute("insert into test(name) values (?)", ("foo",))

    def tearDown(self):
        self.cx.close()

    def CheckCommit(self):
        self.cx.commit()

    def CheckCommitAfterNoChanges(self):
        """
        A commit should also work when no changes were made to the database.
        """
        self.cx.commit()
        self.cx.commit()

    def CheckRollback(self):
        self.cx.rollback()

    def CheckRollbackAfterNoChanges(self):
        """
        A rollback should also work when no changes were made to the database.
        """
        self.cx.rollback()
        self.cx.rollback()

    def CheckCursor(self):
        # Only checks that creating a cursor does not raise.
        self.cx.cursor()

    def CheckFailedOpen(self):
        YOU_CANNOT_OPEN_THIS = "/foo/bar/bla/23534/mydb.db"
        self.assertRaises(sqlite.OperationalError,
                          sqlite.connect, YOU_CANNOT_OPEN_THIS)

    def CheckClose(self):
        self.cx.close()

    def CheckExceptions(self):
        # Optional DB-API extension.
        for name in self._EXCEPTION_NAMES:
            self.assertEqual(getattr(self.cx, name), getattr(sqlite, name))


class CursorTests(unittest.TestCase):
    """Checks the Cursor object: parameter binding for execute/executemany,
    rowcount, the fetch* methods and the optional DB-API extensions."""

    def setUp(self):
        # Fresh in-memory database holding a single row named "foo".
        self.cx = sqlite.connect(":memory:")
        self.cu = self.cx.cursor()
        self.cu.execute("create table test(id integer primary key, name text, income number)")
        self.cu.execute("insert into test(name) values (?)", ("foo",))

    def tearDown(self):
        self.cu.close()
        self.cx.close()

    def CheckExecuteNoArgs(self):
        self.cu.execute("delete from test")

    def CheckExecuteIllegalSql(self):
        self.assertRaises(sqlite.OperationalError,
                          self.cu.execute, "select asdf")

    def CheckExecuteTooMuchSql(self):
        self.assertRaises(sqlite.Warning,
                          self.cu.execute, "select 5+4; select 4+5")

    def CheckExecuteTooMuchSql2(self):
        self.cu.execute("select 5+4; -- foo bar")

    def CheckExecuteTooMuchSql3(self):
        self.cu.execute("""
            select 5+4;

            /*
            foo
            */
            """)

    def CheckExecuteWrongSqlArg(self):
        self.assertRaises(ValueError, self.cu.execute, 42)

    def CheckExecuteArgInt(self):
        self.cu.execute("insert into test(id) values (?)", (42,))

    def CheckExecuteArgFloat(self):
        self.cu.execute("insert into test(income) values (?)", (2500.32,))

    def CheckExecuteArgString(self):
        self.cu.execute("insert into test(name) values (?)", ("Hugo",))

    def CheckExecuteWrongNoOfArgs1(self):
        # too many parameters
        self.assertRaises(sqlite.ProgrammingError, self.cu.execute,
                          "insert into test(id) values (?)", (17, "Egon"))

    def CheckExecuteWrongNoOfArgs2(self):
        # too few parameters: two placeholders, one value.  (This check used
        # to pass no parameters at all, duplicating CheckExecuteWrongNoOfArgs3.)
        self.assertRaises(sqlite.ProgrammingError, self.cu.execute,
                          "insert into test(id, name) values (?, ?)", (17,))

    def CheckExecuteWrongNoOfArgs3(self):
        # no parameters, parameters are needed
        self.assertRaises(sqlite.ProgrammingError, self.cu.execute,
                          "insert into test(id) values (?)")

    def CheckExecuteParamList(self):
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("select name from test where name=?", ["foo"])
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")

    def CheckExecuteParamSequence(self):
        # Minimal sequence: only __len__ and __getitem__ are provided.
        class L(object):
            def __len__(self):
                return 1

            def __getitem__(self, x):
                assert x == 0
                return "foo"

        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("select name from test where name=?", L())
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")

    def CheckExecuteDictMapping(self):
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("select name from test where name=:name", {"name": "foo"})
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")

    def CheckExecuteDictMapping_Mapping(self):
        # Test only works with Python 2.5 or later
        if sys.version_info < (2, 5, 0):
            return

        # Empty dict that answers every lookup through __missing__.
        class D(dict):
            def __missing__(self, key):
                return "foo"

        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("select name from test where name=:name", D())
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")

    def CheckExecuteDictMappingTooLittleArgs(self):
        self.cu.execute("insert into test(name) values ('foo')")
        self.assertRaises(sqlite.ProgrammingError, self.cu.execute,
                          "select name from test where name=:name and id=:id",
                          {"name": "foo"})

    def CheckExecuteDictMappingNoArgs(self):
        self.cu.execute("insert into test(name) values ('foo')")
        self.assertRaises(sqlite.ProgrammingError, self.cu.execute,
                          "select name from test where name=:name")

    def CheckExecuteDictMappingUnnamed(self):
        self.cu.execute("insert into test(name) values ('foo')")
        self.assertRaises(sqlite.ProgrammingError, self.cu.execute,
                          "select name from test where name=?", {"name": "foo"})

    def CheckClose(self):
        self.cu.close()

    def CheckRowcountExecute(self):
        self.cu.execute("delete from test")
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("update test set name='bar'")
        self.assertEqual(self.cu.rowcount, 2)

    def CheckRowcountSelect(self):
        """
        pysqlite does not know the rowcount of SELECT statements, because we
        don't fetch all rows after executing the select statement. The rowcount
        has thus to be -1.
        """
        self.cu.execute("select 5 union select 6")
        self.assertEqual(self.cu.rowcount, -1)

    def CheckRowcountExecutemany(self):
        self.cu.execute("delete from test")
        self.cu.executemany("insert into test(name) values (?)", [(1,), (2,), (3,)])
        self.assertEqual(self.cu.rowcount, 3)

    def CheckTotalChanges(self):
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("insert into test(name) values ('foo')")
        self.assertFalse(self.cx.total_changes < 2,
                         "total changes reported wrong value")

    # Checks for executemany:
    # Sequences are required by the DB-API; iterators are
    # enhancements in pysqlite.

    def CheckExecuteManySequence(self):
        self.cu.executemany("insert into test(income) values (?)",
                            [(x,) for x in range(100, 110)])

    def CheckExecuteManyIterator(self):
        # Hand-written iterator yielding (6,), (7,), ..., (10,).
        class MyIter:
            def __init__(self):
                self.value = 5

            def __iter__(self):
                return self

            def __next__(self):
                if self.value == 10:
                    raise StopIteration
                self.value += 1
                return (self.value,)

            next = __next__  # Python 2 spelling of the iterator protocol

        self.cu.executemany("insert into test(income) values (?)", MyIter())

    def CheckExecuteManyGenerator(self):
        def mygen():
            for i in range(5):
                yield (i,)

        self.cu.executemany("insert into test(income) values (?)", mygen())

    def CheckExecuteManyWrongSqlArg(self):
        self.assertRaises(ValueError, self.cu.executemany, 42, [(3,)])

    def CheckExecuteManySelect(self):
        self.assertRaises(sqlite.ProgrammingError,
                          self.cu.executemany, "select ?", [(3,)])

    def CheckExecuteManyNotIterable(self):
        # Any other exception type propagates and is reported by unittest as
        # an error with its traceback, so nothing needs printing here.
        self.assertRaises(TypeError, self.cu.executemany,
                          "insert into test(income) values (?)", 42)

    def CheckFetchIter(self):
        # Optional DB-API extension.
        self.cu.execute("delete from test")
        self.cu.execute("insert into test(id) values (?)", (5,))
        self.cu.execute("insert into test(id) values (?)", (6,))
        self.cu.execute("select id from test order by id")
        lst = [row[0] for row in self.cu]
        self.assertEqual(lst, [5, 6])

    def CheckFetchone(self):
        self.cu.execute("select name from test")
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")
        row = self.cu.fetchone()
        self.assertEqual(row, None)

    def CheckFetchoneNoStatement(self):
        cur = self.cx.cursor()
        row = cur.fetchone()
        self.assertEqual(row, None)

    def CheckArraySize(self):
        # must default to 1
        self.assertEqual(self.cu.arraysize, 1)

        # now set to 2
        self.cu.arraysize = 2

        # now make the query return 3 rows
        self.cu.execute("delete from test")
        self.cu.execute("insert into test(name) values ('A')")
        self.cu.execute("insert into test(name) values ('B')")
        self.cu.execute("insert into test(name) values ('C')")
        self.cu.execute("select name from test")
        res = self.cu.fetchmany()

        self.assertEqual(len(res), 2)

    def CheckFetchmany(self):
        self.cu.execute("select name from test")
        res = self.cu.fetchmany(100)
        self.assertEqual(len(res), 1)
        res = self.cu.fetchmany(100)
        self.assertEqual(res, [])

    def CheckFetchmanyKwArg(self):
        """Checks if fetchmany works with keyword arguments"""
        self.cu.execute("select name from test")
        res = self.cu.fetchmany(size=100)
        self.assertEqual(len(res), 1)

    def CheckFetchall(self):
        self.cu.execute("select name from test")
        res = self.cu.fetchall()
        self.assertEqual(len(res), 1)
        res = self.cu.fetchall()
        self.assertEqual(res, [])

    def CheckSetinputsizes(self):
        self.cu.setinputsizes([3, 4, 5])

    def CheckSetoutputsize(self):
        self.cu.setoutputsize(5, 0)

    def CheckSetoutputsizeNoColumn(self):
        self.cu.setoutputsize(42)

    def CheckCursorConnection(self):
        # Optional DB-API extension.
        self.assertEqual(self.cu.connection, self.cx)

    def CheckWrongCursorCallable(self):
        # A factory that does not return a cursor must be rejected.
        def f():
            pass

        self.assertRaises(TypeError, self.cx.cursor, f)

    def CheckCursorWrongClass(self):
        # A cursor cannot be built on something that is not a connection.
        class Foo:
            pass

        self.assertRaises(TypeError, sqlite.Cursor, Foo())


class ThreadTests(unittest.TestCase):
    """Checks that a connection and its cursors refuse to be used from a
    thread other than the one that created them (ProgrammingError expected)."""

    def setUp(self):
        self.con = sqlite.connect(":memory:")
        self.cur = self.con.cursor()
        self.cur.execute("create table test(id integer primary key, name text, bin binary, ratio number, ts timestamp)")

    def tearDown(self):
        self.cur.close()
        self.con.close()

    def _check_raises_in_other_thread(self, operation, *args):
        """Call operation(*args) in a new thread; fail unless it raises
        sqlite.ProgrammingError there.

        Problems are collected in a list and reported from the calling thread
        after join(), because the check itself runs in the worker thread.
        """
        errors = []

        def run():
            try:
                operation(*args)
            except sqlite.ProgrammingError:
                return
            except Exception as exc:
                errors.append("raised wrong exception: %r" % (exc,))
            else:
                errors.append("did not raise ProgrammingError")

        worker = threading.Thread(target=run)
        worker.start()
        worker.join()
        if errors:
            self.fail("\n".join(errors))

    def CheckConCursor(self):
        self._check_raises_in_other_thread(self.con.cursor)

    def CheckConCommit(self):
        self._check_raises_in_other_thread(self.con.commit)

    def CheckConRollback(self):
        self._check_raises_in_other_thread(self.con.rollback)

    def CheckConClose(self):
        self._check_raises_in_other_thread(self.con.close)

    def CheckCurImplicitBegin(self):
        self._check_raises_in_other_thread(
            self.cur.execute, "insert into test(name) values ('a')")

    def CheckCurClose(self):
        self._check_raises_in_other_thread(self.cur.close)

    def CheckCurExecute(self):
        # Insert from the owning thread first, then select from another one.
        self.cur.execute("insert into test(name) values ('a')")
        self._check_raises_in_other_thread(
            self.cur.execute, "select name from test")

    def CheckCurIterNext(self):
        # Leave a result set pending, then try to fetch from another thread.
        self.cur.execute("insert into test(name) values ('a')")
        self.cur.execute("select name from test")
        self._check_raises_in_other_thread(self.cur.fetchone)


class ConstructorTests(unittest.TestCase):
    """Smoke tests for the DB-API type constructors: each check passes as
    long as the call does not raise."""

    def CheckDate(self):
        sqlite.Date(2004, 10, 28)

    def CheckTime(self):
        sqlite.Time(12, 39, 35)

    def CheckTimestamp(self):
        sqlite.Timestamp(2004, 10, 28, 12, 39, 35)

    def CheckDateFromTicks(self):
        sqlite.DateFromTicks(42)

    def CheckTimeFromTicks(self):
        sqlite.TimeFromTicks(42)

    def CheckTimestampFromTicks(self):
        sqlite.TimestampFromTicks(42)

    def CheckBinary(self):
        # A NUL byte followed by a single quote.  Written as a bytes literal
        # so the argument is binary data on Python 3 as well as Python 2.
        sqlite.Binary(b"\x00'")


class ExtensionTests(unittest.TestCase):
    """Checks the pysqlite extensions: Cursor.executescript and the
    execute/executemany/executescript shortcuts on Connection."""

    def setUp(self):
        # One private in-memory database per test, closed again in tearDown
        # so that no connection is leaked.
        self.con = sqlite.connect(":memory:")
        self.cur = self.con.cursor()

    def tearDown(self):
        self.con.close()

    def CheckScriptStringSql(self):
        self.cur.executescript("""
            -- bla bla
            /* a stupid comment */
            create table a(i);
            insert into a(i) values (5);
            """)
        self.cur.execute("select i from a")
        res = self.cur.fetchone()[0]
        self.assertEqual(res, 5)

    def CheckScriptStringUnicode(self):
        self.cur.executescript(u"""
            create table a(i);
            insert into a(i) values (5);
            select i from a;
            delete from a;
            insert into a(i) values (6);
            """)
        self.cur.execute("select i from a")
        res = self.cur.fetchone()[0]
        self.assertEqual(res, 6)

    def CheckScriptErrorIncomplete(self):
        self.assertRaises(sqlite.ProgrammingError, self.cur.executescript,
                          "create table test(sadfsadfdsa")

    def CheckScriptErrorNormal(self):
        self.assertRaises(sqlite.OperationalError, self.cur.executescript,
                          "create table test(sadfsadfdsa); select foo from hurz;")

    def CheckConnectionExecute(self):
        result = self.con.execute("select 5").fetchone()[0]
        self.assertEqual(result, 5, "Basic test of Connection.execute")

    def CheckConnectionExecutemany(self):
        self.con.execute("create table test(foo)")
        self.con.executemany("insert into test(foo) values (?)", [(3,), (4,)])
        result = self.con.execute("select foo from test order by foo").fetchall()
        self.assertEqual(result[0][0], 3, "Basic test of Connection.executemany")
        self.assertEqual(result[1][0], 4, "Basic test of Connection.executemany")

    def CheckConnectionExecutescript(self):
        self.con.executescript("create table test(foo); insert into test(foo) values (5);")
        result = self.con.execute("select foo from test").fetchone()[0]
        self.assertEqual(result, 5, "Basic test of Connection.executescript")


class ClosedTests(unittest.TestCase):
    """Checks that a closed connection, and a cursor created on it before
    the close, raise ProgrammingError when used."""

    def setUp(self):
        # The cursor has to be created before the connection is closed.
        self.con = sqlite.connect(":memory:")
        self.cur = self.con.cursor()
        self.con.close()

    def tearDown(self):
        pass

    def CheckClosedConCursor(self):
        self.assertRaises(sqlite.ProgrammingError, self.con.cursor)

    def CheckClosedConCommit(self):
        self.assertRaises(sqlite.ProgrammingError, self.con.commit)

    def CheckClosedConRollback(self):
        self.assertRaises(sqlite.ProgrammingError, self.con.rollback)

    def CheckClosedCurExecute(self):
        self.assertRaises(sqlite.ProgrammingError,
                          self.cur.execute, "select 4")


def suite():
    """Return a TestSuite holding every "Check*" method of the test classes
    in this module, in the order the classes are listed below."""
    # TestLoader with a custom prefix replaces unittest.makeSuite, which is
    # deprecated and no longer available in recent Python versions.
    loader = unittest.TestLoader()
    loader.testMethodPrefix = "Check"
    test_classes = (
        ModuleTests,
        ConnectionTests,
        CursorTests,
        ThreadTests,
        ConstructorTests,
        ExtensionTests,
        ClosedTests,
    )
    return unittest.TestSuite(
        [loader.loadTestsFromTestCase(cls) for cls in test_classes])


def test():
    """Run the whole suite with a text runner and return the runner's result
    so that callers can inspect the outcome."""
    runner = unittest.TextTestRunner()
    return runner.run(suite())


if __name__ == "__main__":
    test()