From f82e7c4f02e5d9b184dfcc424f2d7dcde88ac7ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C4=B0smail=20Ba=C5=9Faran?= Date: Wed, 30 Mar 2016 10:43:38 +0300 Subject: [PATCH 1/4] event base docstring. response queue exception handling fix --- opt/ahenk/base/event/EventBase.py | 17 ++++++++++++++++ .../base/messaging/MessageResponseQueue.py | 20 +++++++++---------- 2 files changed, 27 insertions(+), 10 deletions(-) diff --git a/opt/ahenk/base/event/EventBase.py b/opt/ahenk/base/event/EventBase.py index 5d10792..e2d3986 100644 --- a/opt/ahenk/base/event/EventBase.py +++ b/opt/ahenk/base/event/EventBase.py @@ -3,6 +3,11 @@ # @author: İsmail BAŞARAN class EventBase(): + + """ + This is base event class for event management. + """ + listeners = [] def __init__(self): @@ -10,10 +15,22 @@ class EventBase(): self.listener_events = [] def register_event(self, event_name, callback_func): + """ + Registers event listener. + Args: + event_name : name of event, user specify event name + callback_func : when an event fire with specified event name this method will call + """ self.listener_events.append({'event_name': event_name, 'callback_func': callback_func}) class Event(): + """ + This is event class. Takes two argument ; + Args: + event_name : name of event. + callback_args : arguments specified by user. This function will transmit args to callback function directly. + """ def __init__(self, event_name, *callback_args): for listener in EventBase.listeners: for listener_cls in listener.listener_events: diff --git a/opt/ahenk/base/messaging/MessageResponseQueue.py b/opt/ahenk/base/messaging/MessageResponseQueue.py index 91312f7..ce6bf22 100644 --- a/opt/ahenk/base/messaging/MessageResponseQueue.py +++ b/opt/ahenk/base/messaging/MessageResponseQueue.py @@ -20,14 +20,14 @@ class MessageResponseQueue(threading.Thread): self.outQueue = outQueue def run(self): - try: while True: - # This item will send response to lider. - # item must be response message. Response message may be generic message type - responseMessage = self.outQueue.get(block=True) - print(responseMessage) - # Call message manager for response - self.messageManager.send_direct_message(responseMessage) - # self.outQueue.task_done() - except: - pass + try: + # This item will send response to lider. + # item must be response message. Response message may be generic message type + responseMessage = self.outQueue.get(block=True) + self.logger.debug('[MessageResponseQueue] Sending response message to lider. Response Message ' + str(responseMessage)) + # Call message manager for response + self.messageManager.send_direct_message(responseMessage) + # self.outQueue.task_done() + except Exception as e: + self.logger.error From 721096b114e9d426384eb77b7da11c9f40511732 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C4=B0smail=20Ba=C5=9Faran?= Date: Wed, 30 Mar 2016 10:51:32 +0300 Subject: [PATCH 2/4] Moved register event types to enumaration --- opt/ahenk/base/execution/ExecutionManager.py | 12 ++++++------ opt/ahenk/base/model/MessageType.py | 5 +++++ 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/opt/ahenk/base/execution/ExecutionManager.py b/opt/ahenk/base/execution/ExecutionManager.py index a53f25a..0b74028 100644 --- a/opt/ahenk/base/execution/ExecutionManager.py +++ b/opt/ahenk/base/execution/ExecutionManager.py @@ -12,6 +12,7 @@ import subprocess from base.Scope import Scope from base.model.Policy import Policy from base.model.Task import Task +from base.model.MessageType import MessageType class ExecutionManager(object): @@ -28,12 +29,11 @@ class ExecutionManager(object): self.logger = scope.getLogger() self.db_service = scope.getDbService() - # TODO move this event names to enumeration - self.event_manager.register_event('EXECUTE_SCRIPT', self.execute_script) - self.event_manager.register_event('REQUEST_FILE', self.request_file) - self.event_manager.register_event('MOVE_FILE', self.move_file) - self.event_manager.register_event('EXECUTE_TASK', self.execute_task) - self.event_manager.register_event('EXECUTE_POLICY', self.execute_policy) + self.event_manager.register_event(MessageType.EXECUTE_SCRIPT, self.execute_script) + self.event_manager.register_event(MessageType.REQUEST_FILE, self.request_file) + self.event_manager.register_event(MessageType.MOVE_FILE, self.move_file) + self.event_manager.register_event(MessageType.EXECUTE_TASK, self.execute_task) + self.event_manager.register_event(MessageType.EXECUTE_POLICY, self.execute_policy) def execute_policy(self, arg): diff --git a/opt/ahenk/base/model/MessageType.py b/opt/ahenk/base/model/MessageType.py index 49e223e..eb6abf9 100644 --- a/opt/ahenk/base/model/MessageType.py +++ b/opt/ahenk/base/model/MessageType.py @@ -12,3 +12,8 @@ class MessageType(Enum): TASK_WARNING = 'TASK_WARNING' POLICY_RECEIVED = 'POLICY_RECEIVED' POLICY_PROCESSED = 'POLICY_PROCESSED' + EXECUTE_POLICY = 'EXECUTE_POLICY' + EXECUTE_TASK = 'EXECUTE_TASK' + MOVE_FILE = 'MOVE_FILE' + REQUEST_FILE = 'REQUEST_FILE' + EXECUTE_SCRIPT = 'EXECUTE_SCRIPT' From fdb095cf7c8a7d90c04aabdfc518d66b47bff014 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C4=B0smail=20Ba=C5=9Faran?= Date: Wed, 30 Mar 2016 17:11:47 +0300 Subject: [PATCH 3/4] Kill signal for reload plugins --- opt/ahenk/base/model/PluginKillSignal.py | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 opt/ahenk/base/model/PluginKillSignal.py diff --git a/opt/ahenk/base/model/PluginKillSignal.py b/opt/ahenk/base/model/PluginKillSignal.py new file mode 100644 index 0000000..2341d97 --- /dev/null +++ b/opt/ahenk/base/model/PluginKillSignal.py @@ -0,0 +1,10 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*- +# Author: İsmail BAŞARAN + + +class PluginKillSignal(object): + + @property + def obj_name(self): + return "KILL_SIGNAL" From 9b63aff8ab8388644ded05ccda330b7df96cc062 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C4=B0smail=20Ba=C5=9Faran?= Date: Wed, 30 Mar 2016 17:12:38 +0300 Subject: [PATCH 4/4] created initial context obj for handling task and policy --- opt/ahenk/base/model/Policy.py | 4 +-- opt/ahenk/base/plugin/Plugin.py | 42 +++++++++++++++++++++----- opt/ahenk/base/plugin/PluginManager.py | 27 ++++++++++++++--- opt/ahenk/plugins/plugin1/command1.py | 3 +- opt/ahenk/plugins/plugin1/policy.py | 2 +- 5 files changed, 62 insertions(+), 16 deletions(-) diff --git a/opt/ahenk/base/model/Policy.py b/opt/ahenk/base/model/Policy.py index 135dd9f..5091db5 100644 --- a/opt/ahenk/base/model/Policy.py +++ b/opt/ahenk/base/model/Policy.py @@ -48,5 +48,5 @@ class Policy(object): def to_json(self): return json.load(self.policy) - def obj_type(self): - return "POLICY" + def obj_name(self): + return "PROFILE" diff --git a/opt/ahenk/base/plugin/Plugin.py b/opt/ahenk/base/plugin/Plugin.py index 4864adc..b40ed97 100644 --- a/opt/ahenk/base/plugin/Plugin.py +++ b/opt/ahenk/base/plugin/Plugin.py @@ -5,39 +5,67 @@ import threading from base.Scope import Scope +class Context(object): + def __init__(self): + self.data = {} + + def put(self,var_name,data): + self.data[var_name] = data + + def empty_data(self): + self.data = {} class Plugin(threading.Thread): - """docstring for Plugin""" + """ + This is a thread inherit class and have a queue. + Plugin class responsible for processing TASK or USER PLUGIN PROFILE. + """ def __init__(self, name, InQueue): threading.Thread.__init__(self) self.name = name self.InQueue = InQueue scope = Scope.getInstance() - self.pluginManager = scope.getPluginManager() self.logger = scope.getLogger() + self.keep_run = True + self.context = Context() def run(self): - while True: + while self.keep_run: try: item_obj = self.InQueue.get(block=True) obj_name = item_obj.obj_name print(obj_name) if obj_name == "TASK": command = Scope.getInstance().getPluginManager().findCommand(self.getName(), item_obj.command_cls_id) - command.handle_task(item_obj) + command.handle_task(item_obj,self.context) + # TODO create response message from context and item_obj. item_obj is task + + + # Empty context for next use + self.context.empty_data() + # TODO add result to response queue elif obj_name == "PROFILE": plugin = item_obj.plugin plugin_name = plugin.name profile_data = item_obj.profile_data policy_module = Scope.getInstance().getPluginManager().findPolicyModule(plugin_name) - policy_module.handle_policy(profile_data) + + policy_module.handle_policy(profile_data,self.context) + # TODO create response message from context and item_obj. item_obj is profile + + # Empty context for next use + self.context.empty_data() + + elif obj_name == "KILL_SIGNAL": + self.keep_run = False + self.logger.debug('[Plugin] Killing queue ! Plugin Name : ' + str(self.name)) else: - self.logger.warning("Not supported object type " + obj_name) + self.logger.warning("[Plugin] Not supported object type " + obj_name) except Exception as e: # TODO error log here - self.logger.error("Plugin running exception " + str(e)) + self.logger.error("[Plugin] Plugin running exception " + str(e)) def getName(self): return self.name diff --git a/opt/ahenk/base/plugin/PluginManager.py b/opt/ahenk/base/plugin/PluginManager.py index 38dddf1..bede627 100644 --- a/opt/ahenk/base/plugin/PluginManager.py +++ b/opt/ahenk/base/plugin/PluginManager.py @@ -7,6 +7,7 @@ import os from base.Scope import Scope from base.plugin.Plugin import Plugin from base.plugin.PluginQueue import PluginQueue +from base.model.PluginKillSignal import PluginKillSignal class PluginManager(object): @@ -24,18 +25,24 @@ class PluginManager(object): #TODO version? def loadPlugins(self): + """ + This method loads plugins + """ + self.logger.info('[PluginManager] Loading plugins...') self.plugins = [] + self.logger.debug('[PluginManager] Lookup for possible plugins...') possibleplugins = os.listdir(self.configManager.get("PLUGIN", "pluginFolderPath")) + self.logger.debug('[PluginManager] Possible plugins.. ' + str(possibleplugins)) for pname in possibleplugins: location = os.path.join(self.configManager.get("PLUGIN", "pluginFolderPath"), pname) if not os.path.isdir(location) or not self.configManager.get("PLUGIN", "mainModuleName") + ".py" in os.listdir(location): - self.logger.debug('It is not a plugin location - - ') + self.logger.debug('It is not a plugin location ! There is no main module - ' + str(location)) continue try: self.loadSinglePlugin(pname) except Exception as e: - # TODO error log - pass + self.logger.error('Exception occured when loading plugin ! Plugin name : ' + str(pname) + ' Exception : ' + str(e)) + self.logger.info('[PluginManager] Loaded plugins successfully.') def loadSinglePlugin(self, pluginName): # TODO check already loaded plugin @@ -63,8 +70,17 @@ class PluginManager(object): self.logger.error("[PluginManager] Exception occurred when processing task " + str(e)) def reloadPlugins(self): - # Not implemented yet - pass + #TODO + try: + self.logger.info('[PluginManager] Reloading plugins... ') + kill_sgnl = PluginKillSignal() + for p_queue in self.pluginQueueDict: + p_queue.put(kill_sgnl) + self.plugins = [] + self.loadPlugins() + self.logger.info('[PluginManager] Plugin reloaded successfully.') + except Exception as e: + self.logger.error('[PluginManager] Exception occurred when reloading plugins ' + str(e)) def findPolicyModule(self,plugin_name): location = os.path.join(self.configManager.get("PLUGIN", "pluginFolderPath"), plugin_name) @@ -102,6 +118,7 @@ class PluginManager(object): def reloadSinglePlugin(self, pluginName): # Not implemented yet + pass def checkCommandExist(self, pluginName, commandId): diff --git a/opt/ahenk/plugins/plugin1/command1.py b/opt/ahenk/plugins/plugin1/command1.py index a5c2ece..0d6e479 100644 --- a/opt/ahenk/plugins/plugin1/command1.py +++ b/opt/ahenk/plugins/plugin1/command1.py @@ -15,9 +15,10 @@ class MySamplePlugin(AbstractCommand): print("parameter map="+self.task.parameter_map) -def handle_task(task): +def handle_task(task,context): # Do what ever you want here # You can create command class but it is not necessary # You can use directly this method. + context.put('my_data_name','my data') myPlugin = MySamplePlugin(task) myPlugin.handle_task() diff --git a/opt/ahenk/plugins/plugin1/policy.py b/opt/ahenk/plugins/plugin1/policy.py index 2e4176f..c141dfb 100644 --- a/opt/ahenk/plugins/plugin1/policy.py +++ b/opt/ahenk/plugins/plugin1/policy.py @@ -3,5 +3,5 @@ # Author: İsmail BAŞARAN -def handle_policy(profile_data): +def handle_policy(profile_data,context): print("This is policy file - plugin 1")