connectors/WAMP/__init__.py
changeset 1440 e8daabf2c438
child 1441 826730e60407
equal deleted inserted replaced
1439:a68cd4253259 1440:e8daabf2c438
       
     1 #!/usr/bin/env python
       
     2 # -*- coding: utf-8 -*-
       
     3 #
       
     4 #Copyright (C) 2015: Edouard TISSERANT
       
     5 #
       
     6 #See COPYING file for copyrights details.
       
     7 #
       
     8 #This library is free software; you can redistribute it and/or
       
     9 #modify it under the terms of the GNU General Public
       
    10 #License as published by the Free Software Foundation; either
       
    11 #version 2.1 of the License, or (at your option) any later version.
       
    12 #
       
    13 #This library is distributed in the hope that it will be useful,
       
    14 #but WITHOUT ANY WARRANTY; without even the implied warranty of
       
    15 #MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
       
    16 #General Public License for more details.
       
    17 #
       
    18 #You should have received a copy of the GNU General Public
       
    19 #License along with this library; if not, write to the Free Software
       
    20 #Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
       
    21 
       
    22 import sys, traceback
       
    23 #from twisted.python import log
       
    24 from twisted.internet import reactor, threads
       
    25 from autobahn.twisted import wamp
       
    26 from autobahn.twisted.websocket import WampWebSocketClientFactory, connectWS
       
    27 from autobahn.wamp import types
       
    28 from autobahn.wamp.exception import TransportLost
       
    29 from autobahn.wamp.serializer import MsgPackSerializer
       
    30 from threading import Thread, Event
       
    31 
       
    32 _WampSession = None
       
    33 _ReactorThread = None
       
    34 _WampSessionEvent = Event()
       
    35 
       
    36 class WampSession(wamp.ApplicationSession):
       
    37     def onJoin(self, details):
       
    38         global _WampSession
       
    39         _WampSession = self
       
    40         _WampSessionEvent.set()
       
    41         print 'WAMP session joined for :', self.config.extra["ID"]
       
    42 
       
    43     def onLeave(self, details):
       
    44         global _WampSession
       
    45         _WampSessionEvent.clear()
       
    46         _WampSession = None
       
    47         print 'WAMP session left'
       
    48 
       
    49 PLCObjDefaults = { "StartPLC": False,
       
    50                    "GetTraceVariables" : ("Broken",None),
       
    51                    "GetPLCstatus" : ("Broken",None),
       
    52                    "RemoteExec" : (-1, "RemoteExec script failed!")}
       
    53 
       
    54 def WAMP_connector_factory(uri, confnodesroot):
       
    55     """
       
    56     WAMP://127.0.0.1:12345/path#realm#ID
       
    57     WAMPS://127.0.0.1:12345/path#realm#ID
       
    58     """
       
    59     global _WampSession, _ReactorThread, _WampSessionEvent
       
    60 
       
    61     servicetype, location = uri.split("://")
       
    62     urlpath, realm, ID = location.split('#')
       
    63     urlprefix = {"WAMP":"ws",
       
    64                  "WAMPS":"wss"}[servicetype]
       
    65     url = urlprefix+"://"+urlpath
       
    66 
       
    67     def RegisterWampClient():
       
    68 
       
    69         ## start logging to console
       
    70         # log.startLogging(sys.stdout)
       
    71 
       
    72         # create a WAMP application session factory
       
    73         component_config = types.ComponentConfig(
       
    74             realm = realm,
       
    75             extra = {"ID":ID})
       
    76         session_factory = wamp.ApplicationSessionFactory(
       
    77             config = component_config)
       
    78         session_factory.session = WampSession
       
    79 
       
    80         # create a WAMP-over-WebSocket transport client factory
       
    81         transport_factory = WampWebSocketClientFactory(
       
    82             session_factory,
       
    83             url = url,
       
    84             serializers = [MsgPackSerializer()],
       
    85             debug = False,
       
    86             debug_wamp = False)
       
    87 
       
    88         # start the client from a Twisted endpoint
       
    89         conn = connectWS(transport_factory)
       
    90         confnodesroot.logger.write(_("WAMP connecting to URL : %s\n")%url)
       
    91         return conn
       
    92 
       
    93     def WampSessionProcMapper(funcname):
       
    94         def catcher_func(*args,**kwargs):
       
    95             if _WampSession is not None :
       
    96                 try:
       
    97                     return threads.blockingCallFromThread(
       
    98                         reactor, _WampSession.call, funcname,
       
    99                         *args,**kwargs)
       
   100                 except TransportLost, e:
       
   101                     confnodesroot.logger.write_error("Connection lost!\n")
       
   102                     confnodesroot._SetConnector(None)
       
   103                 except Exception,e:
       
   104                     errmess = traceback.format_exc()
       
   105                     confnodesroot.logger.write_error(errmess+"\n")
       
   106                     print errmess
       
   107                     #confnodesroot._SetConnector(None)
       
   108             return PLCObjDefaults.get(funcname)
       
   109         return catcher_func
       
   110 
       
   111     class WampPLCObjectProxy(object):
       
   112         def __init__(self):
       
   113             if not reactor.running:
       
   114                 def ThreadProc():
       
   115                     self.connection = RegisterWampClient()
       
   116                     reactor.run(installSignalHandlers=False)
       
   117                 Thread(target=ThreadProc).start()
       
   118             else:
       
   119                 self.connection = threads.blockingCallFromThread(
       
   120                     reactor, RegisterWampClient)
       
   121             if not _WampSessionEvent.wait(5):
       
   122                 self.connection.stopConnecting()
       
   123                 raise Exception, _("WAMP connection timeout")
       
   124 
       
   125         def __del__(self):
       
   126             self.connection.disconnect()
       
   127             #reactor.Stop()
       
   128 
       
   129         def __getattr__(self, attrName):
       
   130             member = self.__dict__.get(attrName, None)
       
   131             if member is None:
       
   132                 member = WampSessionProcMapper(attrName)
       
   133                 self.__dict__[attrName] = member
       
   134             return member
       
   135 
       
   136     # Try to get the proxy object
       
   137     try :
       
   138         return WampPLCObjectProxy()
       
   139     except Exception, msg:
       
   140         confnodesroot.logger.write_error(_("WAMP connection to '%s' failed.\n")%location)
       
   141         confnodesroot.logger.write_error(traceback.format_exc())
       
   142         return None
       
   143 
       
   144 
       
   145 
       
   146