|
1 from test.test_support import TESTFN, run_unittest |
|
2 import mmap |
|
3 import unittest |
|
4 import os, re |
|
5 |
|
6 PAGESIZE = mmap.PAGESIZE |
|
7 |
|
8 class MmapTests(unittest.TestCase): |
|
9 |
|
10 def setUp(self): |
|
11 if os.path.exists(TESTFN): |
|
12 os.unlink(TESTFN) |
|
13 |
|
14 def tearDown(self): |
|
15 try: |
|
16 os.unlink(TESTFN) |
|
17 except OSError: |
|
18 pass |
|
19 |
|
20 def test_basic(self): |
|
21 # Test mmap module on Unix systems and Windows |
|
22 |
|
23 # Create a file to be mmap'ed. |
|
24 f = open(TESTFN, 'w+') |
|
25 try: |
|
26 # Write 2 pages worth of data to the file |
|
27 f.write('\0'* PAGESIZE) |
|
28 f.write('foo') |
|
29 f.write('\0'* (PAGESIZE-3) ) |
|
30 f.flush() |
|
31 m = mmap.mmap(f.fileno(), 2 * PAGESIZE) |
|
32 f.close() |
|
33 |
|
34 # Simple sanity checks |
|
35 |
|
36 tp = str(type(m)) # SF bug 128713: segfaulted on Linux |
|
37 self.assertEqual(m.find('foo'), PAGESIZE) |
|
38 |
|
39 self.assertEqual(len(m), 2*PAGESIZE) |
|
40 |
|
41 self.assertEqual(m[0], '\0') |
|
42 self.assertEqual(m[0:3], '\0\0\0') |
|
43 |
|
44 # Modify the file's content |
|
45 m[0] = '3' |
|
46 m[PAGESIZE +3: PAGESIZE +3+3] = 'bar' |
|
47 |
|
48 # Check that the modification worked |
|
49 self.assertEqual(m[0], '3') |
|
50 self.assertEqual(m[0:3], '3\0\0') |
|
51 self.assertEqual(m[PAGESIZE-1 : PAGESIZE + 7], '\0foobar\0') |
|
52 |
|
53 m.flush() |
|
54 |
|
55 # Test doing a regular expression match in an mmap'ed file |
|
56 match = re.search('[A-Za-z]+', m) |
|
57 if match is None: |
|
58 self.fail('regex match on mmap failed!') |
|
59 else: |
|
60 start, end = match.span(0) |
|
61 length = end - start |
|
62 |
|
63 self.assertEqual(start, PAGESIZE) |
|
64 self.assertEqual(end, PAGESIZE + 6) |
|
65 |
|
66 # test seeking around (try to overflow the seek implementation) |
|
67 m.seek(0,0) |
|
68 self.assertEqual(m.tell(), 0) |
|
69 m.seek(42,1) |
|
70 self.assertEqual(m.tell(), 42) |
|
71 m.seek(0,2) |
|
72 self.assertEqual(m.tell(), len(m)) |
|
73 |
|
74 # Try to seek to negative position... |
|
75 self.assertRaises(ValueError, m.seek, -1) |
|
76 |
|
77 # Try to seek beyond end of mmap... |
|
78 self.assertRaises(ValueError, m.seek, 1, 2) |
|
79 |
|
80 # Try to seek to negative position... |
|
81 self.assertRaises(ValueError, m.seek, -len(m)-1, 2) |
|
82 |
|
83 # Try resizing map |
|
84 try: |
|
85 m.resize(512) |
|
86 except SystemError: |
|
87 # resize() not supported |
|
88 # No messages are printed, since the output of this test suite |
|
89 # would then be different across platforms. |
|
90 pass |
|
91 else: |
|
92 # resize() is supported |
|
93 self.assertEqual(len(m), 512) |
|
94 # Check that we can no longer seek beyond the new size. |
|
95 self.assertRaises(ValueError, m.seek, 513, 0) |
|
96 |
|
97 # Check that the underlying file is truncated too |
|
98 # (bug #728515) |
|
99 f = open(TESTFN) |
|
100 f.seek(0, 2) |
|
101 self.assertEqual(f.tell(), 512) |
|
102 f.close() |
|
103 self.assertEqual(m.size(), 512) |
|
104 |
|
105 m.close() |
|
106 |
|
107 finally: |
|
108 try: |
|
109 f.close() |
|
110 except OSError: |
|
111 pass |
|
112 |
|
113 def test_access_parameter(self): |
|
114 # Test for "access" keyword parameter |
|
115 mapsize = 10 |
|
116 open(TESTFN, "wb").write("a"*mapsize) |
|
117 f = open(TESTFN, "rb") |
|
118 m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_READ) |
|
119 self.assertEqual(m[:], 'a'*mapsize, "Readonly memory map data incorrect.") |
|
120 |
|
121 # Ensuring that readonly mmap can't be slice assigned |
|
122 try: |
|
123 m[:] = 'b'*mapsize |
|
124 except TypeError: |
|
125 pass |
|
126 else: |
|
127 self.fail("Able to write to readonly memory map") |
|
128 |
|
129 # Ensuring that readonly mmap can't be item assigned |
|
130 try: |
|
131 m[0] = 'b' |
|
132 except TypeError: |
|
133 pass |
|
134 else: |
|
135 self.fail("Able to write to readonly memory map") |
|
136 |
|
137 # Ensuring that readonly mmap can't be write() to |
|
138 try: |
|
139 m.seek(0,0) |
|
140 m.write('abc') |
|
141 except TypeError: |
|
142 pass |
|
143 else: |
|
144 self.fail("Able to write to readonly memory map") |
|
145 |
|
146 # Ensuring that readonly mmap can't be write_byte() to |
|
147 try: |
|
148 m.seek(0,0) |
|
149 m.write_byte('d') |
|
150 except TypeError: |
|
151 pass |
|
152 else: |
|
153 self.fail("Able to write to readonly memory map") |
|
154 |
|
155 # Ensuring that readonly mmap can't be resized |
|
156 try: |
|
157 m.resize(2*mapsize) |
|
158 except SystemError: # resize is not universally supported |
|
159 pass |
|
160 except TypeError: |
|
161 pass |
|
162 else: |
|
163 self.fail("Able to resize readonly memory map") |
|
164 f.close() |
|
165 del m, f |
|
166 self.assertEqual(open(TESTFN, "rb").read(), 'a'*mapsize, |
|
167 "Readonly memory map data file was modified") |
|
168 |
|
169 # Opening mmap with size too big |
|
170 import sys |
|
171 f = open(TESTFN, "r+b") |
|
172 try: |
|
173 m = mmap.mmap(f.fileno(), mapsize+1) |
|
174 except ValueError: |
|
175 # we do not expect a ValueError on Windows |
|
176 # CAUTION: This also changes the size of the file on disk, and |
|
177 # later tests assume that the length hasn't changed. We need to |
|
178 # repair that. |
|
179 if sys.platform.startswith('win'): |
|
180 self.fail("Opening mmap with size+1 should work on Windows.") |
|
181 else: |
|
182 # we expect a ValueError on Unix, but not on Windows |
|
183 if not sys.platform.startswith('win'): |
|
184 self.fail("Opening mmap with size+1 should raise ValueError.") |
|
185 m.close() |
|
186 f.close() |
|
187 if sys.platform.startswith('win'): |
|
188 # Repair damage from the resizing test. |
|
189 f = open(TESTFN, 'r+b') |
|
190 f.truncate(mapsize) |
|
191 f.close() |
|
192 |
|
193 # Opening mmap with access=ACCESS_WRITE |
|
194 f = open(TESTFN, "r+b") |
|
195 m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_WRITE) |
|
196 # Modifying write-through memory map |
|
197 m[:] = 'c'*mapsize |
|
198 self.assertEqual(m[:], 'c'*mapsize, |
|
199 "Write-through memory map memory not updated properly.") |
|
200 m.flush() |
|
201 m.close() |
|
202 f.close() |
|
203 f = open(TESTFN, 'rb') |
|
204 stuff = f.read() |
|
205 f.close() |
|
206 self.assertEqual(stuff, 'c'*mapsize, |
|
207 "Write-through memory map data file not updated properly.") |
|
208 |
|
209 # Opening mmap with access=ACCESS_COPY |
|
210 f = open(TESTFN, "r+b") |
|
211 m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_COPY) |
|
212 # Modifying copy-on-write memory map |
|
213 m[:] = 'd'*mapsize |
|
214 self.assertEqual(m[:], 'd' * mapsize, |
|
215 "Copy-on-write memory map data not written correctly.") |
|
216 m.flush() |
|
217 self.assertEqual(open(TESTFN, "rb").read(), 'c'*mapsize, |
|
218 "Copy-on-write test data file should not be modified.") |
|
219 # Ensuring copy-on-write maps cannot be resized |
|
220 self.assertRaises(TypeError, m.resize, 2*mapsize) |
|
221 f.close() |
|
222 del m, f |
|
223 |
|
224 # Ensuring invalid access parameter raises exception |
|
225 f = open(TESTFN, "r+b") |
|
226 self.assertRaises(ValueError, mmap.mmap, f.fileno(), mapsize, access=4) |
|
227 f.close() |
|
228 |
|
229 if os.name == "posix": |
|
230 # Try incompatible flags, prot and access parameters. |
|
231 f = open(TESTFN, "r+b") |
|
232 self.assertRaises(ValueError, mmap.mmap, f.fileno(), mapsize, |
|
233 flags=mmap.MAP_PRIVATE, |
|
234 prot=mmap.PROT_READ, access=mmap.ACCESS_WRITE) |
|
235 f.close() |
|
236 |
|
237 def test_bad_file_desc(self): |
|
238 # Try opening a bad file descriptor... |
|
239 self.assertRaises(mmap.error, mmap.mmap, -2, 4096) |
|
240 |
|
241 def test_tougher_find(self): |
|
242 # Do a tougher .find() test. SF bug 515943 pointed out that, in 2.2, |
|
243 # searching for data with embedded \0 bytes didn't work. |
|
244 f = open(TESTFN, 'w+') |
|
245 |
|
246 data = 'aabaac\x00deef\x00\x00aa\x00' |
|
247 n = len(data) |
|
248 f.write(data) |
|
249 f.flush() |
|
250 m = mmap.mmap(f.fileno(), n) |
|
251 f.close() |
|
252 |
|
253 for start in range(n+1): |
|
254 for finish in range(start, n+1): |
|
255 slice = data[start : finish] |
|
256 self.assertEqual(m.find(slice), data.find(slice)) |
|
257 self.assertEqual(m.find(slice + 'x'), -1) |
|
258 m.close() |
|
259 |
|
260 def test_find_end(self): |
|
261 # test the new 'end' parameter works as expected |
|
262 f = open(TESTFN, 'w+') |
|
263 data = 'one two ones' |
|
264 n = len(data) |
|
265 f.write(data) |
|
266 f.flush() |
|
267 m = mmap.mmap(f.fileno(), n) |
|
268 f.close() |
|
269 |
|
270 self.assertEqual(m.find('one'), 0) |
|
271 self.assertEqual(m.find('ones'), 8) |
|
272 self.assertEqual(m.find('one', 0, -1), 0) |
|
273 self.assertEqual(m.find('one', 1), 8) |
|
274 self.assertEqual(m.find('one', 1, -1), 8) |
|
275 self.assertEqual(m.find('one', 1, -2), -1) |
|
276 |
|
277 |
|
278 def test_rfind(self): |
|
279 # test the new 'end' parameter works as expected |
|
280 f = open(TESTFN, 'w+') |
|
281 data = 'one two ones' |
|
282 n = len(data) |
|
283 f.write(data) |
|
284 f.flush() |
|
285 m = mmap.mmap(f.fileno(), n) |
|
286 f.close() |
|
287 |
|
288 self.assertEqual(m.rfind('one'), 8) |
|
289 self.assertEqual(m.rfind('one '), 0) |
|
290 self.assertEqual(m.rfind('one', 0, -1), 8) |
|
291 self.assertEqual(m.rfind('one', 0, -2), 0) |
|
292 self.assertEqual(m.rfind('one', 1, -1), 8) |
|
293 self.assertEqual(m.rfind('one', 1, -2), -1) |
|
294 |
|
295 |
|
296 def test_double_close(self): |
|
297 # make sure a double close doesn't crash on Solaris (Bug# 665913) |
|
298 f = open(TESTFN, 'w+') |
|
299 |
|
300 f.write(2**16 * 'a') # Arbitrary character |
|
301 f.close() |
|
302 |
|
303 f = open(TESTFN) |
|
304 mf = mmap.mmap(f.fileno(), 2**16, access=mmap.ACCESS_READ) |
|
305 mf.close() |
|
306 mf.close() |
|
307 f.close() |
|
308 |
|
309 def test_entire_file(self): |
|
310 # test mapping of entire file by passing 0 for map length |
|
311 if hasattr(os, "stat"): |
|
312 f = open(TESTFN, "w+") |
|
313 |
|
314 f.write(2**16 * 'm') # Arbitrary character |
|
315 f.close() |
|
316 |
|
317 f = open(TESTFN, "rb+") |
|
318 mf = mmap.mmap(f.fileno(), 0) |
|
319 self.assertEqual(len(mf), 2**16, "Map size should equal file size.") |
|
320 self.assertEqual(mf.read(2**16), 2**16 * "m") |
|
321 mf.close() |
|
322 f.close() |
|
323 |
|
324 def test_move(self): |
|
325 # make move works everywhere (64-bit format problem earlier) |
|
326 f = open(TESTFN, 'w+') |
|
327 |
|
328 f.write("ABCDEabcde") # Arbitrary character |
|
329 f.flush() |
|
330 |
|
331 mf = mmap.mmap(f.fileno(), 10) |
|
332 mf.move(5, 0, 5) |
|
333 self.assertEqual(mf[:], "ABCDEABCDE", "Map move should have duplicated front 5") |
|
334 mf.close() |
|
335 f.close() |
|
336 |
|
337 def test_anonymous(self): |
|
338 # anonymous mmap.mmap(-1, PAGE) |
|
339 m = mmap.mmap(-1, PAGESIZE) |
|
340 for x in xrange(PAGESIZE): |
|
341 self.assertEqual(m[x], '\0', "anonymously mmap'ed contents should be zero") |
|
342 |
|
343 for x in xrange(PAGESIZE): |
|
344 m[x] = ch = chr(x & 255) |
|
345 self.assertEqual(m[x], ch) |
|
346 |
|
347 def test_extended_getslice(self): |
|
348 # Test extended slicing by comparing with list slicing. |
|
349 s = "".join(chr(c) for c in reversed(range(256))) |
|
350 m = mmap.mmap(-1, len(s)) |
|
351 m[:] = s |
|
352 self.assertEqual(m[:], s) |
|
353 indices = (0, None, 1, 3, 19, 300, -1, -2, -31, -300) |
|
354 for start in indices: |
|
355 for stop in indices: |
|
356 # Skip step 0 (invalid) |
|
357 for step in indices[1:]: |
|
358 self.assertEqual(m[start:stop:step], |
|
359 s[start:stop:step]) |
|
360 |
|
361 def test_extended_set_del_slice(self): |
|
362 # Test extended slicing by comparing with list slicing. |
|
363 s = "".join(chr(c) for c in reversed(range(256))) |
|
364 m = mmap.mmap(-1, len(s)) |
|
365 indices = (0, None, 1, 3, 19, 300, -1, -2, -31, -300) |
|
366 for start in indices: |
|
367 for stop in indices: |
|
368 # Skip invalid step 0 |
|
369 for step in indices[1:]: |
|
370 m[:] = s |
|
371 self.assertEqual(m[:], s) |
|
372 L = list(s) |
|
373 # Make sure we have a slice of exactly the right length, |
|
374 # but with different data. |
|
375 data = L[start:stop:step] |
|
376 data = "".join(reversed(data)) |
|
377 L[start:stop:step] = data |
|
378 m[start:stop:step] = data |
|
379 self.assertEquals(m[:], "".join(L)) |
|
380 |
|
381 def make_mmap_file (self, f, halfsize): |
|
382 # Write 2 pages worth of data to the file |
|
383 f.write ('\0' * halfsize) |
|
384 f.write ('foo') |
|
385 f.write ('\0' * (halfsize - 3)) |
|
386 f.flush () |
|
387 return mmap.mmap (f.fileno(), 0) |
|
388 |
|
389 def test_offset (self): |
|
390 f = open (TESTFN, 'w+b') |
|
391 |
|
392 try: # unlink TESTFN no matter what |
|
393 halfsize = mmap.ALLOCATIONGRANULARITY |
|
394 m = self.make_mmap_file (f, halfsize) |
|
395 m.close () |
|
396 f.close () |
|
397 |
|
398 mapsize = halfsize * 2 |
|
399 # Try invalid offset |
|
400 f = open(TESTFN, "r+b") |
|
401 for offset in [-2, -1, None]: |
|
402 try: |
|
403 m = mmap.mmap(f.fileno(), mapsize, offset=offset) |
|
404 self.assertEqual(0, 1) |
|
405 except (ValueError, TypeError, OverflowError): |
|
406 pass |
|
407 else: |
|
408 self.assertEqual(0, 0) |
|
409 f.close() |
|
410 |
|
411 # Try valid offset, hopefully 8192 works on all OSes |
|
412 f = open(TESTFN, "r+b") |
|
413 m = mmap.mmap(f.fileno(), mapsize - halfsize, offset=halfsize) |
|
414 self.assertEqual(m[0:3], 'foo') |
|
415 f.close() |
|
416 m.close() |
|
417 |
|
418 finally: |
|
419 f.close() |
|
420 try: |
|
421 os.unlink(TESTFN) |
|
422 except OSError: |
|
423 pass |
|
424 |
|
425 def test_subclass(self): |
|
426 class anon_mmap(mmap.mmap): |
|
427 def __new__(klass, *args, **kwargs): |
|
428 return mmap.mmap.__new__(klass, -1, *args, **kwargs) |
|
429 anon_mmap(PAGESIZE) |
|
430 |
|
431 def test_prot_readonly(self): |
|
432 if not hasattr(mmap, 'PROT_READ'): |
|
433 return |
|
434 mapsize = 10 |
|
435 open(TESTFN, "wb").write("a"*mapsize) |
|
436 f = open(TESTFN, "rb") |
|
437 m = mmap.mmap(f.fileno(), mapsize, prot=mmap.PROT_READ) |
|
438 self.assertRaises(TypeError, m.write, "foo") |
|
439 f.close() |
|
440 |
|
441 def test_error(self): |
|
442 self.assert_(issubclass(mmap.error, EnvironmentError)) |
|
443 self.assert_("mmap.error" in str(mmap.error)) |
|
444 |
|
445 |
|
446 def test_main(): |
|
447 run_unittest(MmapTests) |
|
448 |
|
449 if __name__ == '__main__': |
|
450 test_main() |