62 |
60 |
63 def runloop(self, *args, **kwargs): |
61 def runloop(self, *args, **kwargs): |
64 """ |
62 """ |
65 meant to be called by worker thread (blocking) |
63 meant to be called by worker thread (blocking) |
66 """ |
64 """ |
67 self._threadID = _thread.get_ident() |
65 self._threadID = get_ident() |
68 self.mutex.acquire() |
66 self.mutex.acquire() |
69 self.enabled = True |
67 self.enabled = True |
70 if args or kwargs: |
68 if args or kwargs: |
71 _job = job(*args, **kwargs) |
69 _job = job(*args, **kwargs) |
72 _job.do() |
70 _job.do() |
73 # _job.success can't be None after do() |
71 # _job.success can't be None after do() |
74 if not _job.success: |
72 if not _job.success: |
75 self.reraise(_job) |
73 self.reraise(_job) |
76 |
74 |
77 while not self._finish: |
75 while not self._finish: |
78 self.todo.wait() |
76 self.todo.wait_for(self.job is not None) |
79 if self.job is not None: |
77 self.job.do() |
80 self.job.do() |
78 self.done.notify() |
81 self.done.notify() |
79 |
82 else: |
|
83 break |
|
84 |
|
85 self.mutex.release() |
80 self.mutex.release() |
86 |
81 |
87 def interleave(self, waker, stopper, *args, **kwargs): |
82 def interleave(self, waker, stopper, *args, **kwargs): |
88 """ |
83 """ |
89 as for twisted reactor's interleave, it passes all jobs to waker func |
84 as for twisted reactor's interleave, it passes all jobs to waker func |
90 additionaly, it creates a new thread to wait for new job. |
85 additionaly, it creates a new thread to wait for new job. |
91 """ |
86 """ |
92 self.feed = Condition(self.mutex) |
87 self.feed = Condition(self.mutex) |
93 self._threadID = _thread.get_ident() |
88 self._threadID = get_ident() |
94 self.stopper = stopper |
89 self.stopper = stopper |
|
90 |
|
91 def do_pending_job(): |
|
92 self.mutex.acquire() |
|
93 if self.job is not None: |
|
94 self.job.do() |
|
95 self.done.notify_all() |
|
96 self.mutex.release() |
95 |
97 |
96 def wakerfeedingloop(): |
98 def wakerfeedingloop(): |
97 self.mutex.acquire() |
99 self.mutex.acquire() |
98 self.enabled = True |
100 self.enabled = True |
|
101 |
|
102 # Handle first job |
99 if args or kwargs: |
103 if args or kwargs: |
100 def first_job_todo(): |
104 self.job = job(*args, **kwargs) |
101 _job = job(*args, **kwargs) |
105 waker(do_pending_job) |
102 _job.do() |
106 self.done.wait_for(lambda: self.job.success is not None) |
103 if not _job.success: |
107 if not self.job.success: |
104 self.reraise(_job) |
108 self.reraise(_job) |
105 self.mutex.acquire() |
109 self.job = None |
106 self.feed.notify() |
110 |
107 self.mutex.release() |
111 self.free.notify() |
108 waker(first_job_todo) |
|
109 self.feed.wait() |
|
110 |
112 |
111 while not self._finish: |
113 while not self._finish: |
112 self.todo.wait() |
114 self.todo.wait_for(lambda: self.job is not None) |
113 def job_todo(): |
|
114 self.mutex.acquire() |
|
115 if self.job is not None: |
|
116 self.job.do() |
|
117 self.feed.notify() |
|
118 self.done.notify() |
|
119 self.mutex.release() |
|
120 if self._finish: |
115 if self._finish: |
121 break |
116 break |
122 waker(job_todo) |
117 waker(do_pending_job) |
123 self.feed.wait() |
118 self.done.wait_for(lambda: self.job.success is not None) |
|
119 self.job = None |
|
120 self.free.notify() |
124 |
121 |
125 self.mutex.release() |
122 self.mutex.release() |
|
123 |
126 self.own_thread = Thread(target = wakerfeedingloop) |
124 self.own_thread = Thread(target = wakerfeedingloop) |
127 self.own_thread.start() |
125 self.own_thread.start() |
128 |
126 |
129 def stop(self): |
127 def stop(self): |
130 """ |
128 """ |
147 blocking until job done |
145 blocking until job done |
148 """ |
146 """ |
149 |
147 |
150 _job = job(*args, **kwargs) |
148 _job = job(*args, **kwargs) |
151 |
149 |
152 if self._threadID == _thread.get_ident(): |
150 if self._threadID == get_ident(): |
153 # if caller is worker thread execute immediately |
151 # if caller is worker thread execute immediately |
154 _job.do() |
152 _job.do() |
155 else: |
153 else: |
156 # otherwise notify and wait for completion |
154 # otherwise notify and wait for completion |
157 self.mutex.acquire() |
155 self.mutex.acquire() |
158 if not self.enabled: |
156 if not self.enabled: |
159 self.mutex.release() |
157 self.mutex.release() |
160 raise EOFError("Worker is disabled") |
158 raise EOFError("Worker is disabled") |
161 |
159 |
162 while self.job is not None: |
160 self.free.wait_for(lambda: self.job is None) |
163 self.free.wait() |
|
164 |
161 |
165 self.job = _job |
162 self.job = _job |
166 self.todo.notify() |
163 self.todo.notify() |
167 self.done.wait() |
164 self.done.wait_for(lambda: _job.success is not None) |
168 self.job = None |
|
169 self.free.notify() |
165 self.free.notify() |
170 self.mutex.release() |
166 self.mutex.release() |
171 |
167 |
172 if _job.success is None: |
168 if _job.success is None: |
173 raise EOFError("Worker job was interrupted") |
169 raise EOFError("Worker job was interrupted") |