symbian-qemu-0.9.1-12/python-2.6.1/Lib/asyncore.py
changeset 1 2fb8b9db1c86
equal deleted inserted replaced
0:ffa851df0825 1:2fb8b9db1c86
       
     1 # -*- Mode: Python -*-
       
     2 #   Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
       
     3 #   Author: Sam Rushing <rushing@nightmare.com>
       
     4 
       
     5 # ======================================================================
       
     6 # Copyright 1996 by Sam Rushing
       
     7 #
       
     8 #                         All Rights Reserved
       
     9 #
       
    10 # Permission to use, copy, modify, and distribute this software and
       
    11 # its documentation for any purpose and without fee is hereby
       
    12 # granted, provided that the above copyright notice appear in all
       
    13 # copies and that both that copyright notice and this permission
       
    14 # notice appear in supporting documentation, and that the name of Sam
       
    15 # Rushing not be used in advertising or publicity pertaining to
       
    16 # distribution of the software without specific, written prior
       
    17 # permission.
       
    18 #
       
    19 # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
       
    20 # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
       
    21 # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
       
    22 # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
       
    23 # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
       
    24 # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
       
    25 # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
       
    26 # ======================================================================
       
    27 
       
    28 """Basic infrastructure for asynchronous socket service clients and servers.
       
    29 
       
    30 There are only two ways to have a program on a single processor do "more
       
    31 than one thing at a time".  Multi-threaded programming is the simplest and
       
    32 most popular way to do it, but there is another very different technique,
       
    33 that lets you have nearly all the advantages of multi-threading, without
       
    34 actually using multiple threads. It's really only practical if your program
       
    35 is largely I/O bound. If your program is CPU bound, then pre-emptive
       
    36 scheduled threads are probably what you really need. Network servers are
       
    37 rarely CPU-bound, however.
       
    38 
       
    39 If your operating system supports the select() system call in its I/O
       
    40 library (and nearly all do), then you can use it to juggle multiple
       
    41 communication channels at once; doing other work while your I/O is taking
       
    42 place in the "background."  Although this strategy can seem strange and
       
    43 complex, especially at first, it is in many ways easier to understand and
       
    44 control than multi-threaded programming. The module documented here solves
       
    45 many of the difficult problems for you, making the task of building
       
    46 sophisticated high-performance network servers and clients a snap.
       
    47 """
       
    48 
       
    49 import select
       
    50 import socket
       
    51 import sys
       
    52 import time
       
    53 
       
    54 import os
       
    55 from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, \
       
    56      ENOTCONN, ESHUTDOWN, EINTR, EISCONN, EBADF, ECONNABORTED, errorcode
       
    57 
       
# Keep socket_map if the name is already bound when this code runs and
# only create an empty map otherwise.  Presumably this lets the module
# be re-executed (e.g. via reload()) without losing registered channels
# -- TODO confirm.
try:
    socket_map
except NameError:
    socket_map = {}
       
    62 
       
    63 def _strerror(err):
       
    64     res = os.strerror(err)
       
    65     if res == 'Unknown error':
       
    66         res = errorcode[err]
       
    67     return res
       
    68 
       
    69 class ExitNow(Exception):
       
    70     pass
       
    71 
       
    72 def read(obj):
       
    73     try:
       
    74         obj.handle_read_event()
       
    75     except (ExitNow, KeyboardInterrupt, SystemExit):
       
    76         raise
       
    77     except:
       
    78         obj.handle_error()
       
    79 
       
    80 def write(obj):
       
    81     try:
       
    82         obj.handle_write_event()
       
    83     except (ExitNow, KeyboardInterrupt, SystemExit):
       
    84         raise
       
    85     except:
       
    86         obj.handle_error()
       
    87 
       
    88 def _exception(obj):
       
    89     try:
       
    90         obj.handle_expt_event()
       
    91     except (ExitNow, KeyboardInterrupt, SystemExit):
       
    92         raise
       
    93     except:
       
    94         obj.handle_error()
       
    95 
       
    96 def readwrite(obj, flags):
       
    97     try:
       
    98         if flags & (select.POLLIN | select.POLLPRI):
       
    99             obj.handle_read_event()
       
   100         if flags & select.POLLOUT:
       
   101             obj.handle_write_event()
       
   102         if flags & (select.POLLERR | select.POLLNVAL):
       
   103             obj.handle_expt_event()
       
   104         if flags & select.POLLHUP:
       
   105             obj.handle_close()
       
   106     except (ExitNow, KeyboardInterrupt, SystemExit):
       
   107         raise
       
   108     except:
       
   109         obj.handle_error()
       
   110 
       
   111 def poll(timeout=0.0, map=None):
       
   112     if map is None:
       
   113         map = socket_map
       
   114     if map:
       
   115         r = []; w = []; e = []
       
   116         for fd, obj in map.items():
       
   117             is_r = obj.readable()
       
   118             is_w = obj.writable()
       
   119             if is_r:
       
   120                 r.append(fd)
       
   121             if is_w:
       
   122                 w.append(fd)
       
   123             if is_r or is_w:
       
   124                 e.append(fd)
       
   125         if [] == r == w == e:
       
   126             time.sleep(timeout)
       
   127             return
       
   128 
       
   129         try:
       
   130             r, w, e = select.select(r, w, e, timeout)
       
   131         except select.error, err:
       
   132             if err.args[0] != EINTR:
       
   133                 raise
       
   134             else:
       
   135                 return
       
   136 
       
   137         for fd in r:
       
   138             obj = map.get(fd)
       
   139             if obj is None:
       
   140                 continue
       
   141             read(obj)
       
   142 
       
   143         for fd in w:
       
   144             obj = map.get(fd)
       
   145             if obj is None:
       
   146                 continue
       
   147             write(obj)
       
   148 
       
   149         for fd in e:
       
   150             obj = map.get(fd)
       
   151             if obj is None:
       
   152                 continue
       
   153             _exception(obj)
       
   154 
       
   155 def poll2(timeout=0.0, map=None):
       
   156     # Use the poll() support added to the select module in Python 2.0
       
   157     if map is None:
       
   158         map = socket_map
       
   159     if timeout is not None:
       
   160         # timeout is in milliseconds
       
   161         timeout = int(timeout*1000)
       
   162     pollster = select.poll()
       
   163     if map:
       
   164         for fd, obj in map.items():
       
   165             flags = 0
       
   166             if obj.readable():
       
   167                 flags |= select.POLLIN | select.POLLPRI
       
   168             if obj.writable():
       
   169                 flags |= select.POLLOUT
       
   170             if flags:
       
   171                 # Only check for exceptions if object was either readable
       
   172                 # or writable.
       
   173                 flags |= select.POLLERR | select.POLLHUP | select.POLLNVAL
       
   174                 pollster.register(fd, flags)
       
   175         try:
       
   176             r = pollster.poll(timeout)
       
   177         except select.error, err:
       
   178             if err.args[0] != EINTR:
       
   179                 raise
       
   180             r = []
       
   181         for fd, flags in r:
       
   182             obj = map.get(fd)
       
   183             if obj is None:
       
   184                 continue
       
   185             readwrite(obj, flags)
       
   186 
       
   187 poll3 = poll2                           # Alias for backward compatibility
       
   188 
       
   189 def loop(timeout=30.0, use_poll=False, map=None, count=None):
       
   190     if map is None:
       
   191         map = socket_map
       
   192 
       
   193     if use_poll and hasattr(select, 'poll'):
       
   194         poll_fun = poll2
       
   195     else:
       
   196         poll_fun = poll
       
   197 
       
   198     if count is None:
       
   199         while map:
       
   200             poll_fun(timeout, map)
       
   201 
       
   202     else:
       
   203         while map and count > 0:
       
   204             poll_fun(timeout, map)
       
   205             count = count - 1
       
   206 
       
   207 class dispatcher:
       
   208 
       
   209     debug = False
       
   210     connected = False
       
   211     accepting = False
       
   212     closing = False
       
   213     addr = None
       
   214 
       
   215     def __init__(self, sock=None, map=None):
       
   216         if map is None:
       
   217             self._map = socket_map
       
   218         else:
       
   219             self._map = map
       
   220 
       
   221         self._fileno = None
       
   222 
       
   223         if sock:
       
   224             # Set to nonblocking just to make sure for cases where we
       
   225             # get a socket from a blocking source.
       
   226             sock.setblocking(0)
       
   227             self.set_socket(sock, map)
       
   228             self.connected = True
       
   229             # The constructor no longer requires that the socket
       
   230             # passed be connected.
       
   231             try:
       
   232                 self.addr = sock.getpeername()
       
   233             except socket.error, err:
       
   234                 if err.args[0] == ENOTCONN:
       
   235                     # To handle the case where we got an unconnected
       
   236                     # socket.
       
   237                     self.connected = False
       
   238                 else:
       
   239                     # The socket is broken in some unknown way, alert
       
   240                     # the user and remove it from the map (to prevent
       
   241                     # polling of broken sockets).
       
   242                     self.del_channel(map)
       
   243                     raise
       
   244         else:
       
   245             self.socket = None
       
   246 
       
   247     def __repr__(self):
       
   248         status = [self.__class__.__module__+"."+self.__class__.__name__]
       
   249         if self.accepting and self.addr:
       
   250             status.append('listening')
       
   251         elif self.connected:
       
   252             status.append('connected')
       
   253         if self.addr is not None:
       
   254             try:
       
   255                 status.append('%s:%d' % self.addr)
       
   256             except TypeError:
       
   257                 status.append(repr(self.addr))
       
   258         return '<%s at %#x>' % (' '.join(status), id(self))
       
   259 
       
   260     def add_channel(self, map=None):
       
   261         #self.log_info('adding channel %s' % self)
       
   262         if map is None:
       
   263             map = self._map
       
   264         map[self._fileno] = self
       
   265 
       
   266     def del_channel(self, map=None):
       
   267         fd = self._fileno
       
   268         if map is None:
       
   269             map = self._map
       
   270         if fd in map:
       
   271             #self.log_info('closing channel %d:%s' % (fd, self))
       
   272             del map[fd]
       
   273         self._fileno = None
       
   274 
       
   275     def create_socket(self, family, type):
       
   276         self.family_and_type = family, type
       
   277         sock = socket.socket(family, type)
       
   278         sock.setblocking(0)
       
   279         self.set_socket(sock)
       
   280 
       
   281     def set_socket(self, sock, map=None):
       
   282         self.socket = sock
       
   283 ##        self.__dict__['socket'] = sock
       
   284         self._fileno = sock.fileno()
       
   285         self.add_channel(map)
       
   286 
       
   287     def set_reuse_addr(self):
       
   288         # try to re-use a server port if possible
       
   289         try:
       
   290             self.socket.setsockopt(
       
   291                 socket.SOL_SOCKET, socket.SO_REUSEADDR,
       
   292                 self.socket.getsockopt(socket.SOL_SOCKET,
       
   293                                        socket.SO_REUSEADDR) | 1
       
   294                 )
       
   295         except socket.error:
       
   296             pass
       
   297 
       
   298     # ==================================================
       
   299     # predicates for select()
       
   300     # these are used as filters for the lists of sockets
       
   301     # to pass to select().
       
   302     # ==================================================
       
   303 
       
   304     def readable(self):
       
   305         return True
       
   306 
       
   307     def writable(self):
       
   308         return True
       
   309 
       
   310     # ==================================================
       
   311     # socket object methods.
       
   312     # ==================================================
       
   313 
       
   314     def listen(self, num):
       
   315         self.accepting = True
       
   316         if os.name == 'nt' and num > 5:
       
   317             num = 5
       
   318         return self.socket.listen(num)
       
   319 
       
   320     def bind(self, addr):
       
   321         self.addr = addr
       
   322         return self.socket.bind(addr)
       
   323 
       
   324     def connect(self, address):
       
   325         self.connected = False
       
   326         err = self.socket.connect_ex(address)
       
   327         # XXX Should interpret Winsock return values
       
   328         if err in (EINPROGRESS, EALREADY, EWOULDBLOCK):
       
   329             return
       
   330         if err in (0, EISCONN):
       
   331             self.addr = address
       
   332             self.handle_connect_event()
       
   333         else:
       
   334             raise socket.error(err, errorcode[err])
       
   335 
       
   336     def accept(self):
       
   337         # XXX can return either an address pair or None
       
   338         try:
       
   339             conn, addr = self.socket.accept()
       
   340             return conn, addr
       
   341         except socket.error, why:
       
   342             if why.args[0] == EWOULDBLOCK:
       
   343                 pass
       
   344             else:
       
   345                 raise
       
   346 
       
   347     def send(self, data):
       
   348         try:
       
   349             result = self.socket.send(data)
       
   350             return result
       
   351         except socket.error, why:
       
   352             if why.args[0] == EWOULDBLOCK:
       
   353                 return 0
       
   354             elif why.args[0] in (ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED):
       
   355                 self.handle_close()
       
   356                 return 0
       
   357             else:
       
   358                 raise
       
   359 
       
   360     def recv(self, buffer_size):
       
   361         try:
       
   362             data = self.socket.recv(buffer_size)
       
   363             if not data:
       
   364                 # a closed connection is indicated by signaling
       
   365                 # a read condition, and having recv() return 0.
       
   366                 self.handle_close()
       
   367                 return ''
       
   368             else:
       
   369                 return data
       
   370         except socket.error, why:
       
   371             # winsock sometimes throws ENOTCONN
       
   372             if why.args[0] in [ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED]:
       
   373                 self.handle_close()
       
   374                 return ''
       
   375             else:
       
   376                 raise
       
   377 
       
   378     def close(self):
       
   379         self.connected = False
       
   380         self.accepting = False
       
   381         self.del_channel()
       
   382         try:
       
   383             self.socket.close()
       
   384         except socket.error, why:
       
   385             if why.args[0] not in (ENOTCONN, EBADF):
       
   386                 raise
       
   387 
       
   388     # cheap inheritance, used to pass all other attribute
       
   389     # references to the underlying socket object.
       
   390     def __getattr__(self, attr):
       
   391         return getattr(self.socket, attr)
       
   392 
       
   393     # log and log_info may be overridden to provide more sophisticated
       
   394     # logging and warning methods. In general, log is for 'hit' logging
       
   395     # and 'log_info' is for informational, warning and error logging.
       
   396 
       
   397     def log(self, message):
       
   398         sys.stderr.write('log: %s\n' % str(message))
       
   399 
       
   400     def log_info(self, message, type='info'):
       
   401         if __debug__ or type != 'info':
       
   402             print '%s: %s' % (type, message)
       
   403 
       
   404     def handle_read_event(self):
       
   405         if self.accepting:
       
   406             # accepting sockets are never connected, they "spawn" new
       
   407             # sockets that are connected
       
   408             self.handle_accept()
       
   409         elif not self.connected:
       
   410             self.handle_connect_event()
       
   411             self.handle_read()
       
   412         else:
       
   413             self.handle_read()
       
   414 
       
   415     def handle_connect_event(self):
       
   416         self.connected = True
       
   417         self.handle_connect()
       
   418 
       
   419     def handle_write_event(self):
       
   420         if self.accepting:
       
   421             # Accepting sockets shouldn't get a write event.
       
   422             # We will pretend it didn't happen.
       
   423             return
       
   424 
       
   425         if not self.connected:
       
   426             #check for errors
       
   427             err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
       
   428             if err != 0:
       
   429                 raise socket.error(err, _strerror(err))
       
   430 
       
   431             self.handle_connect_event()
       
   432         self.handle_write()
       
   433 
       
   434     def handle_expt_event(self):
       
   435         # if the handle_expt is the same default worthless method,
       
   436         # we'll not even bother calling it, we'll instead generate
       
   437         # a useful error
       
   438         x = True
       
   439         try:
       
   440             y1 = self.__class__.handle_expt.im_func
       
   441             y2 = dispatcher.handle_expt.im_func
       
   442             x = y1 is y2
       
   443         except AttributeError:
       
   444             pass
       
   445 
       
   446         if x:
       
   447             err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
       
   448             msg = _strerror(err)
       
   449 
       
   450             raise socket.error(err, msg)
       
   451         else:
       
   452             self.handle_expt()
       
   453 
       
   454     def handle_error(self):
       
   455         nil, t, v, tbinfo = compact_traceback()
       
   456 
       
   457         # sometimes a user repr method will crash.
       
   458         try:
       
   459             self_repr = repr(self)
       
   460         except:
       
   461             self_repr = '<__repr__(self) failed for object at %0x>' % id(self)
       
   462 
       
   463         self.log_info(
       
   464             'uncaptured python exception, closing channel %s (%s:%s %s)' % (
       
   465                 self_repr,
       
   466                 t,
       
   467                 v,
       
   468                 tbinfo
       
   469                 ),
       
   470             'error'
       
   471             )
       
   472         self.handle_close()
       
   473 
       
   474     def handle_expt(self):
       
   475         self.log_info('unhandled exception', 'warning')
       
   476 
       
   477     def handle_read(self):
       
   478         self.log_info('unhandled read event', 'warning')
       
   479 
       
   480     def handle_write(self):
       
   481         self.log_info('unhandled write event', 'warning')
       
   482 
       
   483     def handle_connect(self):
       
   484         self.log_info('unhandled connect event', 'warning')
       
   485 
       
   486     def handle_accept(self):
       
   487         self.log_info('unhandled accept event', 'warning')
       
   488 
       
   489     def handle_close(self):
       
   490         self.log_info('unhandled close event', 'warning')
       
   491         self.close()
       
   492 
       
   493 # ---------------------------------------------------------------------------
       
   494 # adds simple buffered output capability, useful for simple clients.
       
   495 # [for more sophisticated usage use asynchat.async_chat]
       
   496 # ---------------------------------------------------------------------------
       
   497 
       
   498 class dispatcher_with_send(dispatcher):
       
   499 
       
   500     def __init__(self, sock=None, map=None):
       
   501         dispatcher.__init__(self, sock, map)
       
   502         self.out_buffer = ''
       
   503 
       
   504     def initiate_send(self):
       
   505         num_sent = 0
       
   506         num_sent = dispatcher.send(self, self.out_buffer[:512])
       
   507         self.out_buffer = self.out_buffer[num_sent:]
       
   508 
       
   509     def handle_write(self):
       
   510         self.initiate_send()
       
   511 
       
   512     def writable(self):
       
   513         return (not self.connected) or len(self.out_buffer)
       
   514 
       
   515     def send(self, data):
       
   516         if self.debug:
       
   517             self.log_info('sending %s' % repr(data))
       
   518         self.out_buffer = self.out_buffer + data
       
   519         self.initiate_send()
       
   520 
       
   521 # ---------------------------------------------------------------------------
       
   522 # used for debugging.
       
   523 # ---------------------------------------------------------------------------
       
   524 
       
   525 def compact_traceback():
       
   526     t, v, tb = sys.exc_info()
       
   527     tbinfo = []
       
   528     if not tb: # Must have a traceback
       
   529         raise AssertionError("traceback does not exist")
       
   530     while tb:
       
   531         tbinfo.append((
       
   532             tb.tb_frame.f_code.co_filename,
       
   533             tb.tb_frame.f_code.co_name,
       
   534             str(tb.tb_lineno)
       
   535             ))
       
   536         tb = tb.tb_next
       
   537 
       
   538     # just to be safe
       
   539     del tb
       
   540 
       
   541     file, function, line = tbinfo[-1]
       
   542     info = ' '.join(['[%s|%s|%s]' % x for x in tbinfo])
       
   543     return (file, function, line), t, v, info
       
   544 
       
   545 def close_all(map=None, ignore_all=False):
       
   546     if map is None:
       
   547         map = socket_map
       
   548     for x in map.values():
       
   549         try:
       
   550             x.close()
       
   551         except OSError, x:
       
   552             if x.args[0] == EBADF:
       
   553                 pass
       
   554             elif not ignore_all:
       
   555                 raise
       
   556         except (ExitNow, KeyboardInterrupt, SystemExit):
       
   557             raise
       
   558         except:
       
   559             if not ignore_all:
       
   560                 raise
       
   561     map.clear()
       
   562 
       
   563 # Asynchronous File I/O:
       
   564 #
       
   565 # After a little research (reading man pages on various unixen, and
       
   566 # digging through the linux kernel), I've determined that select()
       
   567 # isn't meant for doing asynchronous file i/o.
       
   568 # Heartening, though - reading linux/mm/filemap.c shows that linux
       
   569 # supports asynchronous read-ahead.  So _MOST_ of the time, the data
       
   570 # will be sitting in memory for us already when we go to read it.
       
   571 #
       
   572 # What other OS's (besides NT) support async file i/o?  [VMS?]
       
   573 #
       
   574 # Regardless, this is useful for pipes, and stdin/stdout...
       
   575 
       
   576 if os.name == 'posix':
       
   577     import fcntl
       
   578 
       
   579     class file_wrapper:
       
   580         # Here we override just enough to make a file
       
   581         # look like a socket for the purposes of asyncore.
       
   582         # The passed fd is automatically os.dup()'d
       
   583 
       
   584         def __init__(self, fd):
       
   585             self.fd = os.dup(fd)
       
   586 
       
   587         def recv(self, *args):
       
   588             return os.read(self.fd, *args)
       
   589 
       
   590         def send(self, *args):
       
   591             return os.write(self.fd, *args)
       
   592 
       
   593         read = recv
       
   594         write = send
       
   595 
       
   596         def close(self):
       
   597             os.close(self.fd)
       
   598 
       
   599         def fileno(self):
       
   600             return self.fd
       
   601 
       
   602     class file_dispatcher(dispatcher):
       
   603 
       
   604         def __init__(self, fd, map=None):
       
   605             dispatcher.__init__(self, None, map)
       
   606             self.connected = True
       
   607             try:
       
   608                 fd = fd.fileno()
       
   609             except AttributeError:
       
   610                 pass
       
   611             self.set_file(fd)
       
   612             # set it to non-blocking mode
       
   613             flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
       
   614             flags = flags | os.O_NONBLOCK
       
   615             fcntl.fcntl(fd, fcntl.F_SETFL, flags)
       
   616 
       
   617         def set_file(self, fd):
       
   618             self.socket = file_wrapper(fd)
       
   619             self._fileno = self.socket.fileno()
       
   620             self.add_channel()