|
1 # -*- Mode: Python -*- |
|
2 # Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp |
|
3 # Author: Sam Rushing <rushing@nightmare.com> |
|
4 |
|
5 # ====================================================================== |
|
6 # Copyright 1996 by Sam Rushing |
|
7 # |
|
8 # All Rights Reserved |
|
9 # |
|
10 # Permission to use, copy, modify, and distribute this software and |
|
11 # its documentation for any purpose and without fee is hereby |
|
12 # granted, provided that the above copyright notice appear in all |
|
13 # copies and that both that copyright notice and this permission |
|
14 # notice appear in supporting documentation, and that the name of Sam |
|
15 # Rushing not be used in advertising or publicity pertaining to |
|
16 # distribution of the software without specific, written prior |
|
17 # permission. |
|
18 # |
|
19 # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, |
|
20 # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN |
|
21 # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR |
|
22 # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS |
|
23 # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, |
|
24 # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN |
|
25 # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. |
|
26 # ====================================================================== |
|
27 |
|
"""Basic infrastructure for asynchronous socket service clients and servers.

There are only two ways to have a program on a single processor do "more
than one thing at a time".  Multi-threaded programming is the simplest and
most popular way to do it, but there is another very different technique,
that lets you have nearly all the advantages of multi-threading, without
actually using multiple threads.  It's really only practical if your program
is largely I/O bound.  If your program is CPU bound, then pre-emptive
scheduled threads are probably what you really need.  Network servers are
rarely CPU-bound, however.

If your operating system supports the select() system call in its I/O
library (and nearly all do), then you can use it to juggle multiple
communication channels at once; doing other work while your I/O is taking
place in the "background."  Although this strategy can seem strange and
complex, especially at first, it is in many ways easier to understand and
control than multi-threaded programming.  The module documented here solves
many of the difficult problems for you, making the task of building
sophisticated high-performance network servers and clients a snap.
"""
|
48 |
|
49 import select |
|
50 import socket |
|
51 import sys |
|
52 import time |
|
53 |
|
54 import os |
|
55 from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, \ |
|
56 ENOTCONN, ESHUTDOWN, EINTR, EISCONN, EBADF, ECONNABORTED, errorcode |
|
57 |
|
# Module-wide default registry of channels; dispatcher.add_channel() stores
# each channel in a map like this one under its file descriptor.
# The bare name lookup raises NameError only when socket_map does not exist
# yet, so an already-populated map is kept if this code runs again in the
# same namespace (presumably to survive reload() -- TODO confirm intent).
try:
    socket_map
except NameError:
    socket_map = {}
|
62 |
|
def _strerror(err):
    """Return a human-readable message for the errno value *err*.

    os.strerror() is tried first.  If it cannot handle the number (it
    raises ValueError or OverflowError) or answers with the bare string
    'Unknown error', the symbolic name from errno.errorcode is used
    instead.  Numbers that have no symbolic name either yield
    'Unknown error <err>' rather than raising KeyError, so this helper is
    always safe to call while building an exception message.
    """
    try:
        res = os.strerror(err)
    except (ValueError, OverflowError):
        # Some platforms reject unknown or out-of-range error numbers.
        res = 'Unknown error'
    if res == 'Unknown error':
        # errorcode only contains the symbols this platform defines.
        res = errorcode.get(err, 'Unknown error %s' % err)
    return res
|
68 |
|
class ExitNow(Exception):
    """Exception that the event dispatch helpers never swallow.

    read(), write(), _exception() and readwrite() re-raise ExitNow (like
    KeyboardInterrupt and SystemExit) instead of routing it to the
    channel's handle_error().
    """
|
71 |
|
def read(obj):
    """Deliver a read event to *obj*.

    Calls obj.handle_read_event().  ExitNow, KeyboardInterrupt and
    SystemExit propagate to the caller; anything else raised while
    looking up or running the handler is reported through
    obj.handle_error().
    """
    try:
        on_read = obj.handle_read_event
        on_read()
    except (ExitNow, KeyboardInterrupt, SystemExit):
        # Control-flow exceptions must escape untouched.
        raise
    except:
        obj.handle_error()
|
79 |
|
def write(obj):
    """Deliver a write event to *obj*.

    Calls obj.handle_write_event().  ExitNow, KeyboardInterrupt and
    SystemExit propagate to the caller; anything else raised while
    looking up or running the handler is reported through
    obj.handle_error().
    """
    try:
        on_write = obj.handle_write_event
        on_write()
    except (ExitNow, KeyboardInterrupt, SystemExit):
        # Control-flow exceptions must escape untouched.
        raise
    except:
        obj.handle_error()
|
87 |
|
def _exception(obj):
    """Deliver an exceptional-condition event to *obj*.

    Calls obj.handle_expt_event().  ExitNow, KeyboardInterrupt and
    SystemExit propagate to the caller; anything else raised while
    looking up or running the handler is reported through
    obj.handle_error().
    """
    try:
        on_expt = obj.handle_expt_event
        on_expt()
    except (ExitNow, KeyboardInterrupt, SystemExit):
        # Control-flow exceptions must escape untouched.
        raise
    except:
        obj.handle_error()
|
95 |
|
def readwrite(obj, flags):
    """Dispatch the poll() event mask *flags* to the handlers of *obj*.

    The mask is tested, in this order, for read (POLLIN/POLLPRI), write
    (POLLOUT), exceptional (POLLERR/POLLNVAL) and hang-up (POLLHUP)
    conditions, and the matching handler is called for each one that is
    set.  The first exception stops the sequence: ExitNow,
    KeyboardInterrupt and SystemExit propagate, everything else is
    reported through obj.handle_error().
    """
    try:
        # Built inside the try block so a missing select.POLL* constant is
        # reported through handle_error() like any other failure.
        dispatch = (
            (select.POLLIN | select.POLLPRI, 'handle_read_event'),
            (select.POLLOUT, 'handle_write_event'),
            (select.POLLERR | select.POLLNVAL, 'handle_expt_event'),
            (select.POLLHUP, 'handle_close'),
        )
        for mask, method_name in dispatch:
            if flags & mask:
                getattr(obj, method_name)()
    except (ExitNow, KeyboardInterrupt, SystemExit):
        raise
    except:
        obj.handle_error()
|
110 |
|
def poll(timeout=0.0, map=None):
    """Run one select()-based pass over the channels in *map*.

    timeout -- seconds handed to select.select(), or to time.sleep() when
               no channel wants to be watched.
    map     -- dict of file descriptor -> channel; defaults to the
               module-level socket_map.

    Every channel is asked readable() and writable(); channels answering
    true to either are also watched for exceptional conditions.  Ready
    descriptors are then dispatched through read(), write() and
    _exception(), in that order.  A select() call interrupted by a signal
    (EINTR) ends the pass quietly; any other select.error propagates.
    An empty map makes the call a no-op.
    """
    if map is None:
        map = socket_map
    if not map:
        return

    r = []; w = []; e = []
    # Iterate over a snapshot: readable()/writable() are user code and may
    # add or remove channels, which must not break the iteration.
    for fd, obj in list(map.items()):
        is_r = obj.readable()
        is_w = obj.writable()
        if is_r:
            r.append(fd)
        if is_w:
            w.append(fd)
        if is_r or is_w:
            e.append(fd)
    if [] == r == w == e:
        # Nothing to watch: just wait out the timeout.
        time.sleep(timeout)
        return

    try:
        r, w, e = select.select(r, w, e, timeout)
    except select.error as err:
        if err.args[0] != EINTR:
            raise
        return

    for ready, dispatch in ((r, read), (w, write), (e, _exception)):
        for fd in ready:
            # A handler run earlier in this pass may have closed the channel.
            obj = map.get(fd)
            if obj is None:
                continue
            dispatch(obj)
|
154 |
|
def poll2(timeout=0.0, map=None):
    """Run one select.poll()-based pass over the channels in *map*.

    timeout -- seconds, converted to the integer milliseconds that
               poll() expects; None is passed through unchanged.
    map     -- dict of file descriptor -> channel; defaults to the
               module-level socket_map.

    Channels that are readable() are registered for POLLIN/POLLPRI and
    those that are writable() for POLLOUT; any channel registered at all
    is also watched for POLLERR, POLLHUP and POLLNVAL.  Each reported
    event mask is handed to readwrite().  A poll() call interrupted by a
    signal (EINTR) is treated as "no events"; any other select.error
    propagates.
    """
    # Use the poll() support added to the select module in Python 2.0
    if map is None:
        map = socket_map
    if timeout is not None:
        # timeout is in milliseconds
        timeout = int(timeout*1000)
    pollster = select.poll()
    if map:
        # Iterate over a snapshot: readable()/writable() are user code and
        # may add or remove channels while we are collecting.
        for fd, obj in list(map.items()):
            flags = 0
            if obj.readable():
                flags |= select.POLLIN | select.POLLPRI
            if obj.writable():
                flags |= select.POLLOUT
            if flags:
                # Only check for exceptions if object was either readable
                # or writable.
                flags |= select.POLLERR | select.POLLHUP | select.POLLNVAL
                pollster.register(fd, flags)
        try:
            r = pollster.poll(timeout)
        except select.error as err:
            if err.args[0] != EINTR:
                raise
            r = []
        for fd, flags in r:
            # A handler run earlier in this pass may have closed the channel.
            obj = map.get(fd)
            if obj is None:
                continue
            readwrite(obj, flags)

poll3 = poll2                           # Alias for backward compatibility
|
188 |
|
def loop(timeout=30.0, use_poll=False, map=None, count=None):
    """Poll repeatedly until *map* is empty or *count* passes are done.

    timeout  -- seconds passed to every polling call.
    use_poll -- when true and the select module provides poll(), poll2()
                is used; otherwise poll().
    map      -- dict of channels; defaults to the module-level socket_map.
    count    -- maximum number of passes; None means keep going for as
                long as the map contains channels.
    """
    if map is None:
        map = socket_map

    poll_fun = poll2 if (use_poll and hasattr(select, 'poll')) else poll

    if count is None:
        # Unbounded: run for as long as any channel is registered.
        while map:
            poll_fun(timeout, map)
        return

    remaining = count
    while map and remaining > 0:
        poll_fun(timeout, map)
        remaining = remaining - 1
|
206 |
|
class dispatcher:
    """Thin wrapper that turns one non-blocking socket into a channel.

    A channel registers itself in a map (a dict, by default the
    module-level socket_map) under its socket's file descriptor.  The
    polling functions ask it readable()/writable() and then call the
    handle_*_event() methods, which translate low-level readiness into
    the overridable hooks handle_read, handle_write, handle_connect,
    handle_accept, handle_expt and handle_close.  The default hooks only
    log a warning.  Attributes that are not found on the channel are
    looked up on the wrapped socket.
    """

    debug = False
    connected = False
    accepting = False
    closing = False
    addr = None

    def __init__(self, sock=None, map=None):
        """Create a channel, optionally around the existing socket *sock*.

        map -- dict to register in; None selects the module-level
               socket_map.

        A supplied socket is switched to non-blocking mode and registered
        at once.  It is assumed to be connected unless getpeername()
        fails with ENOTCONN.  Any other socket.error unregisters the
        channel again and is re-raised.
        """
        if map is None:
            self._map = socket_map
        else:
            self._map = map

        self._fileno = None

        if sock:
            # Set to nonblocking just to make sure for cases where we
            # get a socket from a blocking source.
            sock.setblocking(0)
            self.set_socket(sock, map)
            self.connected = True
            # The constructor no longer requires that the socket
            # passed be connected.
            try:
                self.addr = sock.getpeername()
            except socket.error as err:
                if err.args[0] == ENOTCONN:
                    # To handle the case where we got an unconnected
                    # socket.
                    self.connected = False
                else:
                    # The socket is broken in some unknown way, alert
                    # the user and remove it from the map (to prevent
                    # polling of broken sockets).
                    self.del_channel(map)
                    raise
        else:
            self.socket = None

    def __repr__(self):
        """Return '<module.class [listening|connected] [addr] at 0x...>'."""
        status = [self.__class__.__module__+"."+self.__class__.__name__]
        if self.accepting and self.addr:
            status.append('listening')
        elif self.connected:
            status.append('connected')
        if self.addr is not None:
            try:
                status.append('%s:%d' % self.addr)
            except TypeError:
                # addr is not a (host, port) pair
                status.append(repr(self.addr))
        return '<%s at %#x>' % (' '.join(status), id(self))

    def add_channel(self, map=None):
        """Register this channel in *map* (default: its own map)."""
        #self.log_info('adding channel %s' % self)
        if map is None:
            map = self._map
        map[self._fileno] = self

    def del_channel(self, map=None):
        """Remove this channel from *map* and forget its descriptor."""
        fd = self._fileno
        if map is None:
            map = self._map
        if fd in map:
            #self.log_info('closing channel %d:%s' % (fd, self))
            del map[fd]
        self._fileno = None

    def create_socket(self, family, type):
        """Create a non-blocking socket of the given kind and register it."""
        self.family_and_type = family, type
        sock = socket.socket(family, type)
        sock.setblocking(0)
        self.set_socket(sock)

    def set_socket(self, sock, map=None):
        """Adopt *sock* as the wrapped socket and register the channel."""
        self.socket = sock
##        self.__dict__['socket'] = sock
        self._fileno = sock.fileno()
        self.add_channel(map)

    def set_reuse_addr(self):
        """Turn on SO_REUSEADDR; a socket.error is silently ignored."""
        # try to re-use a server port if possible
        try:
            self.socket.setsockopt(
                socket.SOL_SOCKET, socket.SO_REUSEADDR,
                self.socket.getsockopt(socket.SOL_SOCKET,
                                       socket.SO_REUSEADDR) | 1
                )
        except socket.error:
            pass

    # ==================================================
    # predicates for select()
    # these are used as filters for the lists of sockets
    # to pass to select().
    # ==================================================

    def readable(self):
        """Return true if the channel wants read events (default: always)."""
        return True

    def writable(self):
        """Return true if the channel wants write events (default: always)."""
        return True

    # ==================================================
    # socket object methods.
    # ==================================================

    def listen(self, num):
        """Mark the channel as accepting and listen with backlog *num*.

        On Windows (os.name == 'nt') the backlog is capped at 5.
        """
        self.accepting = True
        if os.name == 'nt' and num > 5:
            num = 5
        return self.socket.listen(num)

    def bind(self, addr):
        """Remember *addr* and bind the socket to it."""
        self.addr = addr
        return self.socket.bind(addr)

    def connect(self, address):
        """Start a non-blocking connect to *address*.

        Returns quietly while the connect is still in progress.  When it
        completes immediately, the address is stored and
        handle_connect_event() is fired.  Any other result is raised as
        socket.error(errno, symbolic name).
        """
        self.connected = False
        err = self.socket.connect_ex(address)
        # XXX Should interpret Winsock return values
        if err in (EINPROGRESS, EALREADY, EWOULDBLOCK):
            return
        if err in (0, EISCONN):
            self.addr = address
            self.handle_connect_event()
        else:
            # .get(): a code without a symbolic name must still surface as
            # socket.error rather than as a KeyError.
            raise socket.error(
                err, errorcode.get(err, 'Unknown error %s' % err))

    def accept(self):
        """Accept a connection.

        Returns a (socket, address) pair, or None when no connection is
        pending (EWOULDBLOCK).  Other socket errors propagate.
        """
        # XXX can return either an address pair or None
        try:
            conn, addr = self.socket.accept()
            return conn, addr
        except socket.error as why:
            if why.args[0] == EWOULDBLOCK:
                return None
            else:
                raise

    def send(self, data):
        """Send *data* and return the number of bytes accepted.

        Returns 0 when the socket would block.  On a reset, unconnected,
        shut-down or aborted connection handle_close() is called and 0 is
        returned.  Other socket errors propagate.
        """
        try:
            result = self.socket.send(data)
            return result
        except socket.error as why:
            if why.args[0] == EWOULDBLOCK:
                return 0
            elif why.args[0] in (ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED):
                self.handle_close()
                return 0
            else:
                raise

    def recv(self, buffer_size):
        """Receive up to *buffer_size* bytes.

        An empty read, or a reset/unconnected/shut-down/aborted
        connection, triggers handle_close() and returns ''.  Other socket
        errors propagate.
        """
        try:
            data = self.socket.recv(buffer_size)
            if not data:
                # a closed connection is indicated by signaling
                # a read condition, and having recv() return 0.
                self.handle_close()
                return ''
            else:
                return data
        except socket.error as why:
            # winsock sometimes throws ENOTCONN
            if why.args[0] in [ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED]:
                self.handle_close()
                return ''
            else:
                raise

    def close(self):
        """Unregister the channel and close the socket.

        ENOTCONN and EBADF from the socket's close() are ignored.
        """
        self.connected = False
        self.accepting = False
        self.del_channel()
        try:
            self.socket.close()
        except socket.error as why:
            if why.args[0] not in (ENOTCONN, EBADF):
                raise

    # cheap inheritance, used to pass all other attribute
    # references to the underlying socket object.
    def __getattr__(self, attr):
        """Look up an otherwise unknown attribute on the wrapped socket."""
        if attr == 'socket':
            # 'socket' itself was never assigned (e.g. __init__ has not
            # run); delegating to self.socket would recurse without end.
            raise AttributeError(attr)
        return getattr(self.socket, attr)

    # log and log_info may be overridden to provide more sophisticated
    # logging and warning methods. In general, log is for 'hit' logging
    # and 'log_info' is for informational, warning and error logging.

    def log(self, message):
        """Write 'log: <message>' to sys.stderr."""
        sys.stderr.write('log: %s\n' % str(message))

    def log_info(self, message, type='info'):
        """Print '<type>: <message>'.

        Messages of type 'info' are suppressed when __debug__ is false.
        """
        if __debug__ or type != 'info':
            # Parenthesised single argument: prints the same text whether
            # print is a statement or a function.
            print('%s: %s' % (type, message))

    def handle_read_event(self):
        """Translate a low-level read event into the matching hook."""
        if self.accepting:
            # accepting sockets are never connected, they "spawn" new
            # sockets that are connected
            self.handle_accept()
        elif not self.connected:
            self.handle_connect_event()
            self.handle_read()
        else:
            self.handle_read()

    def handle_connect_event(self):
        """Mark the channel connected and call handle_connect()."""
        self.connected = True
        self.handle_connect()

    def handle_write_event(self):
        """Translate a low-level write event into the matching hook.

        On a channel that is not yet connected the pending socket error
        (SO_ERROR) is checked first and raised as socket.error if set;
        otherwise the connect is considered complete.
        """
        if self.accepting:
            # Accepting sockets shouldn't get a write event.
            # We will pretend it didn't happen.
            return

        if not self.connected:
            #check for errors
            err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            if err != 0:
                raise socket.error(err, _strerror(err))

            self.handle_connect_event()
        self.handle_write()

    def handle_expt_event(self):
        """Translate a low-level exceptional event.

        If handle_expt has been overridden it is called.  Otherwise the
        pending socket error (SO_ERROR) is raised as socket.error.
        """
        # if the handle_expt is the same default worthless method,
        # we'll not even bother calling it, we'll instead generate
        # a useful error
        default_in_use = True
        try:
            mine = self.__class__.handle_expt
            base = dispatcher.handle_expt
            # Class attribute access yields an unbound method carrying
            # im_func on Python 2 and the plain function on Python 3;
            # compare the underlying functions in both cases.
            default_in_use = (getattr(mine, 'im_func', mine)
                              is getattr(base, 'im_func', base))
        except AttributeError:
            pass

        if default_in_use:
            err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            msg = _strerror(err)

            raise socket.error(err, msg)
        else:
            self.handle_expt()

    def handle_error(self):
        """Log the exception currently being handled, then handle_close()."""
        nil, t, v, tbinfo = compact_traceback()

        # sometimes a user repr method will crash.
        try:
            self_repr = repr(self)
        except:
            self_repr = '<__repr__(self) failed for object at %0x>' % id(self)

        self.log_info(
            'uncaptured python exception, closing channel %s (%s:%s %s)' % (
                self_repr,
                t,
                v,
                tbinfo
                ),
            'error'
            )
        self.handle_close()

    def handle_expt(self):
        """Default hook: log a warning."""
        self.log_info('unhandled exception', 'warning')

    def handle_read(self):
        """Default hook: log a warning."""
        self.log_info('unhandled read event', 'warning')

    def handle_write(self):
        """Default hook: log a warning."""
        self.log_info('unhandled write event', 'warning')

    def handle_connect(self):
        """Default hook: log a warning."""
        self.log_info('unhandled connect event', 'warning')

    def handle_accept(self):
        """Default hook: log a warning."""
        self.log_info('unhandled accept event', 'warning')

    def handle_close(self):
        """Default hook: log a warning and close the channel."""
        self.log_info('unhandled close event', 'warning')
        self.close()
|
492 |
|
493 # --------------------------------------------------------------------------- |
|
494 # adds simple buffered output capability, useful for simple clients. |
|
495 # [for more sophisticated usage use asynchat.async_chat] |
|
496 # --------------------------------------------------------------------------- |
|
497 |
|
class dispatcher_with_send(dispatcher):
    """dispatcher with a simple outgoing buffer.

    send() appends to out_buffer and tries to flush at once; whatever
    could not be written is retried on later write events, at most 512
    bytes per attempt.
    """

    def __init__(self, sock=None, map=None):
        """Initialise the channel with an empty output buffer."""
        dispatcher.__init__(self, sock, map)
        self.out_buffer = ''

    def initiate_send(self):
        """Try to send the first 512 buffered bytes; drop what was sent."""
        chunk = self.out_buffer[:512]
        sent = dispatcher.send(self, chunk)
        self.out_buffer = self.out_buffer[sent:]

    def handle_write(self):
        """Flush more of the buffer when the socket is writable."""
        self.initiate_send()

    def writable(self):
        """Return True while unconnected, else the buffered length."""
        if not self.connected:
            return True
        return len(self.out_buffer)

    def send(self, data):
        """Queue *data* for sending and attempt to flush right away."""
        if self.debug:
            self.log_info('sending %s' % repr(data))
        pending = self.out_buffer + data
        self.out_buffer = pending
        self.initiate_send()
|
520 |
|
521 # --------------------------------------------------------------------------- |
|
522 # used for debugging. |
|
523 # --------------------------------------------------------------------------- |
|
524 |
|
def compact_traceback():
    """Summarise the exception currently being handled.

    Returns ((file, function, line), type, value, info) where the first
    item describes the innermost traceback entry and info is a single
    string of '[file|function|line]' entries, outermost first, separated
    by spaces.  Line numbers are strings.

    Raises AssertionError when there is no traceback to summarise.
    """
    exc_type, exc_value, trace = sys.exc_info()
    if not trace: # Must have a traceback
        raise AssertionError("traceback does not exist")

    entries = []
    while trace:
        code = trace.tb_frame.f_code
        entries.append((code.co_filename, code.co_name, str(trace.tb_lineno)))
        trace = trace.tb_next

    # just to be safe
    del trace

    innermost = entries[-1]
    info = ' '.join('[%s|%s|%s]' % entry for entry in entries)
    return innermost, exc_type, exc_value, info
|
544 |
|
def close_all(map=None, ignore_all=False):
    """Close every channel in *map* and empty the map.

    map        -- dict of channels; defaults to the module-level
                  socket_map.
    ignore_all -- when true, errors raised by a channel's close() are
                  swallowed so the remaining channels still get closed.

    An OSError whose first argument is EBADF is always ignored.  ExitNow,
    KeyboardInterrupt and SystemExit always propagate.
    """
    if map is None:
        map = socket_map
    # Snapshot the values: close() normally removes the channel from the
    # very map that is being walked.
    for channel in list(map.values()):
        try:
            channel.close()
        except OSError as err:
            if err.args and err.args[0] == EBADF:
                # Descriptor is already gone; nothing left to close.
                pass
            elif not ignore_all:
                raise
        except (ExitNow, KeyboardInterrupt, SystemExit):
            raise
        except:
            if not ignore_all:
                raise
    map.clear()
|
562 |
|
563 # Asynchronous File I/O: |
|
564 # |
|
565 # After a little research (reading man pages on various unixen, and |
|
566 # digging through the linux kernel), I've determined that select() |
|
567 # isn't meant for doing asynchronous file i/o. |
|
568 # Heartening, though - reading linux/mm/filemap.c shows that linux |
|
569 # supports asynchronous read-ahead. So _MOST_ of the time, the data |
|
570 # will be sitting in memory for us already when we go to read it. |
|
571 # |
|
572 # What other OS's (besides NT) support async file i/o? [VMS?] |
|
573 # |
|
574 # Regardless, this is useful for pipes, and stdin/stdout... |
|
575 |
|
if os.name == 'posix':
    import fcntl

    class file_wrapper:
        """Give a file descriptor the few socket methods asyncore uses.

        The descriptor handed to the constructor is duplicated with
        os.dup(); the wrapper works on, and closes, only the duplicate.
        """

        def __init__(self, fd):
            """Duplicate *fd* and keep the duplicate as self.fd."""
            self.fd = os.dup(fd)

        def recv(self, *args):
            """Read from the descriptor; arguments go to os.read()."""
            return os.read(self.fd, *args)

        def send(self, *args):
            """Write to the descriptor; arguments go to os.write()."""
            return os.write(self.fd, *args)

        # File-style aliases for the socket-style names.
        read = recv
        write = send

        def close(self):
            """Close the duplicated descriptor."""
            os.close(self.fd)

        def fileno(self):
            """Return the duplicated descriptor."""
            return self.fd

    class file_dispatcher(dispatcher):
        """Channel around a file descriptor instead of a socket."""

        def __init__(self, fd, map=None):
            """Wrap *fd* (a descriptor or any object with fileno()).

            The channel is marked connected, registered in *map*, and
            O_NONBLOCK is set through the descriptor that was passed in.
            """
            dispatcher.__init__(self, None, map)
            self.connected = True
            try:
                fd = fd.fileno()
            except AttributeError:
                # Already a plain descriptor.
                pass
            self.set_file(fd)
            # set it to non-blocking mode
            status_flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
            fcntl.fcntl(fd, fcntl.F_SETFL, status_flags | os.O_NONBLOCK)

        def set_file(self, fd):
            """Wrap *fd* in a file_wrapper and register the channel."""
            self.socket = file_wrapper(fd)
            self._fileno = self.socket.fileno()
            self.add_channel()