connectors/WAMP/__init__.py
changeset 1441 826730e60407
parent 1440 e8daabf2c438
child 1443 ff8a22d45c44
Legend: equal | deleted | inserted | replaced
Each row below is shown twice: old revision (1440:e8daabf2c438) first, then new revision (1441:826730e60407)
    17 #
    17 #
    18 #You should have received a copy of the GNU General Public
    18 #You should have received a copy of the GNU General Public
    19 #License along with this library; if not, write to the Free Software
    19 #License along with this library; if not, write to the Free Software
    20 #Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
    20 #Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
    21 
    21 
    22 import sys, traceback
    22 import sys, traceback, atexit
    23 #from twisted.python import log
    23 #from twisted.python import log
    24 from twisted.internet import reactor, threads
    24 from twisted.internet import reactor, threads
    25 from autobahn.twisted import wamp
    25 from autobahn.twisted import wamp
    26 from autobahn.twisted.websocket import WampWebSocketClientFactory, connectWS
    26 from autobahn.twisted.websocket import WampWebSocketClientFactory, connectWS
    27 from autobahn.wamp import types
    27 from autobahn.wamp import types
    28 from autobahn.wamp.exception import TransportLost
    28 from autobahn.wamp.exception import TransportLost
    29 from autobahn.wamp.serializer import MsgPackSerializer
    29 from autobahn.wamp.serializer import MsgPackSerializer
    30 from threading import Thread, Event
    30 from threading import Thread, Event
    31 
    31 
    32 _WampSession = None
    32 _WampSession = None
    33 _ReactorThread = None
    33 _WampConnection = None
    34 _WampSessionEvent = Event()
    34 _WampSessionEvent = Event()
    35 
    35 
    36 class WampSession(wamp.ApplicationSession):
    36 class WampSession(wamp.ApplicationSession):
    37     def onJoin(self, details):
    37     def onJoin(self, details):
    38         global _WampSession
    38         global _WampSession, _WampSessionEvent
    39         _WampSession = self
    39         _WampSession = self
    40         _WampSessionEvent.set()
    40         _WampSessionEvent.set()
    41         print 'WAMP session joined for :', self.config.extra["ID"]
    41         print 'WAMP session joined for :', self.config.extra["ID"]
    42 
    42 
    43     def onLeave(self, details):
    43     def onLeave(self, details):
    44         global _WampSession
    44         global _WampSession, _WampSessionEvent
    45         _WampSessionEvent.clear()
    45         _WampSessionEvent.clear()
    46         _WampSession = None
    46         _WampSession = None
    47         print 'WAMP session left'
    47         print 'WAMP session left'
    48 
    48 
    49 PLCObjDefaults = { "StartPLC": False,
    49 PLCObjDefaults = { "StartPLC": False,
    54 def WAMP_connector_factory(uri, confnodesroot):
    54 def WAMP_connector_factory(uri, confnodesroot):
    55     """
    55     """
    56     WAMP://127.0.0.1:12345/path#realm#ID
    56     WAMP://127.0.0.1:12345/path#realm#ID
    57     WAMPS://127.0.0.1:12345/path#realm#ID
    57     WAMPS://127.0.0.1:12345/path#realm#ID
    58     """
    58     """
    59     global _WampSession, _ReactorThread, _WampSessionEvent
       
    60 
       
    61     servicetype, location = uri.split("://")
    59     servicetype, location = uri.split("://")
    62     urlpath, realm, ID = location.split('#')
    60     urlpath, realm, ID = location.split('#')
    63     urlprefix = {"WAMP":"ws",
    61     urlprefix = {"WAMP":"ws",
    64                  "WAMPS":"wss"}[servicetype]
    62                  "WAMPS":"wss"}[servicetype]
    65     url = urlprefix+"://"+urlpath
    63     url = urlprefix+"://"+urlpath
    88         # start the client from a Twisted endpoint
    86         # start the client from a Twisted endpoint
    89         conn = connectWS(transport_factory)
    87         conn = connectWS(transport_factory)
    90         confnodesroot.logger.write(_("WAMP connecting to URL : %s\n")%url)
    88         confnodesroot.logger.write(_("WAMP connecting to URL : %s\n")%url)
    91         return conn
    89         return conn
    92 
    90 
       
    91     AddToDoBeforeQuit = confnodesroot.AppFrame.AddToDoBeforeQuit
       
    92     def ThreadProc():
       
    93         global _WampConnection
       
    94         _WampConnection = RegisterWampClient()
       
    95         AddToDoBeforeQuit(reactor.stop)
       
    96         reactor.run(installSignalHandlers=False)
       
    97 
    93     def WampSessionProcMapper(funcname):
    98     def WampSessionProcMapper(funcname):
    94         def catcher_func(*args,**kwargs):
    99         def catcher_func(*args,**kwargs):
       
   100             global _WampSession
    95             if _WampSession is not None :
   101             if _WampSession is not None :
    96                 try:
   102                 try:
    97                     return threads.blockingCallFromThread(
   103                     return threads.blockingCallFromThread(
    98                         reactor, _WampSession.call, funcname,
   104                         reactor, _WampSession.call, funcname,
    99                         *args,**kwargs)
   105                         *args,**kwargs)
   108             return PLCObjDefaults.get(funcname)
   114             return PLCObjDefaults.get(funcname)
   109         return catcher_func
   115         return catcher_func
   110 
   116 
   111     class WampPLCObjectProxy(object):
   117     class WampPLCObjectProxy(object):
   112         def __init__(self):
   118         def __init__(self):
       
   119             global _WampSessionEvent, _WampConnection
   113             if not reactor.running:
   120             if not reactor.running:
   114                 def ThreadProc():
       
   115                     self.connection = RegisterWampClient()
       
   116                     reactor.run(installSignalHandlers=False)
       
   117                 Thread(target=ThreadProc).start()
   121                 Thread(target=ThreadProc).start()
   118             else:
   122             else:
   119                 self.connection = threads.blockingCallFromThread(
   123                 _WampConnection = threads.blockingCallFromThread(
   120                     reactor, RegisterWampClient)
   124                     reactor, RegisterWampClient)
   121             if not _WampSessionEvent.wait(5):
   125             if not _WampSessionEvent.wait(5):
   122                 self.connection.stopConnecting()
   126                 _WampConnection = stopConnecting()
   123                 raise Exception, _("WAMP connection timeout")
   127                 raise Exception, _("WAMP connection timeout")
   124 
   128 
   125         def __del__(self):
   129         def __del__(self):
   126             self.connection.disconnect()
   130             global _WampConnection
   127             #reactor.Stop()
   131             _WampConnection.disconnect()
       
   132             #
       
   133             # reactor.stop()
   128 
   134 
   129         def __getattr__(self, attrName):
   135         def __getattr__(self, attrName):
   130             member = self.__dict__.get(attrName, None)
   136             member = self.__dict__.get(attrName, None)
   131             if member is None:
   137             if member is None:
   132                 member = WampSessionProcMapper(attrName)
   138                 member = WampSessionProcMapper(attrName)