PSKManagement.py
author Andrey Skvortsov <andrej.skvortzov@gmail.com>
Wed, 13 Mar 2019 11:47:03 +0300
changeset 2537 eb4a4cc41914
parent 2492 7dd551ac2fa0
child 3750 f62625418bff
permissions -rw-r--r--
Fix various pylint and pep8 errors

Check basic code-style problems for PEP-8
pep8 version: 2.4.0
./connectors/PYRO/__init__.py:57:43: E261 at least two spaces before inline comment
./connectors/SchemeEditor.py:29:21: E128 continuation line under-indented for visual indent
./controls/IDBrowser.py:101:23: E127 continuation line over-indented for visual indent
./controls/IDBrowser.py:102:23: E127 continuation line over-indented for visual indent

Check for problems using pylint ...
No config file found, using default configuration
pylint 1.9.4,
astroid 1.6.5
Python 2.7.16rc1 (default, Feb 18 2019, 11:05:09)
[GCC 8.2.0]
Use multiple threads for pylint
Using config file /home/developer/WorkData/PLC/beremiz/beremiz/.pylint
************* Module connectors.PYRO_dialog
connectors/PYRO_dialog.py:9: [W0611(unused-import), ] Unused import wx
************* Module connectors
connectors/__init__.py:32: [W1652(deprecated-types-field), ] Accessing a deprecated fields on the types module
connectors/__init__.py:32: [C0411(wrong-import-order), ] standard import "from types import ClassType" should be placed before "from connectors.ConnectorBase import ConnectorBase"
************* Module connectors.PYRO.PSK_Adapter
connectors/PYRO/PSK_Adapter.py:7: [C0411(wrong-import-order), ] standard import "import ssl" should be placed before "import sslpsk"
************* Module connectors.SchemeEditor
connectors/SchemeEditor.py:29: [C0330(bad-continuation), ] Wrong continued indentation (add 1 space).
wx.ALIGN_CENTER_VERTICAL),
^|
connectors/SchemeEditor.py:42: [W0631(undefined-loop-variable), SchemeEditor.__init__] Using possibly undefined loop variable 'tag'
************* Module runtime.WampClient
runtime/WampClient.py:138: [W1612(unicode-builtin), WampSession.onJoin] unicode built-in referenced
runtime/WampClient.py:154: [W1612(unicode-builtin), WampSession.publishWithOwnID] unicode built-in referenced
runtime/WampClient.py:346: [W1612(unicode-builtin), PublishEvent] unicode built-in referenced
runtime/WampClient.py:351: [W1612(unicode-builtin), PublishEventWithOwnID] unicode built-in referenced
runtime/WampClient.py:31: [W0611(unused-import), ] Unused str imported from builtins as text
************* Module runtime.PLCObject
runtime/PLCObject.py:35: [W1648(bad-python3-import), ] Module moved in Python 3
runtime/PLCObject.py:35: [C0411(wrong-import-order), ] standard import "import md5" should be placed before "from six.moves import xrange"
runtime/PLCObject.py:36: [C0411(wrong-import-order), ] standard import "from tempfile import mkstemp" should be placed before "from six.moves import xrange"
runtime/PLCObject.py:37: [C0411(wrong-import-order), ] standard import "import shutil" should be placed before "from six.moves import xrange"
runtime/PLCObject.py:38: [C0411(wrong-import-order), ] standard import "from functools import wraps, partial" should be placed before "from six.moves import xrange"
************* Module runtime.Worker
runtime/Worker.py:12: [W1648(bad-python3-import), ] Module moved in Python 3
************* Module runtime.spawn_subprocess
runtime/spawn_subprocess.py:125: [C0325(superfluous-parens), ] Unnecessary parens after 'print' keyword
runtime/spawn_subprocess.py:130: [C0325(superfluous-parens), ] Unnecessary parens after 'print' keyword
runtime/spawn_subprocess.py:125: [E1601(print-statement), ] print statement used
runtime/spawn_subprocess.py:130: [E1601(print-statement), ] print statement used
************* Module controls.IDBrowser
controls/IDBrowser.py:101: [C0330(bad-continuation), ] Wrong continued indentation (remove 5 spaces).
if self.isManager
| ^
controls/IDBrowser.py:102: [C0330(bad-continuation), ] Wrong continued indentation (remove 5 spaces).
else dv.DATAVIEW_CELL_INERT),
| ^
************* Module Beremiz_service
Beremiz_service.py:34: [W0611(unused-import), ] Unused import __builtin__
#!/usr/bin/env python
# -*- coding: utf-8 -*-

# See COPYING file for copyrights details.

from __future__ import absolute_import
import os
import time
import json
from zipfile import ZipFile

# PSK Management Data model :
# [[ID,Desc, LastKnownURI, LastConnect]]
COL_ID, COL_URI, COL_DESC, COL_LAST = range(4)
REPLACE, REPLACE_ALL, KEEP, KEEP_ALL, CANCEL = range(5)


def _pskpath(project_path):
    return os.path.join(project_path, 'psk')


def _mgtpath(project_path):
    return os.path.join(_pskpath(project_path), 'management.json')


def _ensurePSKdir(project_path):
    pskpath = _pskpath(project_path)
    if not os.path.exists(pskpath):
        os.mkdir(pskpath)
    return pskpath


def _default(ID):
    return [ID,
            '',  # default description
            None,  # last known URI
            None]  # last connection date


def _dataByID(data):
    return {row[COL_ID]: row for row in data}


def _LoadData(project_path):
    """ load known keys metadata """
    if os.path.isdir(_pskpath(project_path)):
        _path = _mgtpath(project_path)
        if os.path.exists(_path):
            return json.loads(open(_path).read())
    return []


def _filterData(psk_files, data_input):
    input_by_ID = _dataByID(data_input)
    output = []
    # go through all secret files available an build data
    # out of data recoverd from json and list of secret.
    # this implicitly filters IDs out of metadata who's
    # secret is missing
    for filename in psk_files:
        if filename.endswith('.secret'):
            ID = filename[:-7]  # strip filename extension
            output.append(input_by_ID.get(ID, _default(ID)))
    return output


def GetData(project_path):
    loaded_data = _LoadData(project_path)
    if loaded_data:
        psk_files = os.listdir(_pskpath(project_path))
        return _filterData(psk_files, loaded_data)
    return []


def DeleteID(project_path, ID):
    secret_path = os.path.join(_pskpath(project_path), ID+'.secret')
    os.remove(secret_path)


def SaveData(project_path, data):
    _ensurePSKdir(project_path)
    with open(_mgtpath(project_path), 'w') as f:
        f.write(json.dumps(data))


def UpdateID(project_path, ID, secret, URI):
    pskpath = _ensurePSKdir(project_path)
    if not os.path.exists(pskpath):
        os.mkdir(pskpath)

    secpath = os.path.join(pskpath, ID+'.secret')
    with open(secpath, 'w') as f:
        f.write(ID+":"+secret)

    # here we directly use _LoadData, avoiding filtering that could be long
    data = _LoadData(project_path)
    idata = _dataByID(data)
    dataForID = idata.get(ID, None) if data else None

    _is_new_ID = dataForID is None
    if _is_new_ID:
        dataForID = _default(ID)

    dataForID[COL_URI] = URI
    # FIXME : could store time instead os a string and use DVC model's cmp
    # then date display could be smarter, etc - sortable sting hack for now
    dataForID[COL_LAST] = time.strftime('%y/%M/%d-%H:%M:%S')

    if _is_new_ID:
        data.append(dataForID)

    SaveData(project_path, data)


def ExportIDs(project_path, export_zip):
    with ZipFile(export_zip, 'w') as zf:
        path = _pskpath(project_path)
        for nm in os.listdir(path):
            if nm.endswith('.secret') or nm == 'management.json':
                zf.write(os.path.join(path, nm), nm)


def ImportIDs(project_path, import_zip, should_I_replace_callback):
    zf = ZipFile(import_zip, 'r')
    data = GetData(project_path)

    zip_loaded_data = json.loads(zf.open('management.json').read())
    name_list = zf.namelist()
    zip_filtered_data = _filterData(name_list, zip_loaded_data)

    idata = _dataByID(data)

    keys_to_import = []
    result = None

    for imported_row in zip_filtered_data:
        ID = imported_row[COL_ID]
        existing_row = idata.get(ID, None)
        if existing_row is None:
            data.append(imported_row)
        else:
            # callback returns the selected list for merge or none if canceled
            if result not in [REPLACE_ALL, KEEP_ALL]:
                result = should_I_replace_callback(existing_row, imported_row)

            if result == CANCEL:
                return

            if result in [REPLACE_ALL, REPLACE]:
                # replace with imported
                existing_row[:] = imported_row
                # copy the key of selected
                keys_to_import.append(ID)

    for ID in keys_to_import:
        zf.extract(ID+".secret", _pskpath(project_path))

    SaveData(project_path, data)

    return data