|
# A parallelized "find(1)" using the thread module.

# This demonstrates the use of a work queue and worker threads.
# It really does do more stats/sec when using multiple threads,
# although the improvement is only about 20-30 percent.
# (That was 8 years ago.  In 2002, on Linux, I can't measure
# a speedup. :-( )

# I'm too lazy to write a command line parser for the full find(1)
# command line syntax, so the predicate it searches for is wired in;
# see function selector() below.  (It currently searches for files with
# world write permission that are not symlinks.)

# Usage: parfind.py [-w nworkers] [directory] ...
# Default nworkers is 4
|
16 |
|
17 |
|
18 import sys |
|
19 import getopt |
|
20 import string |
|
21 import time |
|
22 import os |
|
23 from stat import * |
|
24 import thread |
|
25 |
|
26 |
|
# Work queue class.  Usage:
#     wq = WorkQ()
#     wq.addwork(func, (arg1, arg2, ...)) # one or more calls
#     wq.run(nworkers)
# The work is done when wq.run() completes.
# The function calls executed by the workers may add more work.
# Don't use keyboard interrupts!
|
34 |
|
class WorkQ:
    """A queue of (func, args) jobs served by a pool of worker threads.

    Jobs are queued with addwork() and executed by run(), which returns
    once every job -- including jobs that were queued by other jobs
    while running -- has finished.
    """

    # Invariants:

    # - busy and work are only modified when mutex is locked
    # - len(work) is the number of jobs ready to be taken
    # - busy is the number of jobs being done
    # - todo is locked iff there is no work and somebody is busy
    #   (it also starts out locked until the first job is added, and is
    #   held briefly by a worker while it takes a job in _getwork())

    def __init__(self):
        self.mutex = thread.allocate_lock()   # protects work and busy
        self.todo = thread.allocate_lock()    # workers block on this for jobs
        self.todo.acquire()                   # no work yet, so start locked
        self.work = []
        self.busy = 0

    def addwork(self, func, args):
        """Queue the call func(*args); args must be a tuple.

        May be called before run() or from inside a running job.
        """
        job = (func, args)
        self.mutex.acquire()
        try:
            self.work.append(job)
            # Decide while still holding mutex: checking the length after
            # releasing it lets two threads both (or neither) release todo.
            if len(self.work) == 1:
                # Queue went from empty to non-empty: wake one worker.
                self.todo.release()
        finally:
            self.mutex.release()

    def _getwork(self):
        """Block until a job is available and return it as (func, args).

        Returns None once the queue is empty and no job is running,
        which tells the calling worker to exit.
        """
        self.todo.acquire()
        self.mutex.acquire()
        try:
            if self.busy == 0 and not self.work:
                # All done; pass the wake-up on so other workers exit too.
                self.todo.release()
                return None
            job = self.work.pop(0)
            self.busy += 1
            if self.work:
                # More jobs are ready: let the next worker in.
                self.todo.release()
            return job
        finally:
            self.mutex.release()

    def _donework(self):
        """Record that the calling worker has finished its current job."""
        self.mutex.acquire()
        try:
            self.busy -= 1
            if self.busy == 0 and not self.work:
                # Last job finished and nothing is queued: wake the
                # waiting workers so they can see the queue is drained.
                self.todo.release()
        finally:
            self.mutex.release()

    def _worker(self):
        """Worker loop: run jobs until _getwork() reports completion."""
        time.sleep(0.00001)     # Let other threads run
        while 1:
            job = self._getwork()
            if job is None:
                break
            func, args = job
            try:
                func(*args)
            finally:
                # Always balance the busy count, even when the job raises;
                # otherwise busy never returns to 0 and the remaining
                # workers wait on todo forever.
                self._donework()

    def run(self, nworkers):
        """Process all queued jobs using nworkers threads.

        Starts nworkers-1 new threads and uses the calling thread as the
        final worker.  Returns immediately if no work has been queued.
        An exception raised by a job executed in the calling thread
        propagates out of run().
        """
        if not self.work:
            return      # Nothing to do
        for i in range(nworkers - 1):
            thread.start_new_thread(self._worker, ())
        self._worker()
        # Re-lock todo, the state __init__ established.
        # NOTE(review): a worker thread that has not yet seen the final
        # wake-up at this point stays blocked on todo.
        self.todo.acquire()
|
98 |
|
99 |
|
100 # Main program |
|
101 |
|
def main():
    """Parse the command line, search the given directories, report time.

    Accepts "-w nworkers" (default 4) followed by zero or more
    directories; with no directory the current directory is searched.
    Each directory is queued as a find() job using selector() as the
    predicate.  The elapsed wall-clock time of the search is written to
    stderr.  On a bad option or a non-integer worker count, a usage
    message is written to stderr and the program exits with status 2.
    """
    nworkers = 4
    try:
        # 'w:' -- a single option, -w, that takes an argument.
        opts, args = getopt.getopt(sys.argv[1:], 'w:')
        for opt, arg in opts:
            if opt == '-w':
                nworkers = int(arg)
    except (getopt.error, ValueError) as msg:
        sys.stderr.write('%s\n' % (msg,))
        sys.stderr.write('Usage: %s [-w nworkers] [directory] ...\n'
                         % sys.argv[0])
        sys.exit(2)
    if not args:
        args = [os.curdir]

    wq = WorkQ()
    for root in args:
        # find() receives the queue so it can add subdirectories as jobs.
        wq.addwork(find, (root, selector, wq))

    t1 = time.time()
    wq.run(nworkers)
    t2 = time.time()

    sys.stderr.write('Total time %r sec.\n' % (t2-t1))
|
120 |
|
121 |
|
# The predicate -- defines what files we look for.
# Feel free to change this to suit your purpose.
|
124 |
|
def selector(dir, name, fullname, stat):
    """Return true for world-writable files that are not symlinks.

    dir, name and fullname are accepted as part of the predicate
    signature but are not used here.  stat is the entry's stat result;
    only its ST_MODE field is examined.
    """
    mode = stat[ST_MODE]
    # S_IWOTH is the "writable by others" permission bit (octal 002).
    return (mode & S_IWOTH) != 0 and not S_ISLNK(mode)
|
128 |
|
129 |
|
130 # The find procedure -- calls wq.addwork() for subdirectories |
|
131 |
|
def find(dir, pred, wq):
    """Scan one directory, print matching entries, queue subdirectories.

    dir  -- path of the directory to list
    pred -- called as pred(dir, name, fullname, stat) for every entry,
            where stat is the os.lstat() result; entries for which it
            returns true have their full path written to stdout
    wq   -- work queue; each subdirectory that is not a mount point is
            queued on it as another find() job

    Directories that cannot be listed and entries that cannot be
    stat'ed are reported on stderr and skipped.
    """
    try:
        names = os.listdir(dir)
    except os.error as msg:
        # Diagnostics go to stderr so stdout carries only results.
        sys.stderr.write('%r : %s\n' % (dir, msg))
        return
    for name in names:
        if name in (os.curdir, os.pardir):
            continue
        fullname = os.path.join(dir, name)
        try:
            st = os.lstat(fullname)     # lstat: do not follow symlinks
        except os.error as msg:
            sys.stderr.write('%r : %s\n' % (fullname, msg))
            continue
        if pred(dir, name, fullname, st):
            # One write() per line: other worker threads print concurrently.
            sys.stdout.write(fullname + '\n')
        if S_ISDIR(st[ST_MODE]) and not os.path.ismount(fullname):
            wq.addwork(find, (fullname, pred, wq))
|
151 |
|
152 |
|
153 # Call the main program |
|
154 |
|
# There is no `if __name__ == '__main__'` guard, so main() runs whenever
# this file is executed or imported.
main()