|
1 #!/usr/bin/env python |
|
2 # -*- coding: utf-8 -*- |
|
3 |
|
4 # This file is part of Beremiz runtime. |
|
5 # |
|
6 # Copyright (C) 2018: Edouard TISSERANT |
|
7 # |
|
8 # See COPYING.Runtime file for copyrights details. |
|
9 |
|
from __future__ import absolute_import
import sys
import thread
from threading import Lock, Condition
|
13 |
|
class job(object):
    """
    job to be executed by a worker

    Bundles a callable with its positional and keyword arguments and,
    once do() has run, holds the outcome:
      - result:   value returned by the callable (None until it succeeds)
      - success:  True only if the callable returned without raising
      - exc_info: sys.exc_info() triple of the caught exception, else None
    """
    def __init__(self, call, *args, **kwargs):
        """
        store the call to perform later; nothing is executed here
        """
        self.job = (call, args, kwargs)
        self.result = None
        self.success = False
        self.exc_info = None

    def do(self):
        """
        do the job by executing the call, and deal with exceptions

        Exceptions derived from Exception are not propagated: type, value
        and traceback are kept in exc_info so that whoever requested the
        job can re-raise them later.
        """
        call, args, kwargs = self.job
        try:
            self.result = call(*args, **kwargs)
        except Exception:
            self.success = False
            # keep the full triple, traceback included, for a later re-raise
            self.exc_info = sys.exc_info()
        else:
            self.success = True
|
35 |
|
36 |
|
37 class worker(object): |
|
38 """ |
|
39 serialize main thread load/unload of PLC shared objects |
|
40 """ |
|
41 def __init__(self): |
|
42 # Only one job at a time |
|
43 self._finish = False |
|
44 self._threadID = None |
|
45 self.mutex = Lock() |
|
46 self.todo = Condition(self.mutex) |
|
47 self.done = Condition(self.mutex) |
|
48 self.free = Condition(self.mutex) |
|
49 self.job = None |
|
50 |
|
51 def runloop(self, *args, **kwargs): |
|
52 """ |
|
53 meant to be called by worker thread (blocking) |
|
54 """ |
|
55 self._threadID = thread.get_ident() |
|
56 if args or kwargs: |
|
57 job(*args, **kwargs).do() |
|
58 # result is ignored |
|
59 self.mutex.acquire() |
|
60 while not self._finish: |
|
61 self.todo.wait() |
|
62 if self.job is not None: |
|
63 self.job.do() |
|
64 self.done.notify() |
|
65 else: |
|
66 self.free.notify() |
|
67 self.mutex.release() |
|
68 |
|
69 def call(self, *args, **kwargs): |
|
70 """ |
|
71 creates a job, execute it in worker thread, and deliver result. |
|
72 if job execution raise exception, re-raise same exception |
|
73 meant to be called by non-worker threads, but this is accepted. |
|
74 blocking until job done |
|
75 """ |
|
76 |
|
77 _job = job(*args, **kwargs) |
|
78 |
|
79 if self._threadID == thread.get_ident() or self._threadID is None: |
|
80 # if caller is worker thread execute immediately |
|
81 _job.do() |
|
82 else: |
|
83 # otherwise notify and wait for completion |
|
84 self.mutex.acquire() |
|
85 |
|
86 while self.job is not None: |
|
87 self.free.wait() |
|
88 |
|
89 self.job = _job |
|
90 self.todo.notify() |
|
91 self.done.wait() |
|
92 _job = self.job |
|
93 self.job = None |
|
94 self.mutex.release() |
|
95 |
|
96 if _job.success: |
|
97 return _job.result |
|
98 else: |
|
99 raise _job.exc_info[0], _job.exc_info[1], _job.exc_info[2] |
|
100 |
|
101 def quit(self): |
|
102 """ |
|
103 unblocks main thread, and terminate execution of runloop() |
|
104 """ |
|
105 # mark queue |
|
106 self._finish = True |
|
107 self.mutex.acquire() |
|
108 self.job = None |
|
109 self.todo.notify() |
|
110 self.mutex.release() |