|
1 import sys |
|
2 import os |
|
3 import marshal |
|
4 import imp |
|
5 import struct |
|
6 import time |
|
7 import unittest |
|
8 |
|
9 import zlib # implied prerequisite |
|
10 from zipfile import ZipFile, ZipInfo, ZIP_STORED, ZIP_DEFLATED |
|
11 from test import test_support |
|
12 from test.test_importhooks import ImportHooksBaseTestCase, test_src, test_co |
|
13 |
|
14 import zipimport |
|
15 import linecache |
|
16 import doctest |
|
17 import inspect |
|
18 import StringIO |
|
19 from traceback import extract_tb, extract_stack, print_tb |
|
# Source of a one-function module whose do_raise() raises TypeError; the
# traceback test stores it in an archive and compares traceback lines to it.
raise_src = 'def do_raise(): raise TypeError\n'

# Flipped to True by cleanup() after its first call, so that testAFakeZlib
# runs only once when this test module is executed repeatedly (which
# happens when looking for reference leaks).
test_imported = False
|
25 |
|
26 |
|
def make_pyc(co, mtime):
    """Return the contents of a byte-compiled module file for *co*.

    The result is ``imp.get_magic()``, followed by *mtime* packed as a
    little-endian signed 32-bit integer, followed by ``marshal.dumps(co)``.

    co    -- code object to serialise.
    mtime -- timestamp for the header.  A float is truncated to an
             integer; a float of 0x7fffffff or more is additionally
             shifted down by 2**32 before packing.

    struct.error is raised if the resulting integer still does not fit
    the signed 32-bit field.
    """
    data = marshal.dumps(co)
    if isinstance(mtime, float):
        # Mac mtimes need a bit of special casing
        if mtime < 0x7fffffff:
            mtime = int(mtime)
        else:
            # Same value as the former int(-0x100000000L + long(mtime)),
            # spelled without the Python-2-only long literal and builtin.
            mtime = int(mtime) - 0x100000000
    return imp.get_magic() + struct.pack("<i", int(mtime)) + data
|
37 |
|
def module_path_to_dotted_name(path):
    """Return *path* with every os.sep replaced by a dot."""
    components = path.split(os.sep)
    return '.'.join(components)
|
40 |
|
# Timestamp taken once at import time; used both for archive member
# mtimes and for the mtime field of generated byte-compiled files.
NOW = time.time()
# Byte-compiled form of test_co with NOW as its recorded mtime.
test_pyc = make_pyc(test_co, NOW)


# Extension used for byte-compiled files: ".pyc" when __debug__ is true,
# ".pyo" otherwise.
pyc_ext = ".pyc" if __debug__ else ".pyo"


# Names of the module and packages created inside the test archives.
TESTMOD = "ziptestmodule"
TESTPACK = "ziptestpackage"
TESTPACK2 = "ziptestpackage2"
# Absolute path of the scratch zip archive the tests create and remove.
TEMP_ZIP = os.path.abspath(os.extsep.join(("junk95142", "zip")))
|
55 |
|
class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
    """zipimport tests run against an archive whose members use ZIP_STORED.

    Each test builds an archive at TEMP_ZIP, exercises zipimport against
    it and removes the archive again.  The compression method is taken
    from the ``compression`` class attribute.

    Test methods carry ``#`` comments rather than docstrings so that the
    names unittest reports for them stay the method names.
    """

    # Compression method stamped on every archive member written below.
    compression = ZIP_STORED

    def setUp(self):
        # We're reusing the zip archive path, so we must clear the
        # cached directory info and linecache
        linecache.clearcache()
        zipimport._zip_directory_cache.clear()
        ImportHooksBaseTestCase.setUp(self)

    def _writeFiles(self, z, files):
        """Write every entry of *files* into the open ZipFile *z*, then close it.

        files -- dict mapping archive member name to (mtime, data).  Each
                 member is written with ``self.compression`` and with
                 ``time.localtime(mtime)`` as its timestamp.
        """
        for name, (mtime, data) in files.items():
            zinfo = ZipInfo(name, time.localtime(mtime))
            zinfo.compress_type = self.compression
            z.writestr(zinfo, data)
        z.close()

    def doTest(self, expected_ext, files, *modules, **kw):
        """Build TEMP_ZIP from *files* and import a module out of it.

        expected_ext -- when true, the imported module's get_file() must
                        return os.path.join(TEMP_ZIP, *modules) +
                        expected_ext.
        files        -- dict mapping archive member name to (mtime, data).
        modules      -- components of the dotted module name to import.
        kw           -- optional "stuff" (data prepended to the finished
                        archive) and "call" (callable invoked with the
                        imported module).

        The archive is removed again whether or not the test succeeds.
        """
        z = ZipFile(TEMP_ZIP, "w")
        try:
            self._writeFiles(z, files)

            stuff = kw.get("stuff", None)
            if stuff is not None:
                # Prepend 'stuff' to the start of the zipfile.  Both
                # handles are closed even if reading or writing fails.
                f = open(TEMP_ZIP, "rb")
                try:
                    data = f.read()
                finally:
                    f.close()

                f = open(TEMP_ZIP, "wb")
                try:
                    f.write(stuff)
                    f.write(data)
                finally:
                    f.close()

            # NOTE(review): this sys.path entry is not removed here;
            # presumably ImportHooksBaseTestCase restores sys.path -- confirm.
            sys.path.insert(0, TEMP_ZIP)

            # A non-empty fromlist makes __import__ return the module
            # named by the full dotted path, not the top-level package.
            mod = __import__(".".join(modules), globals(), locals(),
                             ["__dummy__"])

            call = kw.get('call')
            if call is not None:
                call(mod)

            if expected_ext:
                mod_file = mod.get_file()
                self.assertEqual(mod_file, os.path.join(TEMP_ZIP,
                                 *modules) + expected_ext)
        finally:
            # Closing again is deliberate: it covers a failure part-way
            # through writing the archive.
            z.close()
            os.remove(TEMP_ZIP)

    def testAFakeZlib(self):
        #
        # This could cause a stack overflow before: importing zlib.py
        # from a compressed archive would cause zlib to be imported
        # which would find zlib.py in the archive, which would... etc.
        #
        # This test *must* be executed first: it must be the first one
        # to trigger zipimport to import zlib (zipimport caches the
        # zlib.decompress function object, after which the problem being
        # tested here wouldn't be a problem anymore...
        # (Hence the 'A' in the test method name: to make it the first
        # item in a list sorted by name, like unittest.makeSuite() does.)
        #
        # This test fails on platforms on which the zlib module is
        # statically linked, but the problem it tests for can't
        # occur in that case (builtin modules are always found first),
        # so we'll simply skip it then. Bug #765456.
        #
        if "zlib" in sys.builtin_module_names:
            return
        if "zlib" in sys.modules:
            del sys.modules["zlib"]
        files = {"zlib.py": (NOW, test_src)}
        try:
            self.doTest(".py", files, "zlib")
        except ImportError:
            # Only a deflated archive is allowed to fail to import.
            if self.compression != ZIP_DEFLATED:
                self.fail("expected test to not raise ImportError")
        else:
            # Only a stored archive is allowed to import successfully.
            if self.compression != ZIP_STORED:
                self.fail("expected test to raise ImportError")

    def testPy(self):
        # Source only: the module is loaded from the .py member.
        files = {TESTMOD + ".py": (NOW, test_src)}
        self.doTest(".py", files, TESTMOD)

    def testPyc(self):
        # Byte-compiled member only.
        files = {TESTMOD + pyc_ext: (NOW, test_pyc)}
        self.doTest(pyc_ext, files, TESTMOD)

    def testBoth(self):
        # Source and byte-compiled member with the same mtime: the
        # byte-compiled one is expected to be reported by get_file().
        files = {TESTMOD + ".py": (NOW, test_src),
                 TESTMOD + pyc_ext: (NOW, test_pyc)}
        self.doTest(pyc_ext, files, TESTMOD)

    def testEmptyPy(self):
        # An empty source member must still be importable; no get_file()
        # check is made because expected_ext is None.
        files = {TESTMOD + ".py": (NOW, "")}
        self.doTest(None, files, TESTMOD)

    def testBadMagic(self):
        # make pyc magic word invalid, forcing loading from .py
        m0 = ord(test_pyc[0])
        m0 ^= 0x04  # flip an arbitrary bit
        badmagic_pyc = chr(m0) + test_pyc[1:]
        files = {TESTMOD + ".py": (NOW, test_src),
                 TESTMOD + pyc_ext: (NOW, badmagic_pyc)}
        self.doTest(".py", files, TESTMOD)

    def testBadMagic2(self):
        # make pyc magic word invalid, causing an ImportError
        m0 = ord(test_pyc[0])
        m0 ^= 0x04  # flip an arbitrary bit
        badmagic_pyc = chr(m0) + test_pyc[1:]
        files = {TESTMOD + pyc_ext: (NOW, badmagic_pyc)}
        try:
            self.doTest(".py", files, TESTMOD)
        except ImportError:
            pass
        else:
            self.fail("expected ImportError; import from bad pyc")

    def testBadMTime(self):
        # Corrupt the mtime stored in the pyc header so that the .py
        # member is the one loaded.
        t3 = ord(test_pyc[7])
        t3 ^= 0x02  # flip the second bit -- not the first as that one
                    # isn't stored in the .py's mtime in the zip archive.
        badtime_pyc = test_pyc[:7] + chr(t3) + test_pyc[8:]
        files = {TESTMOD + ".py": (NOW, test_src),
                 TESTMOD + pyc_ext: (NOW, badtime_pyc)}
        self.doTest(".py", files, TESTMOD)

    def testPackage(self):
        # Module inside a package, both byte-compiled.
        packdir = TESTPACK + os.sep
        files = {packdir + "__init__" + pyc_ext: (NOW, test_pyc),
                 packdir + TESTMOD + pyc_ext: (NOW, test_pyc)}
        self.doTest(pyc_ext, files, TESTPACK, TESTMOD)

    def testDeepPackage(self):
        # Module inside a package nested in another package.
        packdir = TESTPACK + os.sep
        packdir2 = packdir + TESTPACK2 + os.sep
        files = {packdir + "__init__" + pyc_ext: (NOW, test_pyc),
                 packdir2 + "__init__" + pyc_ext: (NOW, test_pyc),
                 packdir2 + TESTMOD + pyc_ext: (NOW, test_pyc)}
        self.doTest(pyc_ext, files, TESTPACK, TESTPACK2, TESTMOD)

    def testZipImporterMethods(self):
        # Exercise zipimporter attributes and methods directly on an
        # archive holding a package with a nested sub-package.
        packdir = TESTPACK + os.sep
        packdir2 = packdir + TESTPACK2 + os.sep
        files = {packdir + "__init__" + pyc_ext: (NOW, test_pyc),
                 packdir2 + "__init__" + pyc_ext: (NOW, test_pyc),
                 packdir2 + TESTMOD + pyc_ext: (NOW, test_pyc)}

        z = ZipFile(TEMP_ZIP, "w")
        try:
            self._writeFiles(z, files)

            zi = zipimport.zipimporter(TEMP_ZIP)
            self.assertEqual(zi.archive, TEMP_ZIP)
            self.assertEqual(zi.is_package(TESTPACK), True)
            zi.load_module(TESTPACK)

            self.assertEqual(zi.is_package(packdir + '__init__'), False)
            self.assertEqual(zi.is_package(packdir + TESTPACK2), True)
            self.assertEqual(zi.is_package(packdir2 + TESTMOD), False)

            mod_name = packdir2 + TESTMOD
            # Imported for its side effect only; the module object
            # itself is not inspected.
            __import__(module_path_to_dotted_name(mod_name))
            # The archive holds no source members, so no source is found.
            self.assertEqual(zi.get_source(TESTPACK), None)
            self.assertEqual(zi.get_source(mod_name), None)

            # test prefix and archivepath members
            zi2 = zipimport.zipimporter(TEMP_ZIP + os.sep + TESTPACK)
            self.assertEqual(zi2.archive, TEMP_ZIP)
            self.assertEqual(zi2.prefix, TESTPACK + os.sep)
        finally:
            z.close()
            os.remove(TEMP_ZIP)

    def testZipImporterMethodsInSubDirectory(self):
        # Same checks as above, but with the zipimporter rooted at a
        # directory inside the archive instead of at the archive itself.
        packdir = TESTPACK + os.sep
        packdir2 = packdir + TESTPACK2 + os.sep
        files = {packdir2 + "__init__" + pyc_ext: (NOW, test_pyc),
                 packdir2 + TESTMOD + pyc_ext: (NOW, test_pyc)}

        z = ZipFile(TEMP_ZIP, "w")
        try:
            self._writeFiles(z, files)

            zi = zipimport.zipimporter(TEMP_ZIP + os.sep + packdir)
            self.assertEqual(zi.archive, TEMP_ZIP)
            self.assertEqual(zi.prefix, packdir)
            self.assertEqual(zi.is_package(TESTPACK2), True)
            zi.load_module(TESTPACK2)

            self.assertEqual(
                zi.is_package(TESTPACK2 + os.sep + '__init__'), False)
            self.assertEqual(
                zi.is_package(TESTPACK2 + os.sep + TESTMOD), False)

            mod_name = TESTPACK2 + os.sep + TESTMOD
            # Imported for its side effect only.
            __import__(module_path_to_dotted_name(mod_name))
            self.assertEqual(zi.get_source(TESTPACK2), None)
            self.assertEqual(zi.get_source(mod_name), None)
        finally:
            z.close()
            os.remove(TEMP_ZIP)

    def testGetData(self):
        # Round-trip a data member through zipimporter.get_data() and
        # check the importer's repr.
        z = ZipFile(TEMP_ZIP, "w")
        z.compression = self.compression
        try:
            name = "testdata.dat"
            data = "".join([chr(x) for x in range(256)]) * 500
            z.writestr(name, data)
            z.close()
            zi = zipimport.zipimporter(TEMP_ZIP)
            self.assertEqual(data, zi.get_data(name))
            self.assertTrue('zipimporter object' in repr(zi))
        finally:
            z.close()
            os.remove(TEMP_ZIP)

    def testImporterAttr(self):
        # The module's own code reads a sibling data member through
        # __loader__ while it is being imported.
        src = """if 1: # indent hack
        def get_file():
            return __file__
        if __loader__.get_data("some.data") != "some data":
            raise AssertionError, "bad data"\n"""
        pyc = make_pyc(compile(src, "<???>", "exec"), NOW)
        files = {TESTMOD + pyc_ext: (NOW, pyc),
                 "some.data": (NOW, "some data")}
        self.doTest(pyc_ext, files, TESTMOD)

    def testImport_WithStuff(self):
        # try importing from a zipfile which contains additional
        # stuff at the beginning of the file
        files = {TESTMOD + ".py": (NOW, test_src)}
        self.doTest(".py", files, TESTMOD,
                    stuff="Some Stuff"*31)

    def assertModuleSource(self, module):
        """Fail unless inspect.getsource(module) equals test_src."""
        self.assertEqual(inspect.getsource(module), test_src)

    def testGetSource(self):
        # Source must be retrievable for a module loaded from a .py member.
        files = {TESTMOD + ".py": (NOW, test_src)}
        self.doTest(".py", files, TESTMOD, call=self.assertModuleSource)

    def testGetCompiledSource(self):
        # Source must be retrievable even when the byte-compiled member
        # is the one that gets loaded.
        pyc = make_pyc(compile(test_src, "<???>", "exec"), NOW)
        files = {TESTMOD + ".py": (NOW, test_src),
                 TESTMOD + pyc_ext: (NOW, pyc)}
        self.doTest(pyc_ext, files, TESTMOD, call=self.assertModuleSource)

    def runDoctest(self, callback):
        """Import TESTMOD from an archive that also holds xyz.txt, then
        pass the imported module to *callback*.
        """
        files = {TESTMOD + ".py": (NOW, test_src),
                 "xyz.txt": (NOW, ">>> log.append(True)\n")}
        self.doTest(".py", files, TESTMOD, call=callback)

    def doDoctestFile(self, module):
        """Run xyz.txt through doctest.testfile() relative to *module*.

        The doctest appends True to ``log``; it finds that list because
        this method's locals are handed over as the doctest globals, so
        the local must keep the name ``log``.
        """
        log = []
        # Run with doctest.master unset and put the old value back after.
        old_master, doctest.master = doctest.master, None
        try:
            doctest.testfile(
                'xyz.txt', package=module, module_relative=True,
                globs=locals()
            )
        finally:
            doctest.master = old_master
        self.assertEqual(log, [True])

    def testDoctestFile(self):
        self.runDoctest(self.doDoctestFile)

    def doDoctestSuite(self, module):
        """Run xyz.txt through doctest.DocFileTest() relative to *module*.

        As in doDoctestFile, the doctest reaches ``log`` through
        ``globs=locals()``.
        """
        log = []
        doctest.DocFileTest(
            'xyz.txt', package=module, module_relative=True,
            globs=locals()
        ).run()
        self.assertEqual(log, [True])

    def testDoctestSuite(self):
        self.runDoctest(self.doDoctestSuite)

    def doTraceback(self, module):
        """Check that traceback helpers show the source line of the raise.

        Calls module.do_raise(), which must raise, and verifies that
        extract_tb(), extract_stack() and print_tb() all report
        raise_src as the failing line.
        """
        try:
            module.do_raise()
        except Exception:
            # Skip this method's own frame; tb_next belongs to do_raise().
            tb = sys.exc_info()[2].tb_next
        else:
            raise AssertionError("This ought to be impossible")

        _f, _lno, _n, line = extract_tb(tb, 1)[0]
        self.assertEqual(line, raise_src.strip())

        _f, _lno, _n, line = extract_stack(tb.tb_frame, 1)[0]
        self.assertEqual(line, raise_src.strip())

        s = StringIO.StringIO()
        print_tb(tb, 1, s)
        self.assertTrue(s.getvalue().endswith(raise_src))

    def testTraceback(self):
        files = {TESTMOD + ".py": (NOW, raise_src)}
        self.doTest(None, files, TESTMOD, call=self.doTraceback)
|
366 |
|
367 |
|
class CompressedZipImportTestCase(UncompressedZipImportTestCase):
    """Re-run every inherited test with archive members written as ZIP_DEFLATED."""

    compression = ZIP_DEFLATED
|
370 |
|
371 |
|
class BadFileZipImportTestCase(unittest.TestCase):
    """Check that zipimporter() rejects paths that are not usable archives.

    Tests that create the scratch file TESTMOD remove it again themselves,
    so they do not depend on a later caller cleaning up.

    Test methods carry ``#`` comments rather than docstrings so that the
    names unittest reports for them stay the method names.
    """

    def assertZipFailure(self, filename):
        """Fail unless zipimport.zipimporter(filename) raises ZipImportError."""
        self.assertRaises(zipimport.ZipImportError,
                          zipimport.zipimporter, filename)

    def testNoFile(self):
        self.assertZipFailure('AdfjdkFJKDFJjdklfjs')

    def testEmptyFilename(self):
        self.assertZipFailure('')

    def testBadArgs(self):
        # Neither a None path nor an unexpected keyword is accepted.
        self.assertRaises(TypeError, zipimport.zipimporter, None)
        self.assertRaises(TypeError, zipimport.zipimporter, TESTMOD, kwd=None)

    def testFilenameTooLong(self):
        self.assertZipFailure('A' * 33000)

    def testEmptyFile(self):
        test_support.unlink(TESTMOD)
        open(TESTMOD, 'w+').close()
        try:
            self.assertZipFailure(TESTMOD)
        finally:
            test_support.unlink(TESTMOD)

    def testFileUnreadable(self):
        test_support.unlink(TESTMOD)
        # Create the file with no permission bits set.
        fd = os.open(TESTMOD, os.O_CREAT, 0)
        try:
            os.close(fd)
            self.assertZipFailure(TESTMOD)
        finally:
            # If we leave "the read-only bit" set on Windows, nothing can
            # delete TESTMOD, and later tests suffer bogus failures.
            # (0o666 is the octal spelling accepted by Python 2.6+ and 3.)
            os.chmod(TESTMOD, 0o666)
            test_support.unlink(TESTMOD)

    def testNotZipFile(self):
        # 22 bytes of filler that are not a zip archive.
        test_support.unlink(TESTMOD)
        fp = open(TESTMOD, 'w+')
        try:
            fp.write('a' * 22)
        finally:
            fp.close()
        try:
            self.assertZipFailure(TESTMOD)
        finally:
            test_support.unlink(TESTMOD)

    # XXX: disabled until this works on Big-endian machines
    def _testBogusZipFile(self):
        # The file starts with 0x06054B50 packed in native byte order and
        # is padded with 18 filler bytes.
        test_support.unlink(TESTMOD)
        fp = open(TESTMOD, 'w+')
        try:
            fp.write(struct.pack('=I', 0x06054B50))
            fp.write('a' * 18)
        finally:
            fp.close()

        try:
            z = zipimport.zipimporter(TESTMOD)

            self.assertRaises(TypeError, z.find_module, None)
            self.assertRaises(TypeError, z.load_module, None)
            self.assertRaises(TypeError, z.is_package, None)
            self.assertRaises(TypeError, z.get_code, None)
            self.assertRaises(TypeError, z.get_data, None)
            self.assertRaises(TypeError, z.get_source, None)

            error = zipimport.ZipImportError
            self.assertEqual(z.find_module('abc'), None)

            self.assertRaises(error, z.load_module, 'abc')
            self.assertRaises(error, z.get_code, 'abc')
            self.assertRaises(IOError, z.get_data, 'abc')
            self.assertRaises(error, z.get_source, 'abc')
            self.assertRaises(error, z.is_package, 'abc')
        finally:
            zipimport._zip_directory_cache.clear()
            test_support.unlink(TESTMOD)
|
441 |
|
442 |
|
def cleanup():
    """Prepare the module for another run of test_main().

    On every call after the first, clear zipimport's directory cache and
    remove testAFakeZlib from the zip import test classes that still
    expose it.  This is necessary if the test is run repeatedly (as when
    hunting for leaks).
    """
    global test_imported
    if test_imported:
        zipimport._zip_directory_cache.clear()
        # Base class first: once the method is deleted there, the
        # subclass no longer exposes it and is skipped by the hasattr test.
        for case in (UncompressedZipImportTestCase,
                     CompressedZipImportTestCase):
            if hasattr(case, 'testAFakeZlib'):
                delattr(case, 'testAFakeZlib')
    test_imported = True
|
453 |
|
def test_main():
    """Run all three test case classes, then remove the TESTMOD scratch file."""
    cleanup()
    cases = (
        UncompressedZipImportTestCase,
        CompressedZipImportTestCase,
        BadFileZipImportTestCase,
    )
    try:
        test_support.run_unittest(*cases)
    finally:
        # Runs whether or not the tests passed.
        test_support.unlink(TESTMOD)
|
464 |
|
# Executed as a script: run the whole suite through test_main().
if __name__ == "__main__":
    test_main()