symbian-qemu-0.9.1-12/python-2.6.1/Lib/test/test_io.py
changeset 1 2fb8b9db1c86
equal deleted inserted replaced
0:ffa851df0825 1:2fb8b9db1c86
       
     1 """Unit tests for io.py."""
       
     2 from __future__ import print_function
       
     3 from __future__ import unicode_literals
       
     4 
       
     5 import os
       
     6 import sys
       
     7 import time
       
     8 import array
       
     9 import threading
       
    10 import random
       
    11 import unittest
       
    12 from itertools import chain, cycle
       
    13 from test import test_support
       
    14 
       
    15 import codecs
       
    16 import io  # The module under test
       
    17 
       
    18 
       
    19 class MockRawIO(io.RawIOBase):
       
    20 
       
    21     def __init__(self, read_stack=()):
       
    22         self._read_stack = list(read_stack)
       
    23         self._write_stack = []
       
    24 
       
    25     def read(self, n=None):
       
    26         try:
       
    27             return self._read_stack.pop(0)
       
    28         except:
       
    29             return b""
       
    30 
       
    31     def write(self, b):
       
    32         self._write_stack.append(b[:])
       
    33         return len(b)
       
    34 
       
    35     def writable(self):
       
    36         return True
       
    37 
       
    38     def fileno(self):
       
    39         return 42
       
    40 
       
    41     def readable(self):
       
    42         return True
       
    43 
       
    44     def seekable(self):
       
    45         return True
       
    46 
       
    47     def seek(self, pos, whence):
       
    48         pass
       
    49 
       
    50     def tell(self):
       
    51         return 42
       
    52 
       
    53 
       
    54 class MockFileIO(io.BytesIO):
       
    55 
       
    56     def __init__(self, data):
       
    57         self.read_history = []
       
    58         io.BytesIO.__init__(self, data)
       
    59 
       
    60     def read(self, n=None):
       
    61         res = io.BytesIO.read(self, n)
       
    62         self.read_history.append(None if res is None else len(res))
       
    63         return res
       
    64 
       
    65 
       
    66 class MockNonBlockWriterIO(io.RawIOBase):
       
    67 
       
    68     def __init__(self, blocking_script):
       
    69         self._blocking_script = list(blocking_script)
       
    70         self._write_stack = []
       
    71 
       
    72     def write(self, b):
       
    73         self._write_stack.append(b[:])
       
    74         n = self._blocking_script.pop(0)
       
    75         if (n < 0):
       
    76             raise io.BlockingIOError(0, "test blocking", -n)
       
    77         else:
       
    78             return n
       
    79 
       
    80     def writable(self):
       
    81         return True
       
    82 
       
    83 
       
    84 class IOTest(unittest.TestCase):
       
    85 
       
    86     def tearDown(self):
       
    87         test_support.unlink(test_support.TESTFN)
       
    88 
       
    89     def write_ops(self, f):
       
    90         self.assertEqual(f.write(b"blah."), 5)
       
    91         self.assertEqual(f.seek(0), 0)
       
    92         self.assertEqual(f.write(b"Hello."), 6)
       
    93         self.assertEqual(f.tell(), 6)
       
    94         self.assertEqual(f.seek(-1, 1), 5)
       
    95         self.assertEqual(f.tell(), 5)
       
    96         self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
       
    97         self.assertEqual(f.seek(0), 0)
       
    98         self.assertEqual(f.write(b"h"), 1)
       
    99         self.assertEqual(f.seek(-1, 2), 13)
       
   100         self.assertEqual(f.tell(), 13)
       
   101         self.assertEqual(f.truncate(12), 12)
       
   102         self.assertEqual(f.tell(), 12)
       
   103         self.assertRaises(TypeError, f.seek, 0.0)
       
   104 
       
   105     def read_ops(self, f, buffered=False):
       
   106         data = f.read(5)
       
   107         self.assertEqual(data, b"hello")
       
   108         data = bytearray(data)
       
   109         self.assertEqual(f.readinto(data), 5)
       
   110         self.assertEqual(data, b" worl")
       
   111         self.assertEqual(f.readinto(data), 2)
       
   112         self.assertEqual(len(data), 5)
       
   113         self.assertEqual(data[:2], b"d\n")
       
   114         self.assertEqual(f.seek(0), 0)
       
   115         self.assertEqual(f.read(20), b"hello world\n")
       
   116         self.assertEqual(f.read(1), b"")
       
   117         self.assertEqual(f.readinto(bytearray(b"x")), 0)
       
   118         self.assertEqual(f.seek(-6, 2), 6)
       
   119         self.assertEqual(f.read(5), b"world")
       
   120         self.assertEqual(f.read(0), b"")
       
   121         self.assertEqual(f.readinto(bytearray()), 0)
       
   122         self.assertEqual(f.seek(-6, 1), 5)
       
   123         self.assertEqual(f.read(5), b" worl")
       
   124         self.assertEqual(f.tell(), 10)
       
   125         self.assertRaises(TypeError, f.seek, 0.0)
       
   126         if buffered:
       
   127             f.seek(0)
       
   128             self.assertEqual(f.read(), b"hello world\n")
       
   129             f.seek(6)
       
   130             self.assertEqual(f.read(), b"world\n")
       
   131             self.assertEqual(f.read(), b"")
       
   132 
       
   133     LARGE = 2**31
       
   134 
       
   135     def large_file_ops(self, f):
       
   136         assert f.readable()
       
   137         assert f.writable()
       
   138         self.assertEqual(f.seek(self.LARGE), self.LARGE)
       
   139         self.assertEqual(f.tell(), self.LARGE)
       
   140         self.assertEqual(f.write(b"xxx"), 3)
       
   141         self.assertEqual(f.tell(), self.LARGE + 3)
       
   142         self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
       
   143         self.assertEqual(f.truncate(), self.LARGE + 2)
       
   144         self.assertEqual(f.tell(), self.LARGE + 2)
       
   145         self.assertEqual(f.seek(0, 2), self.LARGE + 2)
       
   146         self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
       
   147         self.assertEqual(f.tell(), self.LARGE + 1)
       
   148         self.assertEqual(f.seek(0, 2), self.LARGE + 1)
       
   149         self.assertEqual(f.seek(-1, 2), self.LARGE)
       
   150         self.assertEqual(f.read(2), b"x")
       
   151 
       
   152     def test_raw_file_io(self):
       
   153         f = io.open(test_support.TESTFN, "wb", buffering=0)
       
   154         self.assertEqual(f.readable(), False)
       
   155         self.assertEqual(f.writable(), True)
       
   156         self.assertEqual(f.seekable(), True)
       
   157         self.write_ops(f)
       
   158         f.close()
       
   159         f = io.open(test_support.TESTFN, "rb", buffering=0)
       
   160         self.assertEqual(f.readable(), True)
       
   161         self.assertEqual(f.writable(), False)
       
   162         self.assertEqual(f.seekable(), True)
       
   163         self.read_ops(f)
       
   164         f.close()
       
   165 
       
   166     def test_buffered_file_io(self):
       
   167         f = io.open(test_support.TESTFN, "wb")
       
   168         self.assertEqual(f.readable(), False)
       
   169         self.assertEqual(f.writable(), True)
       
   170         self.assertEqual(f.seekable(), True)
       
   171         self.write_ops(f)
       
   172         f.close()
       
   173         f = io.open(test_support.TESTFN, "rb")
       
   174         self.assertEqual(f.readable(), True)
       
   175         self.assertEqual(f.writable(), False)
       
   176         self.assertEqual(f.seekable(), True)
       
   177         self.read_ops(f, True)
       
   178         f.close()
       
   179 
       
   180     def test_readline(self):
       
   181         f = io.open(test_support.TESTFN, "wb")
       
   182         f.write(b"abc\ndef\nxyzzy\nfoo")
       
   183         f.close()
       
   184         f = io.open(test_support.TESTFN, "rb")
       
   185         self.assertEqual(f.readline(), b"abc\n")
       
   186         self.assertEqual(f.readline(10), b"def\n")
       
   187         self.assertEqual(f.readline(2), b"xy")
       
   188         self.assertEqual(f.readline(4), b"zzy\n")
       
   189         self.assertEqual(f.readline(), b"foo")
       
   190         f.close()
       
   191 
       
   192     def test_raw_bytes_io(self):
       
   193         f = io.BytesIO()
       
   194         self.write_ops(f)
       
   195         data = f.getvalue()
       
   196         self.assertEqual(data, b"hello world\n")
       
   197         f = io.BytesIO(data)
       
   198         self.read_ops(f, True)
       
   199 
       
   200     def test_large_file_ops(self):
       
   201         # On Windows and Mac OSX this test comsumes large resources; It takes
       
   202         # a long time to build the >2GB file and takes >2GB of disk space
       
   203         # therefore the resource must be enabled to run this test.
       
   204         if sys.platform[:3] in ('win', 'os2') or sys.platform == 'darwin':
       
   205             if not test_support.is_resource_enabled("largefile"):
       
   206                 print("\nTesting large file ops skipped on %s." % sys.platform,
       
   207                       file=sys.stderr)
       
   208                 print("It requires %d bytes and a long time." % self.LARGE,
       
   209                       file=sys.stderr)
       
   210                 print("Use 'regrtest.py -u largefile test_io' to run it.",
       
   211                       file=sys.stderr)
       
   212                 return
       
   213         f = io.open(test_support.TESTFN, "w+b", 0)
       
   214         self.large_file_ops(f)
       
   215         f.close()
       
   216         f = io.open(test_support.TESTFN, "w+b")
       
   217         self.large_file_ops(f)
       
   218         f.close()
       
   219 
       
   220     def test_with_open(self):
       
   221         for bufsize in (0, 1, 100):
       
   222             f = None
       
   223             with open(test_support.TESTFN, "wb", bufsize) as f:
       
   224                 f.write(b"xxx")
       
   225             self.assertEqual(f.closed, True)
       
   226             f = None
       
   227             try:
       
   228                 with open(test_support.TESTFN, "wb", bufsize) as f:
       
   229                     1/0
       
   230             except ZeroDivisionError:
       
   231                 self.assertEqual(f.closed, True)
       
   232             else:
       
   233                 self.fail("1/0 didn't raise an exception")
       
   234 
       
   235     def test_destructor(self):
       
   236         record = []
       
   237         class MyFileIO(io.FileIO):
       
   238             def __del__(self):
       
   239                 record.append(1)
       
   240                 io.FileIO.__del__(self)
       
   241             def close(self):
       
   242                 record.append(2)
       
   243                 io.FileIO.close(self)
       
   244             def flush(self):
       
   245                 record.append(3)
       
   246                 io.FileIO.flush(self)
       
   247         f = MyFileIO(test_support.TESTFN, "w")
       
   248         f.write("xxx")
       
   249         del f
       
   250         self.assertEqual(record, [1, 2, 3])
       
   251 
       
   252     def test_close_flushes(self):
       
   253         f = io.open(test_support.TESTFN, "wb")
       
   254         f.write(b"xxx")
       
   255         f.close()
       
   256         f = io.open(test_support.TESTFN, "rb")
       
   257         self.assertEqual(f.read(), b"xxx")
       
   258         f.close()
       
   259 
       
   260     def XXXtest_array_writes(self):
       
   261         # XXX memory view not available yet
       
   262         a = array.array('i', range(10))
       
   263         n = len(memoryview(a))
       
   264         f = io.open(test_support.TESTFN, "wb", 0)
       
   265         self.assertEqual(f.write(a), n)
       
   266         f.close()
       
   267         f = io.open(test_support.TESTFN, "wb")
       
   268         self.assertEqual(f.write(a), n)
       
   269         f.close()
       
   270 
       
   271     def test_closefd(self):
       
   272         self.assertRaises(ValueError, io.open, test_support.TESTFN, 'w',
       
   273                           closefd=False)
       
   274 
       
   275 class MemorySeekTestMixin:
       
   276 
       
   277     def testInit(self):
       
   278         buf = self.buftype("1234567890")
       
   279         bytesIo = self.ioclass(buf)
       
   280 
       
   281     def testRead(self):
       
   282         buf = self.buftype("1234567890")
       
   283         bytesIo = self.ioclass(buf)
       
   284 
       
   285         self.assertEquals(buf[:1], bytesIo.read(1))
       
   286         self.assertEquals(buf[1:5], bytesIo.read(4))
       
   287         self.assertEquals(buf[5:], bytesIo.read(900))
       
   288         self.assertEquals(self.EOF, bytesIo.read())
       
   289 
       
   290     def testReadNoArgs(self):
       
   291         buf = self.buftype("1234567890")
       
   292         bytesIo = self.ioclass(buf)
       
   293 
       
   294         self.assertEquals(buf, bytesIo.read())
       
   295         self.assertEquals(self.EOF, bytesIo.read())
       
   296 
       
   297     def testSeek(self):
       
   298         buf = self.buftype("1234567890")
       
   299         bytesIo = self.ioclass(buf)
       
   300 
       
   301         bytesIo.read(5)
       
   302         bytesIo.seek(0)
       
   303         self.assertEquals(buf, bytesIo.read())
       
   304 
       
   305         bytesIo.seek(3)
       
   306         self.assertEquals(buf[3:], bytesIo.read())
       
   307         self.assertRaises(TypeError, bytesIo.seek, 0.0)
       
   308 
       
   309     def testTell(self):
       
   310         buf = self.buftype("1234567890")
       
   311         bytesIo = self.ioclass(buf)
       
   312 
       
   313         self.assertEquals(0, bytesIo.tell())
       
   314         bytesIo.seek(5)
       
   315         self.assertEquals(5, bytesIo.tell())
       
   316         bytesIo.seek(10000)
       
   317         self.assertEquals(10000, bytesIo.tell())
       
   318 
       
   319 
       
   320 class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
       
   321     @staticmethod
       
   322     def buftype(s):
       
   323         return s.encode("utf-8")
       
   324     ioclass = io.BytesIO
       
   325     EOF = b""
       
   326 
       
   327 
       
   328 class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
       
   329     buftype = str
       
   330     ioclass = io.StringIO
       
   331     EOF = ""
       
   332 
       
   333 
       
   334 class BufferedReaderTest(unittest.TestCase):
       
   335 
       
   336     def testRead(self):
       
   337         rawio = MockRawIO((b"abc", b"d", b"efg"))
       
   338         bufio = io.BufferedReader(rawio)
       
   339 
       
   340         self.assertEquals(b"abcdef", bufio.read(6))
       
   341 
       
   342     def testBuffering(self):
       
   343         data = b"abcdefghi"
       
   344         dlen = len(data)
       
   345 
       
   346         tests = [
       
   347             [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
       
   348             [ 100, [ 3, 3, 3],     [ dlen ]    ],
       
   349             [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
       
   350         ]
       
   351 
       
   352         for bufsize, buf_read_sizes, raw_read_sizes in tests:
       
   353             rawio = MockFileIO(data)
       
   354             bufio = io.BufferedReader(rawio, buffer_size=bufsize)
       
   355             pos = 0
       
   356             for nbytes in buf_read_sizes:
       
   357                 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
       
   358                 pos += nbytes
       
   359             self.assertEquals(rawio.read_history, raw_read_sizes)
       
   360 
       
   361     def testReadNonBlocking(self):
       
   362         # Inject some None's in there to simulate EWOULDBLOCK
       
   363         rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
       
   364         bufio = io.BufferedReader(rawio)
       
   365 
       
   366         self.assertEquals(b"abcd", bufio.read(6))
       
   367         self.assertEquals(b"e", bufio.read(1))
       
   368         self.assertEquals(b"fg", bufio.read())
       
   369         self.assert_(None is bufio.read())
       
   370         self.assertEquals(b"", bufio.read())
       
   371 
       
   372     def testReadToEof(self):
       
   373         rawio = MockRawIO((b"abc", b"d", b"efg"))
       
   374         bufio = io.BufferedReader(rawio)
       
   375 
       
   376         self.assertEquals(b"abcdefg", bufio.read(9000))
       
   377 
       
   378     def testReadNoArgs(self):
       
   379         rawio = MockRawIO((b"abc", b"d", b"efg"))
       
   380         bufio = io.BufferedReader(rawio)
       
   381 
       
   382         self.assertEquals(b"abcdefg", bufio.read())
       
   383 
       
   384     def testFileno(self):
       
   385         rawio = MockRawIO((b"abc", b"d", b"efg"))
       
   386         bufio = io.BufferedReader(rawio)
       
   387 
       
   388         self.assertEquals(42, bufio.fileno())
       
   389 
       
   390     def testFilenoNoFileno(self):
       
   391         # XXX will we always have fileno() function? If so, kill
       
   392         # this test. Else, write it.
       
   393         pass
       
   394 
       
   395     def testThreads(self):
       
   396         try:
       
   397             # Write out many bytes with exactly the same number of 0's,
       
   398             # 1's... 255's. This will help us check that concurrent reading
       
   399             # doesn't duplicate or forget contents.
       
   400             N = 1000
       
   401             l = range(256) * N
       
   402             random.shuffle(l)
       
   403             s = bytes(bytearray(l))
       
   404             with io.open(test_support.TESTFN, "wb") as f:
       
   405                 f.write(s)
       
   406             with io.open(test_support.TESTFN, "rb", buffering=0) as raw:
       
   407                 bufio = io.BufferedReader(raw, 8)
       
   408                 errors = []
       
   409                 results = []
       
   410                 def f():
       
   411                     try:
       
   412                         # Intra-buffer read then buffer-flushing read
       
   413                         for n in cycle([1, 19]):
       
   414                             s = bufio.read(n)
       
   415                             if not s:
       
   416                                 break
       
   417                             # list.append() is atomic
       
   418                             results.append(s)
       
   419                     except Exception as e:
       
   420                         errors.append(e)
       
   421                         raise
       
   422                 threads = [threading.Thread(target=f) for x in range(20)]
       
   423                 for t in threads:
       
   424                     t.start()
       
   425                 time.sleep(0.02) # yield
       
   426                 for t in threads:
       
   427                     t.join()
       
   428                 self.assertFalse(errors,
       
   429                     "the following exceptions were caught: %r" % errors)
       
   430                 s = b''.join(results)
       
   431                 for i in range(256):
       
   432                     c = bytes(bytearray([i]))
       
   433                     self.assertEqual(s.count(c), N)
       
   434         finally:
       
   435             test_support.unlink(test_support.TESTFN)
       
   436 
       
   437 
       
   438 
       
   439 class BufferedWriterTest(unittest.TestCase):
       
   440 
       
   441     def testWrite(self):
       
   442         # Write to the buffered IO but don't overflow the buffer.
       
   443         writer = MockRawIO()
       
   444         bufio = io.BufferedWriter(writer, 8)
       
   445 
       
   446         bufio.write(b"abc")
       
   447 
       
   448         self.assertFalse(writer._write_stack)
       
   449 
       
   450     def testWriteOverflow(self):
       
   451         writer = MockRawIO()
       
   452         bufio = io.BufferedWriter(writer, 8)
       
   453 
       
   454         bufio.write(b"abc")
       
   455         bufio.write(b"defghijkl")
       
   456 
       
   457         self.assertEquals(b"abcdefghijkl", writer._write_stack[0])
       
   458 
       
   459     def testWriteNonBlocking(self):
       
   460         raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
       
   461         bufio = io.BufferedWriter(raw, 8, 16)
       
   462 
       
   463         bufio.write(b"asdf")
       
   464         bufio.write(b"asdfa")
       
   465         self.assertEquals(b"asdfasdfa", raw._write_stack[0])
       
   466 
       
   467         bufio.write(b"asdfasdfasdf")
       
   468         self.assertEquals(b"asdfasdfasdf", raw._write_stack[1])
       
   469         bufio.write(b"asdfasdfasdf")
       
   470         self.assertEquals(b"dfasdfasdf", raw._write_stack[2])
       
   471         self.assertEquals(b"asdfasdfasdf", raw._write_stack[3])
       
   472 
       
   473         bufio.write(b"asdfasdfasdf")
       
   474 
       
   475         # XXX I don't like this test. It relies too heavily on how the
       
   476         # algorithm actually works, which we might change. Refactor
       
   477         # later.
       
   478 
       
   479     def testFileno(self):
       
   480         rawio = MockRawIO((b"abc", b"d", b"efg"))
       
   481         bufio = io.BufferedWriter(rawio)
       
   482 
       
   483         self.assertEquals(42, bufio.fileno())
       
   484 
       
   485     def testFlush(self):
       
   486         writer = MockRawIO()
       
   487         bufio = io.BufferedWriter(writer, 8)
       
   488 
       
   489         bufio.write(b"abc")
       
   490         bufio.flush()
       
   491 
       
   492         self.assertEquals(b"abc", writer._write_stack[0])
       
   493 
       
   494     def testThreads(self):
       
   495         # BufferedWriter should not raise exceptions or crash
       
   496         # when called from multiple threads.
       
   497         try:
       
   498             # We use a real file object because it allows us to
       
   499             # exercise situations where the GIL is released before
       
   500             # writing the buffer to the raw streams. This is in addition
       
   501             # to concurrency issues due to switching threads in the middle
       
   502             # of Python code.
       
   503             with io.open(test_support.TESTFN, "wb", buffering=0) as raw:
       
   504                 bufio = io.BufferedWriter(raw, 8)
       
   505                 errors = []
       
   506                 def f():
       
   507                     try:
       
   508                         # Write enough bytes to flush the buffer
       
   509                         s = b"a" * 19
       
   510                         for i in range(50):
       
   511                             bufio.write(s)
       
   512                     except Exception as e:
       
   513                         errors.append(e)
       
   514                         raise
       
   515                 threads = [threading.Thread(target=f) for x in range(20)]
       
   516                 for t in threads:
       
   517                     t.start()
       
   518                 time.sleep(0.02) # yield
       
   519                 for t in threads:
       
   520                     t.join()
       
   521                 self.assertFalse(errors,
       
   522                     "the following exceptions were caught: %r" % errors)
       
   523         finally:
       
   524             test_support.unlink(test_support.TESTFN)
       
   525 
       
   526 
       
   527 class BufferedRWPairTest(unittest.TestCase):
       
   528 
       
   529     def testRWPair(self):
       
   530         r = MockRawIO(())
       
   531         w = MockRawIO()
       
   532         pair = io.BufferedRWPair(r, w)
       
   533 
       
   534         # XXX need implementation
       
   535 
       
   536 
       
   537 class BufferedRandomTest(unittest.TestCase):
       
   538 
       
   539     def testReadAndWrite(self):
       
   540         raw = MockRawIO((b"asdf", b"ghjk"))
       
   541         rw = io.BufferedRandom(raw, 8, 12)
       
   542 
       
   543         self.assertEqual(b"as", rw.read(2))
       
   544         rw.write(b"ddd")
       
   545         rw.write(b"eee")
       
   546         self.assertFalse(raw._write_stack) # Buffer writes
       
   547         self.assertEqual(b"ghjk", rw.read()) # This read forces write flush
       
   548         self.assertEquals(b"dddeee", raw._write_stack[0])
       
   549 
       
   550     def testSeekAndTell(self):
       
   551         raw = io.BytesIO(b"asdfghjkl")
       
   552         rw = io.BufferedRandom(raw)
       
   553 
       
   554         self.assertEquals(b"as", rw.read(2))
       
   555         self.assertEquals(2, rw.tell())
       
   556         rw.seek(0, 0)
       
   557         self.assertEquals(b"asdf", rw.read(4))
       
   558 
       
   559         rw.write(b"asdf")
       
   560         rw.seek(0, 0)
       
   561         self.assertEquals(b"asdfasdfl", rw.read())
       
   562         self.assertEquals(9, rw.tell())
       
   563         rw.seek(-4, 2)
       
   564         self.assertEquals(5, rw.tell())
       
   565         rw.seek(2, 1)
       
   566         self.assertEquals(7, rw.tell())
       
   567         self.assertEquals(b"fl", rw.read(11))
       
   568         self.assertRaises(TypeError, rw.seek, 0.0)
       
   569 
       
   570 # To fully exercise seek/tell, the StatefulIncrementalDecoder has these
       
   571 # properties:
       
   572 #   - A single output character can correspond to many bytes of input.
       
   573 #   - The number of input bytes to complete the character can be
       
   574 #     undetermined until the last input byte is received.
       
   575 #   - The number of input bytes can vary depending on previous input.
       
   576 #   - A single input byte can correspond to many characters of output.
       
   577 #   - The number of output characters can be undetermined until the
       
   578 #     last input byte is received.
       
   579 #   - The number of output characters can vary depending on previous input.
       
   580 
       
   581 class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
       
   582     """
       
   583     For testing seek/tell behavior with a stateful, buffering decoder.
       
   584 
       
   585     Input is a sequence of words.  Words may be fixed-length (length set
       
   586     by input) or variable-length (period-terminated).  In variable-length
       
   587     mode, extra periods are ignored.  Possible words are:
       
   588       - 'i' followed by a number sets the input length, I (maximum 99).
       
   589         When I is set to 0, words are space-terminated.
       
   590       - 'o' followed by a number sets the output length, O (maximum 99).
       
   591       - Any other word is converted into a word followed by a period on
       
   592         the output.  The output word consists of the input word truncated
       
   593         or padded out with hyphens to make its length equal to O.  If O
       
   594         is 0, the word is output verbatim without truncating or padding.
       
   595     I and O are initially set to 1.  When I changes, any buffered input is
       
   596     re-scanned according to the new I.  EOF also terminates the last word.
       
   597     """
       
   598 
       
   599     def __init__(self, errors='strict'):
       
   600         codecs.IncrementalDecoder.__init__(self, errors)
       
   601         self.reset()
       
   602 
       
   603     def __repr__(self):
       
   604         return '<SID %x>' % id(self)
       
   605 
       
   606     def reset(self):
       
   607         self.i = 1
       
   608         self.o = 1
       
   609         self.buffer = bytearray()
       
   610 
       
   611     def getstate(self):
       
   612         i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
       
   613         return bytes(self.buffer), i*100 + o
       
   614 
       
   615     def setstate(self, state):
       
   616         buffer, io = state
       
   617         self.buffer = bytearray(buffer)
       
   618         i, o = divmod(io, 100)
       
   619         self.i, self.o = i ^ 1, o ^ 1
       
   620 
       
   621     def decode(self, input, final=False):
       
   622         output = ''
       
   623         for b in input:
       
   624             if self.i == 0: # variable-length, terminated with period
       
   625                 if b == '.':
       
   626                     if self.buffer:
       
   627                         output += self.process_word()
       
   628                 else:
       
   629                     self.buffer.append(b)
       
   630             else: # fixed-length, terminate after self.i bytes
       
   631                 self.buffer.append(b)
       
   632                 if len(self.buffer) == self.i:
       
   633                     output += self.process_word()
       
   634         if final and self.buffer: # EOF terminates the last word
       
   635             output += self.process_word()
       
   636         return output
       
   637 
       
   638     def process_word(self):
       
   639         output = ''
       
   640         if self.buffer[0] == ord('i'):
       
   641             self.i = min(99, int(self.buffer[1:] or 0)) # set input length
       
   642         elif self.buffer[0] == ord('o'):
       
   643             self.o = min(99, int(self.buffer[1:] or 0)) # set output length
       
   644         else:
       
   645             output = self.buffer.decode('ascii')
       
   646             if len(output) < self.o:
       
   647                 output += '-'*self.o # pad out with hyphens
       
   648             if self.o:
       
   649                 output = output[:self.o] # truncate to output length
       
   650             output += '.'
       
   651         self.buffer = bytearray()
       
   652         return output
       
   653 
       
   654     codecEnabled = False
       
   655 
       
   656     @classmethod
       
   657     def lookupTestDecoder(cls, name):
       
   658         if cls.codecEnabled and name == 'test_decoder':
       
   659             return codecs.CodecInfo(
       
   660                 name='test_decoder', encode=None, decode=None,
       
   661                 incrementalencoder=None,
       
   662                 streamreader=None, streamwriter=None,
       
   663                 incrementaldecoder=cls)
       
   664 
       
   665 # Register the previous decoder for testing.
       
   666 # Disabled by default, tests will enable it.
       
   667 codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
       
   668 
       
   669 
       
   670 class StatefulIncrementalDecoderTest(unittest.TestCase):
       
   671     """
       
   672     Make sure the StatefulIncrementalDecoder actually works.
       
   673     """
       
   674 
       
   675     test_cases = [
       
   676         # I=1, O=1 (fixed-length input == fixed-length output)
       
   677         (b'abcd', False, 'a.b.c.d.'),
       
   678         # I=0, O=0 (variable-length input, variable-length output)
       
   679         (b'oiabcd', True, 'abcd.'),
       
   680         # I=0, O=0 (should ignore extra periods)
       
   681         (b'oi...abcd...', True, 'abcd.'),
       
   682         # I=0, O=6 (variable-length input, fixed-length output)
       
   683         (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
       
   684         # I=2, O=6 (fixed-length input < fixed-length output)
       
   685         (b'i.i2.o6xyz', True, 'xy----.z-----.'),
       
   686         # I=6, O=3 (fixed-length input > fixed-length output)
       
   687         (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
       
   688         # I=0, then 3; O=29, then 15 (with longer output)
       
   689         (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
       
   690          'a----------------------------.' +
       
   691          'b----------------------------.' +
       
   692          'cde--------------------------.' +
       
   693          'abcdefghijabcde.' +
       
   694          'a.b------------.' +
       
   695          '.c.------------.' +
       
   696          'd.e------------.' +
       
   697          'k--------------.' +
       
   698          'l--------------.' +
       
   699          'm--------------.')
       
   700     ]
       
   701 
       
   702     def testDecoder(self):
       
   703         # Try a few one-shot test cases.
       
   704         for input, eof, output in self.test_cases:
       
   705             d = StatefulIncrementalDecoder()
       
   706             self.assertEquals(d.decode(input, eof), output)
       
   707 
       
   708         # Also test an unfinished decode, followed by forcing EOF.
       
   709         d = StatefulIncrementalDecoder()
       
   710         self.assertEquals(d.decode(b'oiabcd'), '')
       
   711         self.assertEquals(d.decode(b'', 1), 'abcd.')
       
   712 
       
   713 class TextIOWrapperTest(unittest.TestCase):
       
   714 
       
   715     def setUp(self):
       
   716         self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
       
   717         self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
       
   718 
       
   719     def tearDown(self):
       
   720         test_support.unlink(test_support.TESTFN)
       
   721 
       
   722     def testLineBuffering(self):
       
   723         r = io.BytesIO()
       
   724         b = io.BufferedWriter(r, 1000)
       
   725         t = io.TextIOWrapper(b, newline="\n", line_buffering=True)
       
   726         t.write(u"X")
       
   727         self.assertEquals(r.getvalue(), b"")  # No flush happened
       
   728         t.write(u"Y\nZ")
       
   729         self.assertEquals(r.getvalue(), b"XY\nZ")  # All got flushed
       
   730         t.write(u"A\rB")
       
   731         self.assertEquals(r.getvalue(), b"XY\nZA\rB")
       
   732 
       
   733     def testEncodingErrorsReading(self):
       
   734         # (1) default
       
   735         b = io.BytesIO(b"abc\n\xff\n")
       
   736         t = io.TextIOWrapper(b, encoding="ascii")
       
   737         self.assertRaises(UnicodeError, t.read)
       
   738         # (2) explicit strict
       
   739         b = io.BytesIO(b"abc\n\xff\n")
       
   740         t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
       
   741         self.assertRaises(UnicodeError, t.read)
       
   742         # (3) ignore
       
   743         b = io.BytesIO(b"abc\n\xff\n")
       
   744         t = io.TextIOWrapper(b, encoding="ascii", errors="ignore")
       
   745         self.assertEquals(t.read(), "abc\n\n")
       
   746         # (4) replace
       
   747         b = io.BytesIO(b"abc\n\xff\n")
       
   748         t = io.TextIOWrapper(b, encoding="ascii", errors="replace")
       
   749         self.assertEquals(t.read(), u"abc\n\ufffd\n")
       
   750 
       
   751     def testEncodingErrorsWriting(self):
       
   752         # (1) default
       
   753         b = io.BytesIO()
       
   754         t = io.TextIOWrapper(b, encoding="ascii")
       
   755         self.assertRaises(UnicodeError, t.write, u"\xff")
       
   756         # (2) explicit strict
       
   757         b = io.BytesIO()
       
   758         t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
       
   759         self.assertRaises(UnicodeError, t.write, u"\xff")
       
   760         # (3) ignore
       
   761         b = io.BytesIO()
       
   762         t = io.TextIOWrapper(b, encoding="ascii", errors="ignore",
       
   763                              newline="\n")
       
   764         t.write(u"abc\xffdef\n")
       
   765         t.flush()
       
   766         self.assertEquals(b.getvalue(), b"abcdef\n")
       
   767         # (4) replace
       
   768         b = io.BytesIO()
       
   769         t = io.TextIOWrapper(b, encoding="ascii", errors="replace",
       
   770                              newline="\n")
       
   771         t.write(u"abc\xffdef\n")
       
   772         t.flush()
       
   773         self.assertEquals(b.getvalue(), b"abc?def\n")
       
   774 
       
   775     def testNewlinesInput(self):
       
   776         testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
       
   777         normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
       
   778         for newline, expected in [
       
   779             (None, normalized.decode("ascii").splitlines(True)),
       
   780             ("", testdata.decode("ascii").splitlines(True)),
       
   781             ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
       
   782             ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
       
   783             ("\r",  ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
       
   784             ]:
       
   785             buf = io.BytesIO(testdata)
       
   786             txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
       
   787             self.assertEquals(txt.readlines(), expected)
       
   788             txt.seek(0)
       
   789             self.assertEquals(txt.read(), "".join(expected))
       
   790 
       
   791     def testNewlinesOutput(self):
       
   792         testdict = {
       
   793             "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
       
   794             "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
       
   795             "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
       
   796             "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
       
   797             }
       
   798         tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
       
   799         for newline, expected in tests:
       
   800             buf = io.BytesIO()
       
   801             txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
       
   802             txt.write("AAA\nB")
       
   803             txt.write("BB\nCCC\n")
       
   804             txt.write("X\rY\r\nZ")
       
   805             txt.flush()
       
   806             self.assertEquals(buf.closed, False)
       
   807             self.assertEquals(buf.getvalue(), expected)
       
   808 
       
   809     def testNewlines(self):
       
   810         input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
       
   811 
       
   812         tests = [
       
   813             [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
       
   814             [ '', input_lines ],
       
   815             [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
       
   816             [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
       
   817             [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
       
   818         ]
       
   819 
       
   820         encodings = ('utf-8', 'latin-1')
       
   821 
       
   822         # Try a range of buffer sizes to test the case where \r is the last
       
   823         # character in TextIOWrapper._pending_line.
       
   824         for encoding in encodings:
       
   825             # XXX: str.encode() should return bytes
       
   826             data = bytes(''.join(input_lines).encode(encoding))
       
   827             for do_reads in (False, True):
       
   828                 for bufsize in range(1, 10):
       
   829                     for newline, exp_lines in tests:
       
   830                         bufio = io.BufferedReader(io.BytesIO(data), bufsize)
       
   831                         textio = io.TextIOWrapper(bufio, newline=newline,
       
   832                                                   encoding=encoding)
       
   833                         if do_reads:
       
   834                             got_lines = []
       
   835                             while True:
       
   836                                 c2 = textio.read(2)
       
   837                                 if c2 == '':
       
   838                                     break
       
   839                                 self.assertEquals(len(c2), 2)
       
   840                                 got_lines.append(c2 + textio.readline())
       
   841                         else:
       
   842                             got_lines = list(textio)
       
   843 
       
   844                         for got_line, exp_line in zip(got_lines, exp_lines):
       
   845                             self.assertEquals(got_line, exp_line)
       
   846                         self.assertEquals(len(got_lines), len(exp_lines))
       
   847 
       
   848     def testNewlinesInput(self):
       
   849         testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
       
   850         normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
       
   851         for newline, expected in [
       
   852             (None, normalized.decode("ascii").splitlines(True)),
       
   853             ("", testdata.decode("ascii").splitlines(True)),
       
   854             ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
       
   855             ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
       
   856             ("\r",  ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
       
   857             ]:
       
   858             buf = io.BytesIO(testdata)
       
   859             txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
       
   860             self.assertEquals(txt.readlines(), expected)
       
   861             txt.seek(0)
       
   862             self.assertEquals(txt.read(), "".join(expected))
       
   863 
       
   864     def testNewlinesOutput(self):
       
   865         data = u"AAA\nBBB\rCCC\n"
       
   866         data_lf = b"AAA\nBBB\rCCC\n"
       
   867         data_cr = b"AAA\rBBB\rCCC\r"
       
   868         data_crlf = b"AAA\r\nBBB\rCCC\r\n"
       
   869         save_linesep = os.linesep
       
   870         try:
       
   871             for os.linesep, newline, expected in [
       
   872                 ("\n", None, data_lf),
       
   873                 ("\r\n", None, data_crlf),
       
   874                 ("\n", "", data_lf),
       
   875                 ("\r\n", "", data_lf),
       
   876                 ("\n", "\n", data_lf),
       
   877                 ("\r\n", "\n", data_lf),
       
   878                 ("\n", "\r", data_cr),
       
   879                 ("\r\n", "\r", data_cr),
       
   880                 ("\n", "\r\n", data_crlf),
       
   881                 ("\r\n", "\r\n", data_crlf),
       
   882                 ]:
       
   883                 buf = io.BytesIO()
       
   884                 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
       
   885                 txt.write(data)
       
   886                 txt.close()
       
   887                 self.assertEquals(buf.closed, True)
       
   888                 self.assertRaises(ValueError, buf.getvalue)
       
   889         finally:
       
   890             os.linesep = save_linesep
       
   891 
       
   892     # Systematic tests of the text I/O API
       
   893 
       
   894     def testBasicIO(self):
       
   895         for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
       
   896             for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
       
   897                 f = io.open(test_support.TESTFN, "w+", encoding=enc)
       
   898                 f._CHUNK_SIZE = chunksize
       
   899                 self.assertEquals(f.write(u"abc"), 3)
       
   900                 f.close()
       
   901                 f = io.open(test_support.TESTFN, "r+", encoding=enc)
       
   902                 f._CHUNK_SIZE = chunksize
       
   903                 self.assertEquals(f.tell(), 0)
       
   904                 self.assertEquals(f.read(), u"abc")
       
   905                 cookie = f.tell()
       
   906                 self.assertEquals(f.seek(0), 0)
       
   907                 self.assertEquals(f.read(2), u"ab")
       
   908                 self.assertEquals(f.read(1), u"c")
       
   909                 self.assertEquals(f.read(1), u"")
       
   910                 self.assertEquals(f.read(), u"")
       
   911                 self.assertEquals(f.tell(), cookie)
       
   912                 self.assertEquals(f.seek(0), 0)
       
   913                 self.assertEquals(f.seek(0, 2), cookie)
       
   914                 self.assertEquals(f.write(u"def"), 3)
       
   915                 self.assertEquals(f.seek(cookie), cookie)
       
   916                 self.assertEquals(f.read(), u"def")
       
   917                 if enc.startswith("utf"):
       
   918                     self.multi_line_test(f, enc)
       
   919                 f.close()
       
   920 
       
   921     def multi_line_test(self, f, enc):
       
   922         f.seek(0)
       
   923         f.truncate()
       
   924         sample = u"s\xff\u0fff\uffff"
       
   925         wlines = []
       
   926         for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
       
   927             chars = []
       
   928             for i in range(size):
       
   929                 chars.append(sample[i % len(sample)])
       
   930             line = u"".join(chars) + u"\n"
       
   931             wlines.append((f.tell(), line))
       
   932             f.write(line)
       
   933         f.seek(0)
       
   934         rlines = []
       
   935         while True:
       
   936             pos = f.tell()
       
   937             line = f.readline()
       
   938             if not line:
       
   939                 break
       
   940             rlines.append((pos, line))
       
   941         self.assertEquals(rlines, wlines)
       
   942 
       
   943     def testTelling(self):
       
   944         f = io.open(test_support.TESTFN, "w+", encoding="utf8")
       
   945         p0 = f.tell()
       
   946         f.write(u"\xff\n")
       
   947         p1 = f.tell()
       
   948         f.write(u"\xff\n")
       
   949         p2 = f.tell()
       
   950         f.seek(0)
       
   951         self.assertEquals(f.tell(), p0)
       
   952         self.assertEquals(f.readline(), u"\xff\n")
       
   953         self.assertEquals(f.tell(), p1)
       
   954         self.assertEquals(f.readline(), u"\xff\n")
       
   955         self.assertEquals(f.tell(), p2)
       
   956         f.seek(0)
       
   957         for line in f:
       
   958             self.assertEquals(line, u"\xff\n")
       
   959             self.assertRaises(IOError, f.tell)
       
   960         self.assertEquals(f.tell(), p2)
       
   961         f.close()
       
   962 
       
   963     def testSeeking(self):
       
   964         chunk_size = io.TextIOWrapper._CHUNK_SIZE
       
   965         prefix_size = chunk_size - 2
       
   966         u_prefix = "a" * prefix_size
       
   967         prefix = bytes(u_prefix.encode("utf-8"))
       
   968         self.assertEquals(len(u_prefix), len(prefix))
       
   969         u_suffix = "\u8888\n"
       
   970         suffix = bytes(u_suffix.encode("utf-8"))
       
   971         line = prefix + suffix
       
   972         f = io.open(test_support.TESTFN, "wb")
       
   973         f.write(line*2)
       
   974         f.close()
       
   975         f = io.open(test_support.TESTFN, "r", encoding="utf-8")
       
   976         s = f.read(prefix_size)
       
   977         self.assertEquals(s, unicode(prefix, "ascii"))
       
   978         self.assertEquals(f.tell(), prefix_size)
       
   979         self.assertEquals(f.readline(), u_suffix)
       
   980 
       
   981     def testSeekingToo(self):
       
   982         # Regression test for a specific bug
       
   983         data = b'\xe0\xbf\xbf\n'
       
   984         f = io.open(test_support.TESTFN, "wb")
       
   985         f.write(data)
       
   986         f.close()
       
   987         f = io.open(test_support.TESTFN, "r", encoding="utf-8")
       
   988         f._CHUNK_SIZE  # Just test that it exists
       
   989         f._CHUNK_SIZE = 2
       
   990         f.readline()
       
   991         f.tell()
       
   992 
       
   993     def testSeekAndTell(self):
       
   994         """Test seek/tell using the StatefulIncrementalDecoder."""
       
   995 
       
   996         def testSeekAndTellWithData(data, min_pos=0):
       
   997             """Tell/seek to various points within a data stream and ensure
       
   998             that the decoded data returned by read() is consistent."""
       
   999             f = io.open(test_support.TESTFN, 'wb')
       
  1000             f.write(data)
       
  1001             f.close()
       
  1002             f = io.open(test_support.TESTFN, encoding='test_decoder')
       
  1003             decoded = f.read()
       
  1004             f.close()
       
  1005 
       
  1006             for i in range(min_pos, len(decoded) + 1): # seek positions
       
  1007                 for j in [1, 5, len(decoded) - i]: # read lengths
       
  1008                     f = io.open(test_support.TESTFN, encoding='test_decoder')
       
  1009                     self.assertEquals(f.read(i), decoded[:i])
       
  1010                     cookie = f.tell()
       
  1011                     self.assertEquals(f.read(j), decoded[i:i + j])
       
  1012                     f.seek(cookie)
       
  1013                     self.assertEquals(f.read(), decoded[i:])
       
  1014                     f.close()
       
  1015 
       
  1016         # Enable the test decoder.
       
  1017         StatefulIncrementalDecoder.codecEnabled = 1
       
  1018 
       
  1019         # Run the tests.
       
  1020         try:
       
  1021             # Try each test case.
       
  1022             for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
       
  1023                 testSeekAndTellWithData(input)
       
  1024 
       
  1025             # Position each test case so that it crosses a chunk boundary.
       
  1026             CHUNK_SIZE = io.TextIOWrapper._CHUNK_SIZE
       
  1027             for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
       
  1028                 offset = CHUNK_SIZE - len(input)//2
       
  1029                 prefix = b'.'*offset
       
  1030                 # Don't bother seeking into the prefix (takes too long).
       
  1031                 min_pos = offset*2
       
  1032                 testSeekAndTellWithData(prefix + input, min_pos)
       
  1033 
       
  1034         # Ensure our test decoder won't interfere with subsequent tests.
       
  1035         finally:
       
  1036             StatefulIncrementalDecoder.codecEnabled = 0
       
  1037 
       
  1038     def testEncodedWrites(self):
       
  1039         data = u"1234567890"
       
  1040         tests = ("utf-16",
       
  1041                  "utf-16-le",
       
  1042                  "utf-16-be",
       
  1043                  "utf-32",
       
  1044                  "utf-32-le",
       
  1045                  "utf-32-be")
       
  1046         for encoding in tests:
       
  1047             buf = io.BytesIO()
       
  1048             f = io.TextIOWrapper(buf, encoding=encoding)
       
  1049             # Check if the BOM is written only once (see issue1753).
       
  1050             f.write(data)
       
  1051             f.write(data)
       
  1052             f.seek(0)
       
  1053             self.assertEquals(f.read(), data * 2)
       
  1054             self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
       
  1055 
       
  1056     def timingTest(self):
       
  1057         timer = time.time
       
  1058         enc = "utf8"
       
  1059         line = "\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
       
  1060         nlines = 10000
       
  1061         nchars = len(line)
       
  1062         nbytes = len(line.encode(enc))
       
  1063         for chunk_size in (32, 64, 128, 256):
       
  1064             f = io.open(test_support.TESTFN, "w+", encoding=enc)
       
  1065             f._CHUNK_SIZE = chunk_size
       
  1066             t0 = timer()
       
  1067             for i in range(nlines):
       
  1068                 f.write(line)
       
  1069             f.flush()
       
  1070             t1 = timer()
       
  1071             f.seek(0)
       
  1072             for line in f:
       
  1073                 pass
       
  1074             t2 = timer()
       
  1075             f.seek(0)
       
  1076             while f.readline():
       
  1077                 pass
       
  1078             t3 = timer()
       
  1079             f.seek(0)
       
  1080             while f.readline():
       
  1081                 f.tell()
       
  1082             t4 = timer()
       
  1083             f.close()
       
  1084             if test_support.verbose:
       
  1085                 print("\nTiming test: %d lines of %d characters (%d bytes)" %
       
  1086                       (nlines, nchars, nbytes))
       
  1087                 print("File chunk size:          %6s" % f._CHUNK_SIZE)
       
  1088                 print("Writing:                  %6.3f seconds" % (t1-t0))
       
  1089                 print("Reading using iteration:  %6.3f seconds" % (t2-t1))
       
  1090                 print("Reading using readline(): %6.3f seconds" % (t3-t2))
       
  1091                 print("Using readline()+tell():  %6.3f seconds" % (t4-t3))
       
  1092 
       
  1093     def testReadOneByOne(self):
       
  1094         txt = io.TextIOWrapper(io.BytesIO(b"AA\r\nBB"))
       
  1095         reads = ""
       
  1096         while True:
       
  1097             c = txt.read(1)
       
  1098             if not c:
       
  1099                 break
       
  1100             reads += c
       
  1101         self.assertEquals(reads, "AA\nBB")
       
  1102 
       
  1103     # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
       
  1104     def testReadByChunk(self):
       
  1105         # make sure "\r\n" straddles 128 char boundary.
       
  1106         txt = io.TextIOWrapper(io.BytesIO(b"A" * 127 + b"\r\nB"))
       
  1107         reads = ""
       
  1108         while True:
       
  1109             c = txt.read(128)
       
  1110             if not c:
       
  1111                 break
       
  1112             reads += c
       
  1113         self.assertEquals(reads, "A"*127+"\nB")
       
  1114 
       
  1115     def test_issue1395_1(self):
       
  1116         txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
       
  1117 
       
  1118         # read one char at a time
       
  1119         reads = ""
       
  1120         while True:
       
  1121             c = txt.read(1)
       
  1122             if not c:
       
  1123                 break
       
  1124             reads += c
       
  1125         self.assertEquals(reads, self.normalized)
       
  1126 
       
  1127     def test_issue1395_2(self):
       
  1128         txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
       
  1129         txt._CHUNK_SIZE = 4
       
  1130 
       
  1131         reads = ""
       
  1132         while True:
       
  1133             c = txt.read(4)
       
  1134             if not c:
       
  1135                 break
       
  1136             reads += c
       
  1137         self.assertEquals(reads, self.normalized)
       
  1138 
       
  1139     def test_issue1395_3(self):
       
  1140         txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
       
  1141         txt._CHUNK_SIZE = 4
       
  1142 
       
  1143         reads = txt.read(4)
       
  1144         reads += txt.read(4)
       
  1145         reads += txt.readline()
       
  1146         reads += txt.readline()
       
  1147         reads += txt.readline()
       
  1148         self.assertEquals(reads, self.normalized)
       
  1149 
       
  1150     def test_issue1395_4(self):
       
  1151         txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
       
  1152         txt._CHUNK_SIZE = 4
       
  1153 
       
  1154         reads = txt.read(4)
       
  1155         reads += txt.read()
       
  1156         self.assertEquals(reads, self.normalized)
       
  1157 
       
  1158     def test_issue1395_5(self):
       
  1159         txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
       
  1160         txt._CHUNK_SIZE = 4
       
  1161 
       
  1162         reads = txt.read(4)
       
  1163         pos = txt.tell()
       
  1164         txt.seek(0)
       
  1165         txt.seek(pos)
       
  1166         self.assertEquals(txt.read(4), "BBB\n")
       
  1167 
       
  1168     def test_issue2282(self):
       
  1169         buffer = io.BytesIO(self.testdata)
       
  1170         txt = io.TextIOWrapper(buffer, encoding="ascii")
       
  1171 
       
  1172         self.assertEqual(buffer.seekable(), txt.seekable())
       
  1173 
       
  1174     def test_newline_decoder(self):
       
  1175         import codecs
       
  1176         decoder = codecs.getincrementaldecoder("utf-8")()
       
  1177         decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
       
  1178 
       
  1179         self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), u"\u8888")
       
  1180 
       
  1181         self.assertEquals(decoder.decode(b'\xe8'), u"")
       
  1182         self.assertEquals(decoder.decode(b'\xa2'), u"")
       
  1183         self.assertEquals(decoder.decode(b'\x88'), u"\u8888")
       
  1184 
       
  1185         self.assertEquals(decoder.decode(b'\xe8'), u"")
       
  1186         self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
       
  1187 
       
  1188         decoder.setstate((b'', 0))
       
  1189         self.assertEquals(decoder.decode(b'\n'), u"\n")
       
  1190         self.assertEquals(decoder.decode(b'\r'), u"")
       
  1191         self.assertEquals(decoder.decode(b'', final=True), u"\n")
       
  1192         self.assertEquals(decoder.decode(b'\r', final=True), u"\n")
       
  1193 
       
  1194         self.assertEquals(decoder.decode(b'\r'), u"")
       
  1195         self.assertEquals(decoder.decode(b'a'), u"\na")
       
  1196 
       
  1197         self.assertEquals(decoder.decode(b'\r\r\n'), u"\n\n")
       
  1198         self.assertEquals(decoder.decode(b'\r'), u"")
       
  1199         self.assertEquals(decoder.decode(b'\r'), u"\n")
       
  1200         self.assertEquals(decoder.decode(b'\na'), u"\na")
       
  1201 
       
  1202         self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r\n'), u"\u8888\n")
       
  1203         self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), u"\u8888")
       
  1204         self.assertEquals(decoder.decode(b'\n'), u"\n")
       
  1205         self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r'), u"\u8888")
       
  1206         self.assertEquals(decoder.decode(b'\n'), u"\n")
       
  1207 
       
  1208         decoder = codecs.getincrementaldecoder("utf-8")()
       
  1209         decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
       
  1210         self.assertEquals(decoder.newlines, None)
       
  1211         decoder.decode(b"abc\n\r")
       
  1212         self.assertEquals(decoder.newlines, u'\n')
       
  1213         decoder.decode(b"\nabc")
       
  1214         self.assertEquals(decoder.newlines, ('\n', '\r\n'))
       
  1215         decoder.decode(b"abc\r")
       
  1216         self.assertEquals(decoder.newlines, ('\n', '\r\n'))
       
  1217         decoder.decode(b"abc")
       
  1218         self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
       
  1219         decoder.decode(b"abc\r")
       
  1220         decoder.reset()
       
  1221         self.assertEquals(decoder.decode(b"abc"), "abc")
       
  1222         self.assertEquals(decoder.newlines, None)
       
  1223 
       
  1224 # XXX Tests for open()
       
  1225 
       
  1226 class MiscIOTest(unittest.TestCase):
       
  1227 
       
  1228     def testImport__all__(self):
       
  1229         for name in io.__all__:
       
  1230             obj = getattr(io, name, None)
       
  1231             self.assert_(obj is not None, name)
       
  1232             if name == "open":
       
  1233                 continue
       
  1234             elif "error" in name.lower():
       
  1235                 self.assert_(issubclass(obj, Exception), name)
       
  1236             else:
       
  1237                 self.assert_(issubclass(obj, io.IOBase))
       
  1238 
       
  1239 
       
  1240 def test_main():
       
  1241     test_support.run_unittest(IOTest, BytesIOTest, StringIOTest,
       
  1242                               BufferedReaderTest, BufferedWriterTest,
       
  1243                               BufferedRWPairTest, BufferedRandomTest,
       
  1244                               StatefulIncrementalDecoderTest,
       
  1245                               TextIOWrapperTest, MiscIOTest)
       
  1246 
       
  1247 if __name__ == "__main__":
       
  1248     unittest.main()