diff --git a/opt/ahenk/ahenkd.py b/opt/ahenk/ahenkd.py index 1618328..107077d 100644 --- a/opt/ahenk/ahenkd.py +++ b/opt/ahenk/ahenkd.py @@ -6,14 +6,15 @@ from base.config.ConfigManager import ConfigManager from base.deamon.BaseDeamon import BaseDaemon from base.logger.AhenkLogger import Logger from base.Scope import Scope -from base.messaging.Messaging import Messaging from base.messaging.MessageReceiver import MessageReceiver from base.messaging.MessageSender import MessageSender from base.registration.Registration import Registration +from base.messaging.MessageResponseQueue import MessageResponseQueue +from base.plugin.PluginManager import PluginManager +from base.task.TaskManager import TaskManager from multiprocessing import Process from threading import Thread -import sys,logging -import time +import sys,logging,queue,time class AhenkDeamon(BaseDaemon): @@ -33,26 +34,44 @@ class AhenkDeamon(BaseDaemon): config = configManager.read() globalscope.setConfigurationManager(config) + # Logger must be second logger = Logger() - logger.debug("[AhenkDeamon]logging") + logger.info("this is info log") globalscope.setLogger(logger) + pluginManager = PluginManager() + pluginManager.loadPlugins() + globalscope.setPluginManager(pluginManager) + + taskManger = TaskManager() + globalscope.setTaskManager(taskManger) + registration = Registration() registration.register() + """ #TODO send register message according to register status print("sending registration message") message_sender = MessageSender(registration.get_registration_message()) message_sender.connect_to_server() print("registration message were sent") - #TODO add sender to scope + #TODO add sender to scope message_receiver = MessageReceiver() rec_process = Process(target=message_receiver.connect_to_server) rec_process.start() print("receiver online") #set parameters which will use for message sending + """ + """ + this is must be created after message services + responseQueue = queue.Queue() + messageResponseQueue = MessageResponseQueue(responseQueue) + messageResponseQueue.setDaemon(True) + messageResponseQueue.start() + globalscope.setResponseQueue(responseQueue) + """ if __name__ == '__main__': diff --git a/opt/ahenk/base/Scope.py b/opt/ahenk/base/Scope.py index 7125c80..b5e98f9 100644 --- a/opt/ahenk/base/Scope.py +++ b/opt/ahenk/base/Scope.py @@ -14,6 +14,9 @@ class Scope(object): self.configurationManager=None self.messageManager=None self.logger=None + self.pluginManager=None + self.taskManager=None + self.responseQueue=None @staticmethod def getInstance(): @@ -50,3 +53,21 @@ class Scope(object): def setMessageManager(self,messageManager): self.messageManager = messageManager + + def getPluginManager(self): + return self.pluginManager + + def setPluginManager(self,pluginManager): + self.pluginManager = pluginManager + + def getTaskManager(self): + return self.TaskManager + + def setTaskManager(self,taskManager): + self.taskManager = taskManager + + def getResponseQueue(self): + return self.responseQueue + + def setResponseQueue(self,responseQueue): + self.responseQueue=responseQueue diff --git a/opt/ahenk/base/database/AhenkDao.py b/opt/ahenk/base/database/AhenkDao.py new file mode 100644 index 0000000..14430d4 --- /dev/null +++ b/opt/ahenk/base/database/AhenkDao.py @@ -0,0 +1,11 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*- +# Author: İsmail BAŞARAN + + +class AhenkDao(object): + """ + Sqlite manager for ahenk + """ + def __init__(self): + super(AhenkDao, self).__init__() diff --git a/opt/ahenk/base/messaging/MessageResponseQueue.py b/opt/ahenk/base/messaging/MessageResponseQueue.py index d382b71..e8b316b 100644 --- a/opt/ahenk/base/messaging/MessageResponseQueue.py +++ b/opt/ahenk/base/messaging/MessageResponseQueue.py @@ -24,6 +24,6 @@ class MessageResponseQueue(threading.Thread): print(item) # Call message manager for response self.messageManager.sendResponse(responseMessage) - self.outQueue.task_done() + #self.outQueue.task_done() except: pass diff --git a/opt/ahenk/base/model/Task.py b/opt/ahenk/base/model/Task.py new file mode 100644 index 0000000..29603f7 --- /dev/null +++ b/opt/ahenk/base/model/Task.py @@ -0,0 +1,16 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*- +# Author: İsmail BAŞARAN + +class Task(object): + """docstring for Task""" + def __init__(self,message): + super(Task, self).__init__() + + def getPluginId(self): + # Not implemented yet + pass + + def getcommandId(self): + # Not implemented yet + pass diff --git a/opt/ahenk/base/plugin/Plugin.py b/opt/ahenk/base/plugin/Plugin.py index 99387da..f942e8d 100644 --- a/opt/ahenk/base/plugin/Plugin.py +++ b/opt/ahenk/base/plugin/Plugin.py @@ -16,11 +16,12 @@ class Plugin(threading.Thread): def run(): try: task=self.InQueue.get() - command = self.pluginManager.findCommand(task.getCommandId()) + command = self.pluginManager.findCommand(self.getName(),task.getCommandId()) command.handle_task(task) # TODO add result to response queue except Exception as e: + #TODO error log here print("exception occured when executing plugin") def getName(self): diff --git a/opt/ahenk/base/plugin/PluginManager.py b/opt/ahenk/base/plugin/PluginManager.py index cb16d96..4a0fd1e 100644 --- a/opt/ahenk/base/plugin/PluginManager.py +++ b/opt/ahenk/base/plugin/PluginManager.py @@ -2,38 +2,77 @@ # -*- coding: utf-8 -*- # Author: İsmail BAŞARAN from base.plugin.Plugin import Plugin +from base.plugin.PluginQueue import PluginQueue +from base.Scope import Scope import imp,os class PluginManager(object): """docstring for PluginManager""" #implement logger - def __init__(self, configManager): + def __init__(self): super(PluginManager, self).__init__() - self.configManager = configManager + scope = Scope.getInstance() + self.configManager = scope.getConfigurationManager() self.plugins = [] + self.pluginQueueDict = dict() + self.logger = scope.getLogger() - def loadPlugins(): + def loadPlugins(self): + print("loading") self.plugins = [] possibleplugins = os.listdir(self.configManager.get("PLUGIN", "pluginFolderPath")) for pname in possibleplugins: location = os.path.join(self.configManager.get("PLUGIN", "pluginFolderPath"), pname) if not os.path.isdir(location) or not self.configManager.get("PLUGIN", "mainModuleName") + ".py" in os.listdir(location): + self.logger.debug('It is not a plugin location - - ') continue - info = imp.find_module(self.configManager.get("PLUGIN", "mainModuleName"), [location]) - #mainModule = self.loadSinglePlugin(info): - self.plugins.append(Plugin(pname,mainModule)) + try: + self.loadSinglePlugin(pname) + except Exception as e: + # TODO error log + pass - def createQueueForPlugin(self): + def loadSinglePlugin(self,pluginName): + # TODO check already loaded plugin + self.pluginQueueDict[pluginName]=PluginQueue() + plugin = Plugin(pluginName,self.pluginQueueDict[pluginName]) + plugin.setDaemon(True) + plugin.start() + self.plugins.append(plugin) + def findCommand(self,pluginName,commandId): + location = os.path.join(self.configManager.get("PLUGIN", "pluginFolderPath"), pluginName) + if os.path.dir(location) and commandId + ".py" in os.listdir(location): + info = imp.find_module(commandId, [location]) + return imp.load_module(commandId, *info) + else: + self.logger.warning('Command id - - not found') + return None - def loadSinglePlugin(self,pluginInfo): - return imp.load_module(self.configManager.get("PLUGIN", "mainModuleName"), *pluginInfo) + def processTask(self,task): + try: + if task.getPluginId().lower() in self.pluginQueueDict : + self.pluginQueueDict[task.getPluginId().lower()].put(task,task.priority) + except Exception as e: + # TODO error log here + # TODO update task - status to not found command + pass - def findSinglePlugin(self,pluginName): - for plugin in self.getPlugins(): - if plugin["name"] == self.plugins: - return self.loadSinglePlugin(plugin) - - - def findCommand(self,comamndName): + def reloadPlugins(self): + # Not implemented yet pass + + def checkPluginExists(self,pluginName): + # Not implemented yet + pass + + def reloadSinglePlugin(self,pluginName): + # Not implemented yet + pass + + def checkCommandExist(self,pluginName,commandId): + # Not implemented yet + pass + + def printQueueSize(self): + print("size " + str(len(self.pluginQueueDict))) diff --git a/opt/ahenk/base/plugin/PluginQueue.py b/opt/ahenk/base/plugin/PluginQueue.py new file mode 100644 index 0000000..3d97edc --- /dev/null +++ b/opt/ahenk/base/plugin/PluginQueue.py @@ -0,0 +1,10 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*- +# Author: İsmail BAŞARAN + +from queue import Queue + +class PluginQueue(Queue): + def __contains__(self, item): + with self.mutex: + return item in self.queue diff --git a/opt/ahenk/base/task/TaskManager.py b/opt/ahenk/base/task/TaskManager.py index e011f1b..82ab0bd 100644 --- a/opt/ahenk/base/task/TaskManager.py +++ b/opt/ahenk/base/task/TaskManager.py @@ -1,9 +1,38 @@ #!/usr/bin/python3 # -*- coding: utf-8 -*- # Author: İsmail BAŞARAN +from base.Scope import Scope class TaskManager(object): """docstring for TaskManager""" def __init__(self): super(TaskManager, self).__init__() - + scope = Scope.getInstance() + self.pluginManager = scope.getPluginManager() + self.logger= scope.getLogger() + + def addTask(self,task): + try: + # TODO add log + # TODO save task to database + # TODO send task received message + self.pluginManager.processTask(task) + except Exception as e: + # TODO error log here + pass + + def saveTask(self,task): + # TODO not implemented yet + # task reveiced to ahenk save to db firstly. + # if user close before processing task you can load from db for process + pass + + def updateTask(self,task): + # TODO not implemented yet + # This is updates task status processing - processed ... + pass + + def deleteTask(self,task): + # TODO not implemented yet + # remove task if it is processed + pass