connectors/WAMP/__init__.py
changeset 1475 de4ee16f7c6c
parent 1443 ff8a22d45c44
child 1571 486f94a8032c
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/connectors/WAMP/__init__.py	Wed Oct 21 15:00:32 2015 +0100
@@ -0,0 +1,153 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+#Copyright (C) 2015: Edouard TISSERANT
+#
+#See COPYING file for copyrights details.
+#
+#This library is free software; you can redistribute it and/or
+#modify it under the terms of the GNU General Public
+#License as published by the Free Software Foundation; either
+#version 2.1 of the License, or (at your option) any later version.
+#
+#This library is distributed in the hope that it will be useful,
+#but WITHOUT ANY WARRANTY; without even the implied warranty of
+#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+#General Public License for more details.
+#
+#You should have received a copy of the GNU General Public
+#License along with this library; if not, write to the Free Software
+#Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+
+import sys, traceback, atexit
+#from twisted.python import log
+from twisted.internet import reactor, threads
+from autobahn.twisted import wamp
+from autobahn.twisted.websocket import WampWebSocketClientFactory, connectWS
+from autobahn.wamp import types
+from autobahn.wamp.exception import TransportLost
+from autobahn.wamp.serializer import MsgPackSerializer
+from threading import Thread, Event
+
+_WampSession = None
+_WampConnection = None
+_WampSessionEvent = Event()
+
+class WampSession(wamp.ApplicationSession):
+    def onJoin(self, details):
+        global _WampSession, _WampSessionEvent
+        _WampSession = self
+        _WampSessionEvent.set()
+        print 'WAMP session joined for :', self.config.extra["ID"]
+
+    def onLeave(self, details):
+        global _WampSession, _WampSessionEvent
+        _WampSessionEvent.clear()
+        _WampSession = None
+        print 'WAMP session left'
+
+PLCObjDefaults = { "StartPLC": False,
+                   "GetTraceVariables" : ("Broken",None),
+                   "GetPLCstatus" : ("Broken",None),
+                   "RemoteExec" : (-1, "RemoteExec script failed!")}
+
+def WAMP_connector_factory(uri, confnodesroot):
+    """
+    WAMP://127.0.0.1:12345/path#realm#ID
+    WAMPS://127.0.0.1:12345/path#realm#ID
+    """
+    servicetype, location = uri.split("://")
+    urlpath, realm, ID = location.split('#')
+    urlprefix = {"WAMP":"ws",
+                 "WAMPS":"wss"}[servicetype]
+    url = urlprefix+"://"+urlpath
+
+    def RegisterWampClient():
+
+        ## start logging to console
+        # log.startLogging(sys.stdout)
+
+        # create a WAMP application session factory
+        component_config = types.ComponentConfig(
+            realm = realm,
+            extra = {"ID":ID})
+        session_factory = wamp.ApplicationSessionFactory(
+            config = component_config)
+        session_factory.session = WampSession
+
+        # create a WAMP-over-WebSocket transport client factory
+        transport_factory = WampWebSocketClientFactory(
+            session_factory,
+            url = url,
+            serializers = [MsgPackSerializer()],
+            debug = False,
+            debug_wamp = False)
+
+        # start the client from a Twisted endpoint
+        conn = connectWS(transport_factory)
+        confnodesroot.logger.write(_("WAMP connecting to URL : %s\n")%url)
+        return conn
+
+    AddToDoBeforeQuit = confnodesroot.AppFrame.AddToDoBeforeQuit
+    def ThreadProc():
+        global _WampConnection
+        _WampConnection = RegisterWampClient()
+        AddToDoBeforeQuit(reactor.stop)
+        reactor.run(installSignalHandlers=False)
+
+    def WampSessionProcMapper(funcname):
+        wampfuncname = '.'.join((ID,funcname))
+        def catcher_func(*args,**kwargs):
+            global _WampSession
+            if _WampSession is not None :
+                try:
+                    return threads.blockingCallFromThread(
+                        reactor, _WampSession.call, wampfuncname,
+                        *args,**kwargs)
+                except TransportLost, e:
+                    confnodesroot.logger.write_error("Connection lost!\n")
+                    confnodesroot._SetConnector(None)
+                except Exception,e:
+                    errmess = traceback.format_exc()
+                    confnodesroot.logger.write_error(errmess+"\n")
+                    print errmess
+                    #confnodesroot._SetConnector(None)
+            return PLCObjDefaults.get(funcname)
+        return catcher_func
+
+    class WampPLCObjectProxy(object):
+        def __init__(self):
+            global _WampSessionEvent, _WampConnection
+            if not reactor.running:
+                Thread(target=ThreadProc).start()
+            else:
+                _WampConnection = threads.blockingCallFromThread(
+                    reactor, RegisterWampClient)
+            if not _WampSessionEvent.wait(5):
+                _WampConnection = stopConnecting()
+                raise Exception, _("WAMP connection timeout")
+
+        def __del__(self):
+            global _WampConnection
+            _WampConnection.disconnect()
+            #
+            # reactor.stop()
+
+        def __getattr__(self, attrName):
+            member = self.__dict__.get(attrName, None)
+            if member is None:
+                member = WampSessionProcMapper(attrName)
+                self.__dict__[attrName] = member
+            return member
+
+    # Try to get the proxy object
+    try :
+        return WampPLCObjectProxy()
+    except Exception, msg:
+        confnodesroot.logger.write_error(_("WAMP connection to '%s' failed.\n")%location)
+        confnodesroot.logger.write_error(traceback.format_exc())
+        return None
+
+
+
+