|
1 #-*- coding: ISO-8859-1 -*- |
|
2 # pysqlite2/test/dbapi.py: tests for DB-API compliance |
|
3 # |
|
4 # Copyright (C) 2004-2005 Gerhard Häring <gh@ghaering.de> |
|
5 # |
|
6 # This file is part of pysqlite. |
|
7 # |
|
8 # This software is provided 'as-is', without any express or implied |
|
9 # warranty. In no event will the authors be held liable for any damages |
|
10 # arising from the use of this software. |
|
11 # |
|
12 # Permission is granted to anyone to use this software for any purpose, |
|
13 # including commercial applications, and to alter it and redistribute it |
|
14 # freely, subject to the following restrictions: |
|
15 # |
|
16 # 1. The origin of this software must not be misrepresented; you must not |
|
17 # claim that you wrote the original software. If you use this software |
|
18 # in a product, an acknowledgment in the product documentation would be |
|
19 # appreciated but is not required. |
|
20 # 2. Altered source versions must be plainly marked as such, and must not be |
|
21 # misrepresented as being the original software. |
|
22 # 3. This notice may not be removed or altered from any source distribution. |
|
23 |
|
24 import unittest |
|
25 import threading |
|
26 import sqlite3 as sqlite |
|
27 |
|
28 class ModuleTests(unittest.TestCase): |
|
29 def CheckAPILevel(self): |
|
30 self.assertEqual(sqlite.apilevel, "2.0", |
|
31 "apilevel is %s, should be 2.0" % sqlite.apilevel) |
|
32 |
|
33 def CheckThreadSafety(self): |
|
34 self.assertEqual(sqlite.threadsafety, 1, |
|
35 "threadsafety is %d, should be 1" % sqlite.threadsafety) |
|
36 |
|
37 def CheckParamStyle(self): |
|
38 self.assertEqual(sqlite.paramstyle, "qmark", |
|
39 "paramstyle is '%s', should be 'qmark'" % |
|
40 sqlite.paramstyle) |
|
41 |
|
42 def CheckWarning(self): |
|
43 self.assert_(issubclass(sqlite.Warning, StandardError), |
|
44 "Warning is not a subclass of StandardError") |
|
45 |
|
46 def CheckError(self): |
|
47 self.failUnless(issubclass(sqlite.Error, StandardError), |
|
48 "Error is not a subclass of StandardError") |
|
49 |
|
50 def CheckInterfaceError(self): |
|
51 self.failUnless(issubclass(sqlite.InterfaceError, sqlite.Error), |
|
52 "InterfaceError is not a subclass of Error") |
|
53 |
|
54 def CheckDatabaseError(self): |
|
55 self.failUnless(issubclass(sqlite.DatabaseError, sqlite.Error), |
|
56 "DatabaseError is not a subclass of Error") |
|
57 |
|
58 def CheckDataError(self): |
|
59 self.failUnless(issubclass(sqlite.DataError, sqlite.DatabaseError), |
|
60 "DataError is not a subclass of DatabaseError") |
|
61 |
|
62 def CheckOperationalError(self): |
|
63 self.failUnless(issubclass(sqlite.OperationalError, sqlite.DatabaseError), |
|
64 "OperationalError is not a subclass of DatabaseError") |
|
65 |
|
66 def CheckIntegrityError(self): |
|
67 self.failUnless(issubclass(sqlite.IntegrityError, sqlite.DatabaseError), |
|
68 "IntegrityError is not a subclass of DatabaseError") |
|
69 |
|
70 def CheckInternalError(self): |
|
71 self.failUnless(issubclass(sqlite.InternalError, sqlite.DatabaseError), |
|
72 "InternalError is not a subclass of DatabaseError") |
|
73 |
|
74 def CheckProgrammingError(self): |
|
75 self.failUnless(issubclass(sqlite.ProgrammingError, sqlite.DatabaseError), |
|
76 "ProgrammingError is not a subclass of DatabaseError") |
|
77 |
|
78 def CheckNotSupportedError(self): |
|
79 self.failUnless(issubclass(sqlite.NotSupportedError, |
|
80 sqlite.DatabaseError), |
|
81 "NotSupportedError is not a subclass of DatabaseError") |
|
82 |
|
83 class ConnectionTests(unittest.TestCase): |
|
84 def setUp(self): |
|
85 self.cx = sqlite.connect(":memory:") |
|
86 cu = self.cx.cursor() |
|
87 cu.execute("create table test(id integer primary key, name text)") |
|
88 cu.execute("insert into test(name) values (?)", ("foo",)) |
|
89 |
|
90 def tearDown(self): |
|
91 self.cx.close() |
|
92 |
|
93 def CheckCommit(self): |
|
94 self.cx.commit() |
|
95 |
|
96 def CheckCommitAfterNoChanges(self): |
|
97 """ |
|
98 A commit should also work when no changes were made to the database. |
|
99 """ |
|
100 self.cx.commit() |
|
101 self.cx.commit() |
|
102 |
|
103 def CheckRollback(self): |
|
104 self.cx.rollback() |
|
105 |
|
106 def CheckRollbackAfterNoChanges(self): |
|
107 """ |
|
108 A rollback should also work when no changes were made to the database. |
|
109 """ |
|
110 self.cx.rollback() |
|
111 self.cx.rollback() |
|
112 |
|
113 def CheckCursor(self): |
|
114 cu = self.cx.cursor() |
|
115 |
|
116 def CheckFailedOpen(self): |
|
117 YOU_CANNOT_OPEN_THIS = "/foo/bar/bla/23534/mydb.db" |
|
118 try: |
|
119 con = sqlite.connect(YOU_CANNOT_OPEN_THIS) |
|
120 except sqlite.OperationalError: |
|
121 return |
|
122 self.fail("should have raised an OperationalError") |
|
123 |
|
124 def CheckClose(self): |
|
125 self.cx.close() |
|
126 |
|
127 def CheckExceptions(self): |
|
128 # Optional DB-API extension. |
|
129 self.failUnlessEqual(self.cx.Warning, sqlite.Warning) |
|
130 self.failUnlessEqual(self.cx.Error, sqlite.Error) |
|
131 self.failUnlessEqual(self.cx.InterfaceError, sqlite.InterfaceError) |
|
132 self.failUnlessEqual(self.cx.DatabaseError, sqlite.DatabaseError) |
|
133 self.failUnlessEqual(self.cx.DataError, sqlite.DataError) |
|
134 self.failUnlessEqual(self.cx.OperationalError, sqlite.OperationalError) |
|
135 self.failUnlessEqual(self.cx.IntegrityError, sqlite.IntegrityError) |
|
136 self.failUnlessEqual(self.cx.InternalError, sqlite.InternalError) |
|
137 self.failUnlessEqual(self.cx.ProgrammingError, sqlite.ProgrammingError) |
|
138 self.failUnlessEqual(self.cx.NotSupportedError, sqlite.NotSupportedError) |
|
139 |
|
140 class CursorTests(unittest.TestCase): |
|
141 def setUp(self): |
|
142 self.cx = sqlite.connect(":memory:") |
|
143 self.cu = self.cx.cursor() |
|
144 self.cu.execute("create table test(id integer primary key, name text, income number)") |
|
145 self.cu.execute("insert into test(name) values (?)", ("foo",)) |
|
146 |
|
147 def tearDown(self): |
|
148 self.cu.close() |
|
149 self.cx.close() |
|
150 |
|
151 def CheckExecuteNoArgs(self): |
|
152 self.cu.execute("delete from test") |
|
153 |
|
154 def CheckExecuteIllegalSql(self): |
|
155 try: |
|
156 self.cu.execute("select asdf") |
|
157 self.fail("should have raised an OperationalError") |
|
158 except sqlite.OperationalError: |
|
159 return |
|
160 except: |
|
161 self.fail("raised wrong exception") |
|
162 |
|
163 def CheckExecuteTooMuchSql(self): |
|
164 try: |
|
165 self.cu.execute("select 5+4; select 4+5") |
|
166 self.fail("should have raised a Warning") |
|
167 except sqlite.Warning: |
|
168 return |
|
169 except: |
|
170 self.fail("raised wrong exception") |
|
171 |
|
172 def CheckExecuteTooMuchSql2(self): |
|
173 self.cu.execute("select 5+4; -- foo bar") |
|
174 |
|
175 def CheckExecuteTooMuchSql3(self): |
|
176 self.cu.execute(""" |
|
177 select 5+4; |
|
178 |
|
179 /* |
|
180 foo |
|
181 */ |
|
182 """) |
|
183 |
|
184 def CheckExecuteWrongSqlArg(self): |
|
185 try: |
|
186 self.cu.execute(42) |
|
187 self.fail("should have raised a ValueError") |
|
188 except ValueError: |
|
189 return |
|
190 except: |
|
191 self.fail("raised wrong exception.") |
|
192 |
|
193 def CheckExecuteArgInt(self): |
|
194 self.cu.execute("insert into test(id) values (?)", (42,)) |
|
195 |
|
196 def CheckExecuteArgFloat(self): |
|
197 self.cu.execute("insert into test(income) values (?)", (2500.32,)) |
|
198 |
|
199 def CheckExecuteArgString(self): |
|
200 self.cu.execute("insert into test(name) values (?)", ("Hugo",)) |
|
201 |
|
202 def CheckExecuteWrongNoOfArgs1(self): |
|
203 # too many parameters |
|
204 try: |
|
205 self.cu.execute("insert into test(id) values (?)", (17, "Egon")) |
|
206 self.fail("should have raised ProgrammingError") |
|
207 except sqlite.ProgrammingError: |
|
208 pass |
|
209 |
|
210 def CheckExecuteWrongNoOfArgs2(self): |
|
211 # too little parameters |
|
212 try: |
|
213 self.cu.execute("insert into test(id) values (?)") |
|
214 self.fail("should have raised ProgrammingError") |
|
215 except sqlite.ProgrammingError: |
|
216 pass |
|
217 |
|
218 def CheckExecuteWrongNoOfArgs3(self): |
|
219 # no parameters, parameters are needed |
|
220 try: |
|
221 self.cu.execute("insert into test(id) values (?)") |
|
222 self.fail("should have raised ProgrammingError") |
|
223 except sqlite.ProgrammingError: |
|
224 pass |
|
225 |
|
226 def CheckExecuteDictMapping(self): |
|
227 self.cu.execute("insert into test(name) values ('foo')") |
|
228 self.cu.execute("select name from test where name=:name", {"name": "foo"}) |
|
229 row = self.cu.fetchone() |
|
230 self.failUnlessEqual(row[0], "foo") |
|
231 |
|
232 def CheckExecuteDictMappingTooLittleArgs(self): |
|
233 self.cu.execute("insert into test(name) values ('foo')") |
|
234 try: |
|
235 self.cu.execute("select name from test where name=:name and id=:id", {"name": "foo"}) |
|
236 self.fail("should have raised ProgrammingError") |
|
237 except sqlite.ProgrammingError: |
|
238 pass |
|
239 |
|
240 def CheckExecuteDictMappingNoArgs(self): |
|
241 self.cu.execute("insert into test(name) values ('foo')") |
|
242 try: |
|
243 self.cu.execute("select name from test where name=:name") |
|
244 self.fail("should have raised ProgrammingError") |
|
245 except sqlite.ProgrammingError: |
|
246 pass |
|
247 |
|
248 def CheckExecuteDictMappingUnnamed(self): |
|
249 self.cu.execute("insert into test(name) values ('foo')") |
|
250 try: |
|
251 self.cu.execute("select name from test where name=?", {"name": "foo"}) |
|
252 self.fail("should have raised ProgrammingError") |
|
253 except sqlite.ProgrammingError: |
|
254 pass |
|
255 |
|
256 def CheckClose(self): |
|
257 self.cu.close() |
|
258 |
|
259 def CheckRowcountExecute(self): |
|
260 self.cu.execute("delete from test") |
|
261 self.cu.execute("insert into test(name) values ('foo')") |
|
262 self.cu.execute("insert into test(name) values ('foo')") |
|
263 self.cu.execute("update test set name='bar'") |
|
264 self.failUnlessEqual(self.cu.rowcount, 2) |
|
265 |
|
266 def CheckRowcountExecutemany(self): |
|
267 self.cu.execute("delete from test") |
|
268 self.cu.executemany("insert into test(name) values (?)", [(1,), (2,), (3,)]) |
|
269 self.failUnlessEqual(self.cu.rowcount, 3) |
|
270 |
|
271 def CheckTotalChanges(self): |
|
272 self.cu.execute("insert into test(name) values ('foo')") |
|
273 self.cu.execute("insert into test(name) values ('foo')") |
|
274 if self.cx.total_changes < 2: |
|
275 self.fail("total changes reported wrong value") |
|
276 |
|
277 # Checks for executemany: |
|
278 # Sequences are required by the DB-API, iterators |
|
279 # enhancements in pysqlite. |
|
280 |
|
281 def CheckExecuteManySequence(self): |
|
282 self.cu.executemany("insert into test(income) values (?)", [(x,) for x in range(100, 110)]) |
|
283 |
|
284 def CheckExecuteManyIterator(self): |
|
285 class MyIter: |
|
286 def __init__(self): |
|
287 self.value = 5 |
|
288 |
|
289 def next(self): |
|
290 if self.value == 10: |
|
291 raise StopIteration |
|
292 else: |
|
293 self.value += 1 |
|
294 return (self.value,) |
|
295 |
|
296 self.cu.executemany("insert into test(income) values (?)", MyIter()) |
|
297 |
|
298 def CheckExecuteManyGenerator(self): |
|
299 def mygen(): |
|
300 for i in range(5): |
|
301 yield (i,) |
|
302 |
|
303 self.cu.executemany("insert into test(income) values (?)", mygen()) |
|
304 |
|
305 def CheckExecuteManyWrongSqlArg(self): |
|
306 try: |
|
307 self.cu.executemany(42, [(3,)]) |
|
308 self.fail("should have raised a ValueError") |
|
309 except ValueError: |
|
310 return |
|
311 except: |
|
312 self.fail("raised wrong exception.") |
|
313 |
|
314 def CheckExecuteManySelect(self): |
|
315 try: |
|
316 self.cu.executemany("select ?", [(3,)]) |
|
317 self.fail("should have raised a ProgrammingError") |
|
318 except sqlite.ProgrammingError: |
|
319 return |
|
320 except: |
|
321 self.fail("raised wrong exception.") |
|
322 |
|
323 def CheckExecuteManyNotIterable(self): |
|
324 try: |
|
325 self.cu.executemany("insert into test(income) values (?)", 42) |
|
326 self.fail("should have raised a TypeError") |
|
327 except TypeError: |
|
328 return |
|
329 except Exception, e: |
|
330 print "raised", e.__class__ |
|
331 self.fail("raised wrong exception.") |
|
332 |
|
333 def CheckFetchIter(self): |
|
334 # Optional DB-API extension. |
|
335 self.cu.execute("delete from test") |
|
336 self.cu.execute("insert into test(id) values (?)", (5,)) |
|
337 self.cu.execute("insert into test(id) values (?)", (6,)) |
|
338 self.cu.execute("select id from test order by id") |
|
339 lst = [] |
|
340 for row in self.cu: |
|
341 lst.append(row[0]) |
|
342 self.failUnlessEqual(lst[0], 5) |
|
343 self.failUnlessEqual(lst[1], 6) |
|
344 |
|
345 def CheckFetchone(self): |
|
346 self.cu.execute("select name from test") |
|
347 row = self.cu.fetchone() |
|
348 self.failUnlessEqual(row[0], "foo") |
|
349 row = self.cu.fetchone() |
|
350 self.failUnlessEqual(row, None) |
|
351 |
|
352 def CheckFetchoneNoStatement(self): |
|
353 cur = self.cx.cursor() |
|
354 row = cur.fetchone() |
|
355 self.failUnlessEqual(row, None) |
|
356 |
|
357 def CheckArraySize(self): |
|
358 # must default ot 1 |
|
359 self.failUnlessEqual(self.cu.arraysize, 1) |
|
360 |
|
361 # now set to 2 |
|
362 self.cu.arraysize = 2 |
|
363 |
|
364 # now make the query return 3 rows |
|
365 self.cu.execute("delete from test") |
|
366 self.cu.execute("insert into test(name) values ('A')") |
|
367 self.cu.execute("insert into test(name) values ('B')") |
|
368 self.cu.execute("insert into test(name) values ('C')") |
|
369 self.cu.execute("select name from test") |
|
370 res = self.cu.fetchmany() |
|
371 |
|
372 self.failUnlessEqual(len(res), 2) |
|
373 |
|
374 def CheckFetchmany(self): |
|
375 self.cu.execute("select name from test") |
|
376 res = self.cu.fetchmany(100) |
|
377 self.failUnlessEqual(len(res), 1) |
|
378 res = self.cu.fetchmany(100) |
|
379 self.failUnlessEqual(res, []) |
|
380 |
|
381 def CheckFetchall(self): |
|
382 self.cu.execute("select name from test") |
|
383 res = self.cu.fetchall() |
|
384 self.failUnlessEqual(len(res), 1) |
|
385 res = self.cu.fetchall() |
|
386 self.failUnlessEqual(res, []) |
|
387 |
|
388 def CheckSetinputsizes(self): |
|
389 self.cu.setinputsizes([3, 4, 5]) |
|
390 |
|
391 def CheckSetoutputsize(self): |
|
392 self.cu.setoutputsize(5, 0) |
|
393 |
|
394 def CheckSetoutputsizeNoColumn(self): |
|
395 self.cu.setoutputsize(42) |
|
396 |
|
397 def CheckCursorConnection(self): |
|
398 # Optional DB-API extension. |
|
399 self.failUnlessEqual(self.cu.connection, self.cx) |
|
400 |
|
401 def CheckWrongCursorCallable(self): |
|
402 try: |
|
403 def f(): pass |
|
404 cur = self.cx.cursor(f) |
|
405 self.fail("should have raised a TypeError") |
|
406 except TypeError: |
|
407 return |
|
408 self.fail("should have raised a ValueError") |
|
409 |
|
410 def CheckCursorWrongClass(self): |
|
411 class Foo: pass |
|
412 foo = Foo() |
|
413 try: |
|
414 cur = sqlite.Cursor(foo) |
|
415 self.fail("should have raised a ValueError") |
|
416 except TypeError: |
|
417 pass |
|
418 |
|
419 class ThreadTests(unittest.TestCase): |
|
420 def setUp(self): |
|
421 self.con = sqlite.connect(":memory:") |
|
422 self.cur = self.con.cursor() |
|
423 self.cur.execute("create table test(id integer primary key, name text, bin binary, ratio number, ts timestamp)") |
|
424 |
|
425 def tearDown(self): |
|
426 self.cur.close() |
|
427 self.con.close() |
|
428 |
|
429 def CheckConCursor(self): |
|
430 def run(con, errors): |
|
431 try: |
|
432 cur = con.cursor() |
|
433 errors.append("did not raise ProgrammingError") |
|
434 return |
|
435 except sqlite.ProgrammingError: |
|
436 return |
|
437 except: |
|
438 errors.append("raised wrong exception") |
|
439 |
|
440 errors = [] |
|
441 t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors}) |
|
442 t.start() |
|
443 t.join() |
|
444 if len(errors) > 0: |
|
445 self.fail("\n".join(errors)) |
|
446 |
|
447 def CheckConCommit(self): |
|
448 def run(con, errors): |
|
449 try: |
|
450 con.commit() |
|
451 errors.append("did not raise ProgrammingError") |
|
452 return |
|
453 except sqlite.ProgrammingError: |
|
454 return |
|
455 except: |
|
456 errors.append("raised wrong exception") |
|
457 |
|
458 errors = [] |
|
459 t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors}) |
|
460 t.start() |
|
461 t.join() |
|
462 if len(errors) > 0: |
|
463 self.fail("\n".join(errors)) |
|
464 |
|
465 def CheckConRollback(self): |
|
466 def run(con, errors): |
|
467 try: |
|
468 con.rollback() |
|
469 errors.append("did not raise ProgrammingError") |
|
470 return |
|
471 except sqlite.ProgrammingError: |
|
472 return |
|
473 except: |
|
474 errors.append("raised wrong exception") |
|
475 |
|
476 errors = [] |
|
477 t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors}) |
|
478 t.start() |
|
479 t.join() |
|
480 if len(errors) > 0: |
|
481 self.fail("\n".join(errors)) |
|
482 |
|
483 def CheckConClose(self): |
|
484 def run(con, errors): |
|
485 try: |
|
486 con.close() |
|
487 errors.append("did not raise ProgrammingError") |
|
488 return |
|
489 except sqlite.ProgrammingError: |
|
490 return |
|
491 except: |
|
492 errors.append("raised wrong exception") |
|
493 |
|
494 errors = [] |
|
495 t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors}) |
|
496 t.start() |
|
497 t.join() |
|
498 if len(errors) > 0: |
|
499 self.fail("\n".join(errors)) |
|
500 |
|
501 def CheckCurImplicitBegin(self): |
|
502 def run(cur, errors): |
|
503 try: |
|
504 cur.execute("insert into test(name) values ('a')") |
|
505 errors.append("did not raise ProgrammingError") |
|
506 return |
|
507 except sqlite.ProgrammingError: |
|
508 return |
|
509 except: |
|
510 errors.append("raised wrong exception") |
|
511 |
|
512 errors = [] |
|
513 t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors}) |
|
514 t.start() |
|
515 t.join() |
|
516 if len(errors) > 0: |
|
517 self.fail("\n".join(errors)) |
|
518 |
|
519 def CheckCurClose(self): |
|
520 def run(cur, errors): |
|
521 try: |
|
522 cur.close() |
|
523 errors.append("did not raise ProgrammingError") |
|
524 return |
|
525 except sqlite.ProgrammingError: |
|
526 return |
|
527 except: |
|
528 errors.append("raised wrong exception") |
|
529 |
|
530 errors = [] |
|
531 t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors}) |
|
532 t.start() |
|
533 t.join() |
|
534 if len(errors) > 0: |
|
535 self.fail("\n".join(errors)) |
|
536 |
|
537 def CheckCurExecute(self): |
|
538 def run(cur, errors): |
|
539 try: |
|
540 cur.execute("select name from test") |
|
541 errors.append("did not raise ProgrammingError") |
|
542 return |
|
543 except sqlite.ProgrammingError: |
|
544 return |
|
545 except: |
|
546 errors.append("raised wrong exception") |
|
547 |
|
548 errors = [] |
|
549 self.cur.execute("insert into test(name) values ('a')") |
|
550 t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors}) |
|
551 t.start() |
|
552 t.join() |
|
553 if len(errors) > 0: |
|
554 self.fail("\n".join(errors)) |
|
555 |
|
556 def CheckCurIterNext(self): |
|
557 def run(cur, errors): |
|
558 try: |
|
559 row = cur.fetchone() |
|
560 errors.append("did not raise ProgrammingError") |
|
561 return |
|
562 except sqlite.ProgrammingError: |
|
563 return |
|
564 except: |
|
565 errors.append("raised wrong exception") |
|
566 |
|
567 errors = [] |
|
568 self.cur.execute("insert into test(name) values ('a')") |
|
569 self.cur.execute("select name from test") |
|
570 t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors}) |
|
571 t.start() |
|
572 t.join() |
|
573 if len(errors) > 0: |
|
574 self.fail("\n".join(errors)) |
|
575 |
|
576 class ConstructorTests(unittest.TestCase): |
|
577 def CheckDate(self): |
|
578 d = sqlite.Date(2004, 10, 28) |
|
579 |
|
580 def CheckTime(self): |
|
581 t = sqlite.Time(12, 39, 35) |
|
582 |
|
583 def CheckTimestamp(self): |
|
584 ts = sqlite.Timestamp(2004, 10, 28, 12, 39, 35) |
|
585 |
|
586 def CheckDateFromTicks(self): |
|
587 d = sqlite.DateFromTicks(42) |
|
588 |
|
589 def CheckTimeFromTicks(self): |
|
590 t = sqlite.TimeFromTicks(42) |
|
591 |
|
592 def CheckTimestampFromTicks(self): |
|
593 ts = sqlite.TimestampFromTicks(42) |
|
594 |
|
595 def CheckBinary(self): |
|
596 b = sqlite.Binary(chr(0) + "'") |
|
597 |
|
598 class ExtensionTests(unittest.TestCase): |
|
599 def CheckScriptStringSql(self): |
|
600 con = sqlite.connect(":memory:") |
|
601 cur = con.cursor() |
|
602 cur.executescript(""" |
|
603 -- bla bla |
|
604 /* a stupid comment */ |
|
605 create table a(i); |
|
606 insert into a(i) values (5); |
|
607 """) |
|
608 cur.execute("select i from a") |
|
609 res = cur.fetchone()[0] |
|
610 self.failUnlessEqual(res, 5) |
|
611 |
|
612 def CheckScriptStringUnicode(self): |
|
613 con = sqlite.connect(":memory:") |
|
614 cur = con.cursor() |
|
615 cur.executescript(u""" |
|
616 create table a(i); |
|
617 insert into a(i) values (5); |
|
618 select i from a; |
|
619 delete from a; |
|
620 insert into a(i) values (6); |
|
621 """) |
|
622 cur.execute("select i from a") |
|
623 res = cur.fetchone()[0] |
|
624 self.failUnlessEqual(res, 6) |
|
625 |
|
626 def CheckScriptErrorIncomplete(self): |
|
627 con = sqlite.connect(":memory:") |
|
628 cur = con.cursor() |
|
629 raised = False |
|
630 try: |
|
631 cur.executescript("create table test(sadfsadfdsa") |
|
632 except sqlite.ProgrammingError: |
|
633 raised = True |
|
634 self.failUnlessEqual(raised, True, "should have raised an exception") |
|
635 |
|
636 def CheckScriptErrorNormal(self): |
|
637 con = sqlite.connect(":memory:") |
|
638 cur = con.cursor() |
|
639 raised = False |
|
640 try: |
|
641 cur.executescript("create table test(sadfsadfdsa); select foo from hurz;") |
|
642 except sqlite.OperationalError: |
|
643 raised = True |
|
644 self.failUnlessEqual(raised, True, "should have raised an exception") |
|
645 |
|
646 def CheckConnectionExecute(self): |
|
647 con = sqlite.connect(":memory:") |
|
648 result = con.execute("select 5").fetchone()[0] |
|
649 self.failUnlessEqual(result, 5, "Basic test of Connection.execute") |
|
650 |
|
651 def CheckConnectionExecutemany(self): |
|
652 con = sqlite.connect(":memory:") |
|
653 con.execute("create table test(foo)") |
|
654 con.executemany("insert into test(foo) values (?)", [(3,), (4,)]) |
|
655 result = con.execute("select foo from test order by foo").fetchall() |
|
656 self.failUnlessEqual(result[0][0], 3, "Basic test of Connection.executemany") |
|
657 self.failUnlessEqual(result[1][0], 4, "Basic test of Connection.executemany") |
|
658 |
|
659 def CheckConnectionExecutescript(self): |
|
660 con = sqlite.connect(":memory:") |
|
661 con.executescript("create table test(foo); insert into test(foo) values (5);") |
|
662 result = con.execute("select foo from test").fetchone()[0] |
|
663 self.failUnlessEqual(result, 5, "Basic test of Connection.executescript") |
|
664 |
|
665 class ClosedTests(unittest.TestCase): |
|
666 def setUp(self): |
|
667 pass |
|
668 |
|
669 def tearDown(self): |
|
670 pass |
|
671 |
|
672 def CheckClosedConCursor(self): |
|
673 con = sqlite.connect(":memory:") |
|
674 con.close() |
|
675 try: |
|
676 cur = con.cursor() |
|
677 self.fail("Should have raised a ProgrammingError") |
|
678 except sqlite.ProgrammingError: |
|
679 pass |
|
680 except: |
|
681 self.fail("Should have raised a ProgrammingError") |
|
682 |
|
683 def CheckClosedConCommit(self): |
|
684 con = sqlite.connect(":memory:") |
|
685 con.close() |
|
686 try: |
|
687 con.commit() |
|
688 self.fail("Should have raised a ProgrammingError") |
|
689 except sqlite.ProgrammingError: |
|
690 pass |
|
691 except: |
|
692 self.fail("Should have raised a ProgrammingError") |
|
693 |
|
694 def CheckClosedConRollback(self): |
|
695 con = sqlite.connect(":memory:") |
|
696 con.close() |
|
697 try: |
|
698 con.rollback() |
|
699 self.fail("Should have raised a ProgrammingError") |
|
700 except sqlite.ProgrammingError: |
|
701 pass |
|
702 except: |
|
703 self.fail("Should have raised a ProgrammingError") |
|
704 |
|
705 def CheckClosedCurExecute(self): |
|
706 con = sqlite.connect(":memory:") |
|
707 cur = con.cursor() |
|
708 con.close() |
|
709 try: |
|
710 cur.execute("select 4") |
|
711 self.fail("Should have raised a ProgrammingError") |
|
712 except sqlite.ProgrammingError: |
|
713 pass |
|
714 except: |
|
715 self.fail("Should have raised a ProgrammingError") |
|
716 |
|
717 def suite(): |
|
718 module_suite = unittest.makeSuite(ModuleTests, "Check") |
|
719 connection_suite = unittest.makeSuite(ConnectionTests, "Check") |
|
720 cursor_suite = unittest.makeSuite(CursorTests, "Check") |
|
721 thread_suite = unittest.makeSuite(ThreadTests, "Check") |
|
722 constructor_suite = unittest.makeSuite(ConstructorTests, "Check") |
|
723 ext_suite = unittest.makeSuite(ExtensionTests, "Check") |
|
724 closed_suite = unittest.makeSuite(ClosedTests, "Check") |
|
725 return unittest.TestSuite((module_suite, connection_suite, cursor_suite, thread_suite, constructor_suite, ext_suite, closed_suite)) |
|
726 |
|
727 def test(): |
|
728 runner = unittest.TextTestRunner() |
|
729 runner.run(suite()) |
|
730 |
|
731 if __name__ == "__main__": |
|
732 test() |