From 9790371289a00131e5a2c8de5ed50802ea1bacc0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C4=B0smail=20Ba=C5=9Faran?= Date: Wed, 23 Mar 2016 10:04:31 +0200 Subject: [PATCH] bug fix --- opt/ahenk/ahenkd.py | 44 +++++++++---------- opt/ahenk/base/execution/ExecutionManager.py | 1 + .../base/messaging/MessageResponseQueue.py | 17 +++---- opt/ahenk/base/plugin/Plugin.py | 26 ++++++----- opt/ahenk/base/plugin/PluginManager.py | 13 +++--- opt/ahenk/base/task/TaskManager.py | 6 ++- 6 files changed, 55 insertions(+), 52 deletions(-) diff --git a/opt/ahenk/ahenkd.py b/opt/ahenk/ahenkd.py index cf65bfb..afd85e1 100755 --- a/opt/ahenk/ahenkd.py +++ b/opt/ahenk/ahenkd.py @@ -10,13 +10,13 @@ from base.Scope import Scope from base.messaging.Messaging import Messaging from base.messaging.Messager import Messager from base.execution.ExecutionManager import ExecutionManager -from base.registration.Registration import Registration +#from base.registration.Registration import Registration from base.messaging.MessageResponseQueue import MessageResponseQueue from base.event.EventManager import EventManager from base.plugin.PluginManager import PluginManager from base.task.TaskManager import TaskManager from base.database.AhenkDbService import AhenkDbService -import threading, time, sys, os, signal, configparser +import threading, time, sys, os, signal, configparser,queue pidfilePath = '/var/run/ahenk.pid' @@ -70,9 +70,9 @@ class AhenkDeamon(BaseDaemon): globalscope.setTaskManager(taskManager) logger.info('[AhenkDeamon] Task Manager was set') - registration=Registration() - globalscope.setRegistration(registration) - logger.info('[AhenkDeamon] Registration was set') + #registration=Registration() + #globalscope.setRegistration(registration) + #logger.info('[AhenkDeamon] Registration was set') execution_manager=ExecutionManager() globalscope.setExecutionManager(execution_manager) @@ -80,9 +80,9 @@ class AhenkDeamon(BaseDaemon): #TODO restrict number of attemption - while registration.is_registered() is False: - logger.debug('[AhenkDeamon] Attempting to register') - registration.registration_request() + #while registration.is_registered() is False: + # logger.debug('[AhenkDeamon] Attempting to register') + # registration.registration_request() logger.info('[AhenkDeamon] Ahenk is registered') @@ -96,9 +96,9 @@ class AhenkDeamon(BaseDaemon): globalscope.setMessager(messager) logger.info('[AhenkDeamon] Messager was set') - if registration.is_ldap_registered() is False: - logger.debug('[AhenkDeamon] Attempting to registering ldap') - registration.ldap_registration_request() #TODO work on message + #if registration.is_ldap_registered() is False: + # logger.debug('[AhenkDeamon] Attempting to registering ldap') + # registration.ldap_registration_request() #TODO work on message logger.info('[AhenkDeamon] LDAP registration of Ahenk is completed') @@ -119,6 +119,12 @@ class AhenkDeamon(BaseDaemon): #messager.send_direct_message('test') + responseQueue = queue.Queue() + messageResponseQueue = MessageResponseQueue(responseQueue) + messageResponseQueue.setDaemon(True) + messageResponseQueue.start() + globalscope.setResponseQueue(responseQueue) + while True: time.sleep(1) @@ -126,14 +132,10 @@ class AhenkDeamon(BaseDaemon): #logger.info('[AhenkDeamon] Requesting policies...') #messager.send_direct_message(messageManager.policy_request_msg()) - """ - this is must be created after message services - responseQueue = queue.Queue() - messageResponseQueue = MessageResponseQueue(responseQueue) - messageResponseQueue.setDaemon(True) - messageResponseQueue.start() - globalscope.setResponseQueue(responseQueue) - """ + + #this is must be created after message services + + def signal_handler (self, num, stack): @@ -202,7 +204,3 @@ if __name__ == '__main__': else: print('Usage : %s start|stop|restart|status' % sys.argv[0]) sys.exit(2) - - - - diff --git a/opt/ahenk/base/execution/ExecutionManager.py b/opt/ahenk/base/execution/ExecutionManager.py index 6f47e1b..1a68a4d 100644 --- a/opt/ahenk/base/execution/ExecutionManager.py +++ b/opt/ahenk/base/execution/ExecutionManager.py @@ -22,6 +22,7 @@ class ExecutionManager(object): self.logger=scope.getLogger() self.db_service=scope.getDbService() + # TODO move this event names to enumeration self.event_manager.register_event('EXECUTE_SCRIPT',self.execute_script) self.event_manager.register_event('REQUEST_FILE',self.request_file) self.event_manager.register_event('MOVE_FILE',self.move_file) diff --git a/opt/ahenk/base/messaging/MessageResponseQueue.py b/opt/ahenk/base/messaging/MessageResponseQueue.py index e8b316b..c6a0673 100644 --- a/opt/ahenk/base/messaging/MessageResponseQueue.py +++ b/opt/ahenk/base/messaging/MessageResponseQueue.py @@ -13,17 +13,18 @@ class MessageResponseQueue(threading.Thread): super(MessageResponseQueue, self).__init__() scope = Scope.getInstance() self.logger = scope.getLogger() - self.messageManager = scope.getMessageManager() + self.messageManager = scope.getMessager() self.outQueue = outQueue def run(self): try: - # This item will send response to lider. - # item must be response message. Response message may be generic message type - responseMessage = self.outQueue.get() - print(item) - # Call message manager for response - self.messageManager.sendResponse(responseMessage) - #self.outQueue.task_done() + while True : + # This item will send response to lider. + # item must be response message. Response message may be generic message type + responseMessage = self.outQueue.get(block=True) + print(responseMessage) + # Call message manager for response + self.messageManager.send_direct_message(responseMessage) + #self.outQueue.task_done() except: pass diff --git a/opt/ahenk/base/plugin/Plugin.py b/opt/ahenk/base/plugin/Plugin.py index f942e8d..84de106 100644 --- a/opt/ahenk/base/plugin/Plugin.py +++ b/opt/ahenk/base/plugin/Plugin.py @@ -7,22 +7,24 @@ from base.Scope import Scope class Plugin(threading.Thread): """docstring for Plugin""" def __init__(self, name,InQueue): - super(Plugin, self).__init__() + threading.Thread.__init__(self) self.name = name self.InQueue = InQueue - self.scope=Scope.getInstance() - self.pluginManager = self.scope.getPluginManager() + scope = Scope.getInstance() + self.pluginManager = scope.getPluginManager() + self.logger = scope.getLogger() - def run(): - try: - task=self.InQueue.get() - command = self.pluginManager.findCommand(self.getName(),task.getCommandId()) - command.handle_task(task) - # TODO add result to response queue + def run(self): + while True : + try: + task=self.InQueue.get(block=True) + command = Scope.getInstance().getPluginManager().findCommand(self.getName(),task.command_cls_id) + command.handle_task(task) + # TODO add result to response queue - except Exception as e: - #TODO error log here - print("exception occured when executing plugin") + except Exception as e: + #TODO error log here + self.logger.error("Plugin running exception " + str(e)) def getName(self): return self.name diff --git a/opt/ahenk/base/plugin/PluginManager.py b/opt/ahenk/base/plugin/PluginManager.py index 4ab6d1e..7061539 100644 --- a/opt/ahenk/base/plugin/PluginManager.py +++ b/opt/ahenk/base/plugin/PluginManager.py @@ -18,7 +18,6 @@ class PluginManager(object): self.logger = self.scope.getLogger() def loadPlugins(self): - print("loading") self.plugins = [] possibleplugins = os.listdir(self.configManager.get("PLUGIN", "pluginFolderPath")) for pname in possibleplugins: @@ -42,21 +41,21 @@ class PluginManager(object): def findCommand(self,pluginName,commandId): location = os.path.join(self.configManager.get("PLUGIN", "pluginFolderPath"), pluginName) - if os.path.dir(location) and commandId + ".py" in os.listdir(location): + if os.path.isdir(location) and commandId + ".py" in os.listdir(location): info = imp.find_module(commandId, [location]) return imp.load_module(commandId, *info) else: - self.logger.warning('Command id - - not found') + self.logger.warning('Command id -' + commandId +' - not found') return None def processTask(self,task): try: - if task.getPluginId().lower() in self.pluginQueueDict : - self.pluginQueueDict[task.getPluginId().lower()].put(task,task.priority) + if task.plugin.name.lower() in self.pluginQueueDict : + self.pluginQueueDict[task.plugin.name.lower()].put(task,1) except Exception as e: - # TODO error log here # TODO update task - status to not found command - pass + self.logger.error("[PluginManager] Exception occurred when processing task " + str(e)) + def reloadPlugins(self): # Not implemented yet diff --git a/opt/ahenk/base/task/TaskManager.py b/opt/ahenk/base/task/TaskManager.py index a63a4c9..01a982e 100644 --- a/opt/ahenk/base/task/TaskManager.py +++ b/opt/ahenk/base/task/TaskManager.py @@ -10,7 +10,7 @@ class TaskManager(object): """docstring for TaskManager""" def __init__(self): - super(TaskManager, self).__init__() + #super(TaskManager, self).__init__() scope = Scope.getInstance() self.pluginManager = scope.getPluginManager() self.logger = scope.getLogger() @@ -19,11 +19,13 @@ class TaskManager(object): def addTask(self, task): try: self.logger.debug('Adding task ... ') - self.saveTask(task) + #self.saveTask(task) + self.logger.info('Task saved ') # TODO send task received message self.pluginManager.processTask(task) except Exception as e: # TODO error log here + self.logger.debug('Exception occured when adding task ' + str(e)) pass def saveTask(self, task):