|
1 import pprint |
|
2 import sys |
|
3 import unittest |
|
4 |
|
5 from test import test_support |
|
6 |
|
7 |
|
class HookWatcher:
    """Record 'call', 'return' and 'exception' events delivered to a hook.

    Every logged entry is a ``(frameno, event, ident(frame))`` tuple.
    ``frameno`` is the index of the frame in ``self.frames``; frames are
    numbered in the order in which they are first seen.
    """

    def __init__(self):
        # Frames seen so far; a frame's list index is its frame number.
        self.frames = []
        # Logged (frameno, event, ident) tuples, in arrival order.
        self.events = []

    def callback(self, frame, event, arg):
        """Hook entry point: log the event kinds of interest, drop the rest.

        *arg* is accepted because the hook protocol passes it, but it is
        not used.
        """
        if event in ("call", "return", "exception"):
            self.add_event(event, frame)

    def add_event(self, event, frame=None):
        """Add an event to the log.

        When *frame* is omitted, the event is attributed to the frame of
        the caller of this method.
        """
        if frame is None:
            frame = sys._getframe(1)

        # NOTE: keep this method free of extra Python-level helper calls;
        # it can be invoked from code that is itself being profiled, and
        # get_events() only filters out add_event() and ident().
        try:
            frameno = self.frames.index(frame)
        except ValueError:
            # First time this frame is seen: give it the next number.
            frameno = len(self.frames)
            self.frames.append(frame)

        self.events.append((frameno, event, ident(frame)))

    def get_events(self):
        """Return the logged events, minus those caused by add_event().

        Entries whose identity matches add_event() or ident() are removed.
        The frame list is released (set to None), so add_event() must not
        be called on this watcher afterwards.
        """
        bound = self.add_event
        # ``__func__`` is the portable spelling (Python 2.6+ and 3.x);
        # ``im_func`` is kept as a fallback for older interpreters.
        func = getattr(bound, "__func__", None)
        if func is None:
            func = bound.im_func
        disallowed = (ident(func), ident(ident))
        self.frames = None

        return [item for item in self.events if item[2] not in disallowed]
|
38 |
|
39 |
|
class ProfileSimulator(HookWatcher):
    """HookWatcher variant that routes each event through a dispatch table.

    'call' and 'return' events are logged and mirrored on ``self.stack``;
    an 'exception' event fails the owning test case; 'c_call', 'c_return'
    and 'c_exception' events are ignored.
    """

    def __init__(self, testcase):
        HookWatcher.__init__(self)
        # Test case used to report a failure on an unexpected event.
        self.testcase = testcase
        # Frames that have been entered and not yet returned from.
        self.stack = []

    def callback(self, frame, event, arg):
        # Callback registered with sys.setprofile()/sys.settrace()
        # The table holds plain functions, so ``self`` is passed explicitly.
        handler = self.dispatch[event]
        handler(self, frame)

    def trace_call(self, frame):
        """Log a 'call' event and push the frame."""
        self.add_event('call', frame)
        self.stack.append(frame)

    def trace_return(self, frame):
        """Log a 'return' event and pop the most recently pushed frame."""
        self.add_event('return', frame)
        self.stack.pop()

    def trace_exception(self, frame):
        """Fail the test: this event is not expected here."""
        self.testcase.fail(
            "the profiler should never receive exception events")

    def trace_pass(self, frame):
        """Ignore the event."""
        pass

    # Event name -> handler function (called as ``handler(self, frame)``).
    dispatch = {
        'call': trace_call,
        'return': trace_return,
        'exception': trace_exception,
        'c_call': trace_pass,
        'c_return': trace_pass,
        'c_exception': trace_pass,
        }
|
73 |
|
74 |
|
class TestCaseBase(unittest.TestCase):
    """Shared helper for the hook test cases.

    Subclasses supply ``new_watcher()``, which returns the watcher object
    handed to capture_events().
    """

    def check_events(self, callable, expected):
        """Run *callable* under a fresh watcher and compare the event log.

        Fails the test with both lists pretty-printed when the captured
        events differ from *expected*.
        """
        received = capture_events(callable, self.new_watcher())
        if received == expected:
            return
        report = "Expected events:\n%s\nReceived events:\n%s" % (
            pprint.pformat(expected), pprint.pformat(received))
        self.fail(report)
|
81 |
|
82 |
|
class ProfileHookTestCase(TestCaseBase):
    """Check the events a plain HookWatcher records from the profile hook.

    The inner functions of each test are the code being profiled; their
    call structure determines the expected event list, so they must not be
    restructured.
    """

    def new_watcher(self):
        return HookWatcher()

    def _check_call_return(self, func):
        """Expect exactly one 'call' and one 'return' for *func* (frame 1)."""
        func_id = ident(func)
        self.check_events(func, [(1, 'call', func_id),
                                 (1, 'return', func_id),
                                 ])

    def test_simple(self):
        def f(p):
            pass
        self._check_call_return(f)

    def test_exception(self):
        def f(p):
            1/0
        self._check_call_return(f)

    def test_caught_exception(self):
        def f(p):
            try:
                1/0
            except:
                pass
        self._check_call_return(f)

    def test_caught_nested_exception(self):
        def f(p):
            try:
                1/0
            except:
                pass
        self._check_call_return(f)

    def test_nested_exception(self):
        def f(p):
            1/0
        f_id = ident(f)
        expected = [
            (1, 'call', f_id),
            # This isn't what I expected:
            # (0, 'exception', protect_ident),
            # I expected this again:
            (1, 'return', f_id),
        ]
        self.check_events(f, expected)

    def test_exception_in_except_clause(self):
        def f(p):
            1/0
        def g(p):
            try:
                f(p)
            except:
                try:
                    f(p)
                except:
                    pass
        f_id = ident(f)
        g_id = ident(g)
        # Each call to f runs in a new frame, hence frame numbers 2 and 3.
        expected = [
            (1, 'call', g_id),
            (2, 'call', f_id),
            (2, 'return', f_id),
            (3, 'call', f_id),
            (3, 'return', f_id),
            (1, 'return', g_id),
        ]
        self.check_events(g, expected)

    def test_exception_propogation(self):
        def f(p):
            1/0
        def g(p):
            try:
                f(p)
            finally:
                p.add_event("falling through")
        f_id = ident(f)
        g_id = ident(g)
        expected = [
            (1, 'call', g_id),
            (2, 'call', f_id),
            (2, 'return', f_id),
            (1, 'falling through', g_id),
            (1, 'return', g_id),
        ]
        self.check_events(g, expected)

    def test_raise_twice(self):
        def f(p):
            try:
                1/0
            except:
                1/0
        self._check_call_return(f)

    def test_raise_reraise(self):
        def f(p):
            try:
                1/0
            except:
                raise
        self._check_call_return(f)

    def test_raise(self):
        def f(p):
            raise Exception()
        self._check_call_return(f)

    def test_distant_exception(self):
        def f():
            1/0
        def g():
            f()
        def h():
            g()
        def i():
            h()
        def j(p):
            i()
        f_id = ident(f)
        g_id = ident(g)
        h_id = ident(h)
        i_id = ident(i)
        j_id = ident(j)
        expected = [
            (1, 'call', j_id),
            (2, 'call', i_id),
            (3, 'call', h_id),
            (4, 'call', g_id),
            (5, 'call', f_id),
            (5, 'return', f_id),
            (4, 'return', g_id),
            (3, 'return', h_id),
            (2, 'return', i_id),
            (1, 'return', j_id),
        ]
        self.check_events(j, expected)

    def test_generator(self):
        def f():
            for i in range(2):
                yield i
        def g(p):
            for i in f():
                pass
        f_id = ident(f)
        g_id = ident(g)
        expected = [
            (1, 'call', g_id),
            # call the iterator twice to generate values
            (2, 'call', f_id),
            (2, 'return', f_id),
            (2, 'call', f_id),
            (2, 'return', f_id),
            # once more; returns end-of-iteration without
            # actually raising an exception
            (2, 'call', f_id),
            (2, 'return', f_id),
            (1, 'return', g_id),
        ]
        self.check_events(g, expected)

    def test_stop_iteration(self):
        def f():
            for i in range(2):
                yield i
            raise StopIteration
        def g(p):
            for i in f():
                pass
        f_id = ident(f)
        g_id = ident(g)
        expected = [
            (1, 'call', g_id),
            # call the iterator twice to generate values
            (2, 'call', f_id),
            (2, 'return', f_id),
            (2, 'call', f_id),
            (2, 'return', f_id),
            # once more to hit the raise:
            (2, 'call', f_id),
            (2, 'return', f_id),
            (1, 'return', g_id),
        ]
        self.check_events(g, expected)
|
263 |
|
264 |
|
class ProfileSimulatorTestCase(TestCaseBase):
    """Check the events recorded when a ProfileSimulator is the watcher.

    The inner functions of each test are the code being profiled; their
    call structure determines the expected event list.
    """

    def new_watcher(self):
        return ProfileSimulator(self)

    def _check_call_return(self, func):
        """Expect exactly one 'call' and one 'return' for *func* (frame 1)."""
        func_id = ident(func)
        self.check_events(func, [(1, 'call', func_id),
                                 (1, 'return', func_id),
                                 ])

    def test_simple(self):
        def f(p):
            pass
        self._check_call_return(f)

    def test_basic_exception(self):
        def f(p):
            1/0
        self._check_call_return(f)

    def test_caught_exception(self):
        def f(p):
            try:
                1/0
            except:
                pass
        self._check_call_return(f)

    def test_distant_exception(self):
        def f():
            1/0
        def g():
            f()
        def h():
            g()
        def i():
            h()
        def j(p):
            i()
        f_id = ident(f)
        g_id = ident(g)
        h_id = ident(h)
        i_id = ident(i)
        j_id = ident(j)
        expected = [
            (1, 'call', j_id),
            (2, 'call', i_id),
            (3, 'call', h_id),
            (4, 'call', g_id),
            (5, 'call', f_id),
            (5, 'return', f_id),
            (4, 'return', g_id),
            (3, 'return', h_id),
            (2, 'return', i_id),
            (1, 'return', j_id),
        ]
        self.check_events(j, expected)
|
321 |
|
322 |
|
def ident(function):
    """Return a ``(first line number, name)`` pair for a frame or function.

    *function* may be a frame (any object with an ``f_code`` attribute) or
    a function object. The pair is taken from the underlying code object's
    ``co_firstlineno`` and ``co_name``.
    """
    if hasattr(function, "f_code"):
        code = function.f_code
    else:
        # ``__code__`` is the portable spelling (Python 2.6+ and 3.x);
        # ``func_code`` is kept as a fallback for older interpreters.
        code = getattr(function, "__code__", None)
        if code is None:
            code = function.func_code
    return code.co_firstlineno, code.co_name
|
329 |
|
330 |
|
def protect(f, p):
    """Call ``f(p)`` and discard any exception it raises."""
    try:
        f(p)
    except:
        # Deliberately broad: the profiled callable is allowed to fail.
        pass
|
334 |
|
# (first line number, name) identity of protect(), as computed by ident().
protect_ident = ident(protect)
|
336 |
|
337 |
|
def capture_events(callable, p=None):
    """Run ``callable(p)`` under a profile hook and return the logged events.

    *p* is the watcher whose ``callback`` is installed with
    sys.setprofile(); a new HookWatcher is created when it is omitted.
    The callable is run through protect(), so exceptions it raises are
    swallowed. The first and last logged events are dropped from the
    result (presumably the call and return of protect() itself).

    Raises test_support.TestFailed if ``sys.setprofile()`` called without
    arguments does not raise TypeError.
    """
    # Sanity check performed on every capture: the argument is mandatory.
    try:
        sys.setprofile()
    except TypeError:
        pass
    else:
        raise test_support.TestFailed(
            'sys.setprofile() did not raise TypeError')

    watcher = p
    if watcher is None:
        watcher = HookWatcher()

    # Nothing else may run between installing and removing the hook:
    # any Python-level call made here would be recorded as an event.
    sys.setprofile(watcher.callback)
    protect(callable, watcher)
    sys.setprofile(None)

    recorded = watcher.get_events()
    return recorded[1:-1]
|
353 |
|
354 |
|
def show_events(callable):
    """Pretty-print the events captured while running *callable*."""
    import pprint
    captured = capture_events(callable)
    pprint.pprint(captured)
|
358 |
|
359 |
|
def test_main():
    """Run both hook test cases through test_support.run_unittest()."""
    cases = (ProfileHookTestCase, ProfileSimulatorTestCase)
    test_support.run_unittest(*cases)
|
365 |
|
366 |
|
# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()