equal
deleted
inserted
replaced
50 self.todo = Condition(self.mutex) |
50 self.todo = Condition(self.mutex) |
51 self.done = Condition(self.mutex) |
51 self.done = Condition(self.mutex) |
52 self.free = Condition(self.mutex) |
52 self.free = Condition(self.mutex) |
53 self.job = None |
53 self.job = None |
54 self.enabled = False |
54 self.enabled = False |
|
55 self.stopper = None |
|
56 self.own_thread = None |
55 |
57 |
56 def reraise(self, job): |
58 def reraise(self, job): |
57 """ |
59 """ |
58 reraise exception happend in a job |
60 reraise exception happend in a job |
59 @param job: job where original exception happend |
61 @param job: job where original exception happend |
85 else: |
87 else: |
86 break |
88 break |
87 |
89 |
88 self.mutex.release() |
90 self.mutex.release() |
89 |
91 |
90 def interleave(self, waker, *args, **kwargs): |
92 def interleave(self, waker, stopper, *args, **kwargs): |
91 """ |
93 """ |
92 as for twisted reactor's interleave, it passes all jobs to waker func |
94 as for twisted reactor's interleave, it passes all jobs to waker func |
93 additionaly, it creates a new thread to wait for new job. |
95 additionaly, it creates a new thread to wait for new job. |
94 """ |
96 """ |
95 self.feed = Condition(self.mutex) |
97 self.feed = Condition(self.mutex) |
96 self._threadID = _thread.get_ident() |
98 self._threadID = _thread.get_ident() |
|
99 self.stopper = stopper |
97 |
100 |
98 def wakerfeedingloop(): |
101 def wakerfeedingloop(): |
99 self.mutex.acquire() |
102 self.mutex.acquire() |
100 self.enabled = True |
103 self.enabled = True |
101 if args or kwargs: |
104 if args or kwargs: |
117 if self.job is not None: |
120 if self.job is not None: |
118 self.job.do() |
121 self.job.do() |
119 self.feed.notify() |
122 self.feed.notify() |
120 self.done.notify() |
123 self.done.notify() |
121 self.mutex.release() |
124 self.mutex.release() |
|
125 if self._finish: |
|
126 break |
122 waker(job_todo) |
127 waker(job_todo) |
123 self.feed.wait() |
128 self.feed.wait() |
124 |
129 |
125 self.mutex.release() |
130 self.mutex.release() |
126 self.own_thread = Thread(target = wakerfeedingloop) |
131 self.own_thread = Thread(target = wakerfeedingloop) |
187 self.enabled = False |
192 self.enabled = False |
188 self.job = None |
193 self.job = None |
189 self.todo.notify() |
194 self.todo.notify() |
190 self.done.notify() |
195 self.done.notify() |
191 self.mutex.release() |
196 self.mutex.release() |
|
197 |
|
198 def finish(self): |
|
199 if self.own_thread is None: |
|
200 self.quit() |
|
201 if self.stopper is not None: |
|
202 self.stopper() |