diff -r 000000000000 -r ae805ac0140d python-2.5.2/win32/Lib/test/test_zlib.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/python-2.5.2/win32/Lib/test/test_zlib.py Fri Apr 03 17:19:34 2009 +0100 @@ -0,0 +1,481 @@ +import unittest +from test import test_support +import zlib +import random + +# print test_support.TESTFN + +def getbuf(): + # This was in the original. Avoid non-repeatable sources. + # Left here (unused) in case something wants to be done with it. + import imp + try: + t = imp.find_module('test_zlib') + file = t[0] + except ImportError: + file = open(__file__) + buf = file.read() * 8 + file.close() + return buf + + + +class ChecksumTestCase(unittest.TestCase): + # checksum test cases + def test_crc32start(self): + self.assertEqual(zlib.crc32(""), zlib.crc32("", 0)) + self.assert_(zlib.crc32("abc", 0xffffffff)) + + def test_crc32empty(self): + self.assertEqual(zlib.crc32("", 0), 0) + self.assertEqual(zlib.crc32("", 1), 1) + self.assertEqual(zlib.crc32("", 432), 432) + + def test_adler32start(self): + self.assertEqual(zlib.adler32(""), zlib.adler32("", 1)) + self.assert_(zlib.adler32("abc", 0xffffffff)) + + def test_adler32empty(self): + self.assertEqual(zlib.adler32("", 0), 0) + self.assertEqual(zlib.adler32("", 1), 1) + self.assertEqual(zlib.adler32("", 432), 432) + + def assertEqual32(self, seen, expected): + # 32-bit values masked -- checksums on 32- vs 64- bit machines + # This is important if bit 31 (0x08000000L) is set. + self.assertEqual(seen & 0x0FFFFFFFFL, expected & 0x0FFFFFFFFL) + + def test_penguins(self): + self.assertEqual32(zlib.crc32("penguin", 0), 0x0e5c1a120L) + self.assertEqual32(zlib.crc32("penguin", 1), 0x43b6aa94) + self.assertEqual32(zlib.adler32("penguin", 0), 0x0bcf02f6) + self.assertEqual32(zlib.adler32("penguin", 1), 0x0bd602f7) + + self.assertEqual(zlib.crc32("penguin"), zlib.crc32("penguin", 0)) + self.assertEqual(zlib.adler32("penguin"),zlib.adler32("penguin",1)) + + + +class ExceptionTestCase(unittest.TestCase): + # make sure we generate some expected errors + def test_bigbits(self): + # specifying total bits too large causes an error + self.assertRaises(zlib.error, + zlib.compress, 'ERROR', zlib.MAX_WBITS + 1) + + def test_badcompressobj(self): + # verify failure on building compress object with bad params + self.assertRaises(ValueError, zlib.compressobj, 1, zlib.DEFLATED, 0) + + def test_baddecompressobj(self): + # verify failure on building decompress object with bad params + self.assertRaises(ValueError, zlib.decompressobj, 0) + + + +class CompressTestCase(unittest.TestCase): + # Test compression in one go (whole message compression) + def test_speech(self): + x = zlib.compress(HAMLET_SCENE) + self.assertEqual(zlib.decompress(x), HAMLET_SCENE) + + def test_speech128(self): + # compress more data + data = HAMLET_SCENE * 128 + x = zlib.compress(data) + self.assertEqual(zlib.decompress(x), data) + + + + +class CompressObjectTestCase(unittest.TestCase): + # Test compression object + def test_pair(self): + # straightforward compress/decompress objects + data = HAMLET_SCENE * 128 + co = zlib.compressobj() + x1 = co.compress(data) + x2 = co.flush() + self.assertRaises(zlib.error, co.flush) # second flush should not work + dco = zlib.decompressobj() + y1 = dco.decompress(x1 + x2) + y2 = dco.flush() + self.assertEqual(data, y1 + y2) + + def test_compressoptions(self): + # specify lots of options to compressobj() + level = 2 + method = zlib.DEFLATED + wbits = -12 + memlevel = 9 + strategy = zlib.Z_FILTERED + co = zlib.compressobj(level, method, wbits, memlevel, strategy) + x1 = co.compress(HAMLET_SCENE) + x2 = co.flush() + dco = zlib.decompressobj(wbits) + y1 = dco.decompress(x1 + x2) + y2 = dco.flush() + self.assertEqual(HAMLET_SCENE, y1 + y2) + + def test_compressincremental(self): + # compress object in steps, decompress object as one-shot + data = HAMLET_SCENE * 128 + co = zlib.compressobj() + bufs = [] + for i in range(0, len(data), 256): + bufs.append(co.compress(data[i:i+256])) + bufs.append(co.flush()) + combuf = ''.join(bufs) + + dco = zlib.decompressobj() + y1 = dco.decompress(''.join(bufs)) + y2 = dco.flush() + self.assertEqual(data, y1 + y2) + + def test_decompinc(self, flush=False, source=None, cx=256, dcx=64): + # compress object in steps, decompress object in steps + source = source or HAMLET_SCENE + data = source * 128 + co = zlib.compressobj() + bufs = [] + for i in range(0, len(data), cx): + bufs.append(co.compress(data[i:i+cx])) + bufs.append(co.flush()) + combuf = ''.join(bufs) + + self.assertEqual(data, zlib.decompress(combuf)) + + dco = zlib.decompressobj() + bufs = [] + for i in range(0, len(combuf), dcx): + bufs.append(dco.decompress(combuf[i:i+dcx])) + self.assertEqual('', dco.unconsumed_tail, ######## + "(A) uct should be '': not %d long" % + len(dco.unconsumed_tail)) + if flush: + bufs.append(dco.flush()) + else: + while True: + chunk = dco.decompress('') + if chunk: + bufs.append(chunk) + else: + break + self.assertEqual('', dco.unconsumed_tail, ######## + "(B) uct should be '': not %d long" % + len(dco.unconsumed_tail)) + self.assertEqual(data, ''.join(bufs)) + # Failure means: "decompressobj with init options failed" + + def test_decompincflush(self): + self.test_decompinc(flush=True) + + def test_decompimax(self, source=None, cx=256, dcx=64): + # compress in steps, decompress in length-restricted steps + source = source or HAMLET_SCENE + # Check a decompression object with max_length specified + data = source * 128 + co = zlib.compressobj() + bufs = [] + for i in range(0, len(data), cx): + bufs.append(co.compress(data[i:i+cx])) + bufs.append(co.flush()) + combuf = ''.join(bufs) + self.assertEqual(data, zlib.decompress(combuf), + 'compressed data failure') + + dco = zlib.decompressobj() + bufs = [] + cb = combuf + while cb: + #max_length = 1 + len(cb)//10 + chunk = dco.decompress(cb, dcx) + self.failIf(len(chunk) > dcx, + 'chunk too big (%d>%d)' % (len(chunk), dcx)) + bufs.append(chunk) + cb = dco.unconsumed_tail + bufs.append(dco.flush()) + self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved') + + def test_decompressmaxlen(self, flush=False): + # Check a decompression object with max_length specified + data = HAMLET_SCENE * 128 + co = zlib.compressobj() + bufs = [] + for i in range(0, len(data), 256): + bufs.append(co.compress(data[i:i+256])) + bufs.append(co.flush()) + combuf = ''.join(bufs) + self.assertEqual(data, zlib.decompress(combuf), + 'compressed data failure') + + dco = zlib.decompressobj() + bufs = [] + cb = combuf + while cb: + max_length = 1 + len(cb)//10 + chunk = dco.decompress(cb, max_length) + self.failIf(len(chunk) > max_length, + 'chunk too big (%d>%d)' % (len(chunk),max_length)) + bufs.append(chunk) + cb = dco.unconsumed_tail + if flush: + bufs.append(dco.flush()) + else: + while chunk: + chunk = dco.decompress('', max_length) + self.failIf(len(chunk) > max_length, + 'chunk too big (%d>%d)' % (len(chunk),max_length)) + bufs.append(chunk) + self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved') + + def test_decompressmaxlenflush(self): + self.test_decompressmaxlen(flush=True) + + def test_maxlenmisc(self): + # Misc tests of max_length + dco = zlib.decompressobj() + self.assertRaises(ValueError, dco.decompress, "", -1) + self.assertEqual('', dco.unconsumed_tail) + + def test_flushes(self): + # Test flush() with the various options, using all the + # different levels in order to provide more variations. + sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH'] + sync_opt = [getattr(zlib, opt) for opt in sync_opt + if hasattr(zlib, opt)] + data = HAMLET_SCENE * 8 + + for sync in sync_opt: + for level in range(10): + obj = zlib.compressobj( level ) + a = obj.compress( data[:3000] ) + b = obj.flush( sync ) + c = obj.compress( data[3000:] ) + d = obj.flush() + self.assertEqual(zlib.decompress(''.join([a,b,c,d])), + data, ("Decompress failed: flush " + "mode=%i, level=%i") % (sync, level)) + del obj + + def test_odd_flush(self): + # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1 + import random + + if hasattr(zlib, 'Z_SYNC_FLUSH'): + # Testing on 17K of "random" data + + # Create compressor and decompressor objects + co = zlib.compressobj(zlib.Z_BEST_COMPRESSION) + dco = zlib.decompressobj() + + # Try 17K of data + # generate random data stream + try: + # In 2.3 and later, WichmannHill is the RNG of the bug report + gen = random.WichmannHill() + except AttributeError: + try: + # 2.2 called it Random + gen = random.Random() + except AttributeError: + # others might simply have a single RNG + gen = random + gen.seed(1) + data = genblock(1, 17 * 1024, generator=gen) + + # compress, sync-flush, and decompress + first = co.compress(data) + second = co.flush(zlib.Z_SYNC_FLUSH) + expanded = dco.decompress(first + second) + + # if decompressed data is different from the input data, choke. + self.assertEqual(expanded, data, "17K random source doesn't match") + + def test_empty_flush(self): + # Test that calling .flush() on unused objects works. + # (Bug #1083110 -- calling .flush() on decompress objects + # caused a core dump.) + + co = zlib.compressobj(zlib.Z_BEST_COMPRESSION) + self.failUnless(co.flush()) # Returns a zlib header + dco = zlib.decompressobj() + self.assertEqual(dco.flush(), "") # Returns nothing + + if hasattr(zlib.compressobj(), "copy"): + def test_compresscopy(self): + # Test copying a compression object + data0 = HAMLET_SCENE + data1 = HAMLET_SCENE.swapcase() + c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION) + bufs0 = [] + bufs0.append(c0.compress(data0)) + + c1 = c0.copy() + bufs1 = bufs0[:] + + bufs0.append(c0.compress(data0)) + bufs0.append(c0.flush()) + s0 = ''.join(bufs0) + + bufs1.append(c1.compress(data1)) + bufs1.append(c1.flush()) + s1 = ''.join(bufs1) + + self.assertEqual(zlib.decompress(s0),data0+data0) + self.assertEqual(zlib.decompress(s1),data0+data1) + + def test_badcompresscopy(self): + # Test copying a compression object in an inconsistent state + c = zlib.compressobj() + c.compress(HAMLET_SCENE) + c.flush() + self.assertRaises(ValueError, c.copy) + + if hasattr(zlib.decompressobj(), "copy"): + def test_decompresscopy(self): + # Test copying a decompression object + data = HAMLET_SCENE + comp = zlib.compress(data) + + d0 = zlib.decompressobj() + bufs0 = [] + bufs0.append(d0.decompress(comp[:32])) + + d1 = d0.copy() + bufs1 = bufs0[:] + + bufs0.append(d0.decompress(comp[32:])) + s0 = ''.join(bufs0) + + bufs1.append(d1.decompress(comp[32:])) + s1 = ''.join(bufs1) + + self.assertEqual(s0,s1) + self.assertEqual(s0,data) + + def test_baddecompresscopy(self): + # Test copying a compression object in an inconsistent state + data = zlib.compress(HAMLET_SCENE) + d = zlib.decompressobj() + d.decompress(data) + d.flush() + self.assertRaises(ValueError, d.copy) + +def genblock(seed, length, step=1024, generator=random): + """length-byte stream of random data from a seed (in step-byte blocks).""" + if seed is not None: + generator.seed(seed) + randint = generator.randint + if length < step or step < 2: + step = length + blocks = [] + for i in range(0, length, step): + blocks.append(''.join([chr(randint(0,255)) + for x in range(step)])) + return ''.join(blocks)[:length] + + + +def choose_lines(source, number, seed=None, generator=random): + """Return a list of number lines randomly chosen from the source""" + if seed is not None: + generator.seed(seed) + sources = source.split('\n') + return [generator.choice(sources) for n in range(number)] + + + +HAMLET_SCENE = """ +LAERTES + + O, fear me not. + I stay too long: but here my father comes. + + Enter POLONIUS + + A double blessing is a double grace, + Occasion smiles upon a second leave. + +LORD POLONIUS + + Yet here, Laertes! aboard, aboard, for shame! + The wind sits in the shoulder of your sail, + And you are stay'd for. There; my blessing with thee! + And these few precepts in thy memory + See thou character. Give thy thoughts no tongue, + Nor any unproportioned thought his act. + Be thou familiar, but by no means vulgar. + Those friends thou hast, and their adoption tried, + Grapple them to thy soul with hoops of steel; + But do not dull thy palm with entertainment + Of each new-hatch'd, unfledged comrade. Beware + Of entrance to a quarrel, but being in, + Bear't that the opposed may beware of thee. + Give every man thy ear, but few thy voice; + Take each man's censure, but reserve thy judgment. + Costly thy habit as thy purse can buy, + But not express'd in fancy; rich, not gaudy; + For the apparel oft proclaims the man, + And they in France of the best rank and station + Are of a most select and generous chief in that. + Neither a borrower nor a lender be; + For loan oft loses both itself and friend, + And borrowing dulls the edge of husbandry. + This above all: to thine ownself be true, + And it must follow, as the night the day, + Thou canst not then be false to any man. + Farewell: my blessing season this in thee! + +LAERTES + + Most humbly do I take my leave, my lord. + +LORD POLONIUS + + The time invites you; go; your servants tend. + +LAERTES + + Farewell, Ophelia; and remember well + What I have said to you. + +OPHELIA + + 'Tis in my memory lock'd, + And you yourself shall keep the key of it. + +LAERTES + + Farewell. +""" + + +def test_main(): + test_support.run_unittest( + ChecksumTestCase, + ExceptionTestCase, + CompressTestCase, + CompressObjectTestCase + ) + +if __name__ == "__main__": + test_main() + +def test(tests=''): + if not tests: tests = 'o' + testcases = [] + if 'k' in tests: testcases.append(ChecksumTestCase) + if 'x' in tests: testcases.append(ExceptionTestCase) + if 'c' in tests: testcases.append(CompressTestCase) + if 'o' in tests: testcases.append(CompressObjectTestCase) + test_support.run_unittest(*testcases) + +if False: + import sys + sys.path.insert(1, '/Py23Src/python/dist/src/Lib/test') + import test_zlib as tz + ts, ut = tz.test_support, tz.unittest + su = ut.TestSuite() + su.addTest(ut.makeSuite(tz.CompressTestCase)) + ts.run_suite(su)