From 3952cafc1ae1b3ad619d103a473d03e2dbeca83c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C4=B0smail=20Ba=C5=9Faran?= Date: Wed, 2 Mar 2016 10:27:43 +0200 Subject: [PATCH 1/5] plugin queue and load modules --- opt/ahenk/base/plugin/PluginManager.py | 45 +++++++++++++++----------- opt/ahenk/base/plugin/PluginQueue.py | 10 ++++++ 2 files changed, 37 insertions(+), 18 deletions(-) create mode 100644 opt/ahenk/base/plugin/PluginQueue.py diff --git a/opt/ahenk/base/plugin/PluginManager.py b/opt/ahenk/base/plugin/PluginManager.py index cb16d96..2e35deb 100644 --- a/opt/ahenk/base/plugin/PluginManager.py +++ b/opt/ahenk/base/plugin/PluginManager.py @@ -2,15 +2,20 @@ # -*- coding: utf-8 -*- # Author: İsmail BAŞARAN from base.plugin.Plugin import Plugin +from base.plugin.PluginQueue import PluginQueue +from base.Scope import Scope import imp,os class PluginManager(object): """docstring for PluginManager""" #implement logger - def __init__(self, configManager): + def __init__(self): super(PluginManager, self).__init__() - self.configManager = configManager + scope = Scope.getInstance() + self.configManager = scope.getConfigurationManager() self.plugins = [] + self.pluginQueueDict = dict() + self.logger = scope.getLogger() def loadPlugins(): self.plugins = [] @@ -18,22 +23,26 @@ class PluginManager(object): for pname in possibleplugins: location = os.path.join(self.configManager.get("PLUGIN", "pluginFolderPath"), pname) if not os.path.isdir(location) or not self.configManager.get("PLUGIN", "mainModuleName") + ".py" in os.listdir(location): + #TODO debug log here continue - info = imp.find_module(self.configManager.get("PLUGIN", "mainModuleName"), [location]) - #mainModule = self.loadSinglePlugin(info): - self.plugins.append(Plugin(pname,mainModule)) + try: + self.loadSinglePlugin(pname) + except Exception as e: + # TODO error log - def createQueueForPlugin(self): + def loadSinglePlugin(self,pluginName): + # TODO check already loaded plugin + self.pluginQueueDict[pluginName]=PluginQueue() + plugin = Plugin(pluginName,self.pluginQueueDict[pluginName]) + plugin.setDaemon(True) + plugin.start() + self.plugins.append(plugin) - - def loadSinglePlugin(self,pluginInfo): - return imp.load_module(self.configManager.get("PLUGIN", "mainModuleName"), *pluginInfo) - - def findSinglePlugin(self,pluginName): - for plugin in self.getPlugins(): - if plugin["name"] == self.plugins: - return self.loadSinglePlugin(plugin) - - - def findCommand(self,comamndName): - pass + def findCommand(self,pluginName,commandId): + location = os.path.join(self.configManager.get("PLUGIN", "pluginFolderPath"), pname) + if os.path.dir(location) and commandId + ".py" in os.listdir(location): + info = imp.find_module(commandId, [location]) + return imp.load_module(commandId, *info) + else: + self.logger.warning('Command id - %s - not found',commandId) + return None diff --git a/opt/ahenk/base/plugin/PluginQueue.py b/opt/ahenk/base/plugin/PluginQueue.py new file mode 100644 index 0000000..df7a961 --- /dev/null +++ b/opt/ahenk/base/plugin/PluginQueue.py @@ -0,0 +1,10 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*- +# Author: İsmail BAŞARAN + +import Queue + +class PluginQueue(Queue.PriorityQueue): + def __contains__(self, item): + with self.mutex: + return item in self.queue From 6951f0b23880ad0da73df3435b79beaaa082c7bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C4=B0smail=20Ba=C5=9Faran?= Date: Wed, 2 Mar 2016 11:33:48 +0200 Subject: [PATCH 2/5] model and database initial classes --- opt/ahenk/base/database/AhenkDao.py | 11 +++++++++++ opt/ahenk/base/model/Task.py | 16 ++++++++++++++++ 2 files changed, 27 insertions(+) create mode 100644 opt/ahenk/base/database/AhenkDao.py create mode 100644 opt/ahenk/base/model/Task.py diff --git a/opt/ahenk/base/database/AhenkDao.py b/opt/ahenk/base/database/AhenkDao.py new file mode 100644 index 0000000..14430d4 --- /dev/null +++ b/opt/ahenk/base/database/AhenkDao.py @@ -0,0 +1,11 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*- +# Author: İsmail BAŞARAN + + +class AhenkDao(object): + """ + Sqlite manager for ahenk + """ + def __init__(self): + super(AhenkDao, self).__init__() diff --git a/opt/ahenk/base/model/Task.py b/opt/ahenk/base/model/Task.py new file mode 100644 index 0000000..29603f7 --- /dev/null +++ b/opt/ahenk/base/model/Task.py @@ -0,0 +1,16 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*- +# Author: İsmail BAŞARAN + +class Task(object): + """docstring for Task""" + def __init__(self,message): + super(Task, self).__init__() + + def getPluginId(self): + # Not implemented yet + pass + + def getcommandId(self): + # Not implemented yet + pass From 55c430ed40b8f479c8acab27ed7655428c1eed1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C4=B0smail=20Ba=C5=9Faran?= Date: Wed, 2 Mar 2016 11:59:19 +0200 Subject: [PATCH 3/5] plugin manager and task manager (in progress) --- opt/ahenk/base/plugin/Plugin.py | 3 ++- opt/ahenk/base/plugin/PluginManager.py | 28 ++++++++++++++++++++++-- opt/ahenk/base/task/TaskManager.py | 30 +++++++++++++++++++++++++- 3 files changed, 57 insertions(+), 4 deletions(-) diff --git a/opt/ahenk/base/plugin/Plugin.py b/opt/ahenk/base/plugin/Plugin.py index 99387da..f942e8d 100644 --- a/opt/ahenk/base/plugin/Plugin.py +++ b/opt/ahenk/base/plugin/Plugin.py @@ -16,11 +16,12 @@ class Plugin(threading.Thread): def run(): try: task=self.InQueue.get() - command = self.pluginManager.findCommand(task.getCommandId()) + command = self.pluginManager.findCommand(self.getName(),task.getCommandId()) command.handle_task(task) # TODO add result to response queue except Exception as e: + #TODO error log here print("exception occured when executing plugin") def getName(self): diff --git a/opt/ahenk/base/plugin/PluginManager.py b/opt/ahenk/base/plugin/PluginManager.py index 2e35deb..caee36c 100644 --- a/opt/ahenk/base/plugin/PluginManager.py +++ b/opt/ahenk/base/plugin/PluginManager.py @@ -23,7 +23,7 @@ class PluginManager(object): for pname in possibleplugins: location = os.path.join(self.configManager.get("PLUGIN", "pluginFolderPath"), pname) if not os.path.isdir(location) or not self.configManager.get("PLUGIN", "mainModuleName") + ".py" in os.listdir(location): - #TODO debug log here + self.logger.debug('It is not a plugin location - %s - ',location) continue try: self.loadSinglePlugin(pname) @@ -39,10 +39,34 @@ class PluginManager(object): self.plugins.append(plugin) def findCommand(self,pluginName,commandId): - location = os.path.join(self.configManager.get("PLUGIN", "pluginFolderPath"), pname) + location = os.path.join(self.configManager.get("PLUGIN", "pluginFolderPath"), pluginName) if os.path.dir(location) and commandId + ".py" in os.listdir(location): info = imp.find_module(commandId, [location]) return imp.load_module(commandId, *info) else: self.logger.warning('Command id - %s - not found',commandId) return None + + def processTask(self,task): + try: + if task.getPluginId().lower() in self.pluginQueueDict : + self.pluginQueueDict[task.getPluginId().lower()].put(task,task.priority) + except Exception as e: + # TODO error log here + # TODO update task - status to not found command + + def reloadPlugins(self): + # Not implemented yet + pass + + def checkPluginExists(self,pluginName): + # Not implemented yet + pass + + def reloadSinglePlugin(self,pluginName): + # Not implemented yet + pass + + def checkCommandExist(self,pluginName,commandId): + # Not implemented yet + pass diff --git a/opt/ahenk/base/task/TaskManager.py b/opt/ahenk/base/task/TaskManager.py index e011f1b..874ce07 100644 --- a/opt/ahenk/base/task/TaskManager.py +++ b/opt/ahenk/base/task/TaskManager.py @@ -1,9 +1,37 @@ #!/usr/bin/python3 # -*- coding: utf-8 -*- # Author: İsmail BAŞARAN +from base.Scope import Scope class TaskManager(object): """docstring for TaskManager""" def __init__(self): super(TaskManager, self).__init__() - + scope = Scope.getInstance() + self.pluginManager = scope.getPluginManager() + self.logger= scope.getLogger() + + def addTask(self,task): + try: + # TODO add log + # TODO save task to database + # TODO send task received message + self.pluginManager.processTask(task) + except Exception as e: + # TODO error log here + + def saveTask(self,task): + # TODO not implemented yet + # task reveiced to ahenk save to db firstly. + # if user close before processing task you can load from db for process + pass + + def updateTask(self,task): + # TODO not implemented yet + # This is updates task status processing - processed ... + pass + + def deleteTask(self,task): + # TODO not implemented yet + # remove task if it is processed + pass From 8939ac4a36b9006ee6c337cef6bcb5f3a314b537 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C4=B0smail=20Ba=C5=9Faran?= Date: Wed, 2 Mar 2016 15:45:36 +0200 Subject: [PATCH 4/5] plugin and task manager --- opt/ahenk/base/Scope.py | 14 ++++++++++++++ opt/ahenk/base/plugin/PluginManager.py | 8 +++++--- opt/ahenk/base/plugin/PluginQueue.py | 4 ++-- opt/ahenk/base/task/TaskManager.py | 1 + 4 files changed, 22 insertions(+), 5 deletions(-) diff --git a/opt/ahenk/base/Scope.py b/opt/ahenk/base/Scope.py index 7125c80..41259f0 100644 --- a/opt/ahenk/base/Scope.py +++ b/opt/ahenk/base/Scope.py @@ -14,6 +14,8 @@ class Scope(object): self.configurationManager=None self.messageManager=None self.logger=None + self.pluginManager=None + self.taskManager=None @staticmethod def getInstance(): @@ -50,3 +52,15 @@ class Scope(object): def setMessageManager(self,messageManager): self.messageManager = messageManager + + def getPluginManager(self): + return self.pluginManager + + def setPluginManager(self,pluginManager): + self.pluginManager = pluginManager + + def getTaskManager(self): + return self.TaskManager + + def setTaskManager(self,taskManager): + self.taskManager = taskManager diff --git a/opt/ahenk/base/plugin/PluginManager.py b/opt/ahenk/base/plugin/PluginManager.py index caee36c..5e8180c 100644 --- a/opt/ahenk/base/plugin/PluginManager.py +++ b/opt/ahenk/base/plugin/PluginManager.py @@ -17,18 +17,19 @@ class PluginManager(object): self.pluginQueueDict = dict() self.logger = scope.getLogger() - def loadPlugins(): + def loadPlugins(self): self.plugins = [] possibleplugins = os.listdir(self.configManager.get("PLUGIN", "pluginFolderPath")) for pname in possibleplugins: location = os.path.join(self.configManager.get("PLUGIN", "pluginFolderPath"), pname) if not os.path.isdir(location) or not self.configManager.get("PLUGIN", "mainModuleName") + ".py" in os.listdir(location): - self.logger.debug('It is not a plugin location - %s - ',location) + self.logger.debug('It is not a plugin location - - ') continue try: self.loadSinglePlugin(pname) except Exception as e: # TODO error log + pass def loadSinglePlugin(self,pluginName): # TODO check already loaded plugin @@ -44,7 +45,7 @@ class PluginManager(object): info = imp.find_module(commandId, [location]) return imp.load_module(commandId, *info) else: - self.logger.warning('Command id - %s - not found',commandId) + self.logger.warning('Command id - - not found') return None def processTask(self,task): @@ -54,6 +55,7 @@ class PluginManager(object): except Exception as e: # TODO error log here # TODO update task - status to not found command + pass def reloadPlugins(self): # Not implemented yet diff --git a/opt/ahenk/base/plugin/PluginQueue.py b/opt/ahenk/base/plugin/PluginQueue.py index df7a961..3d97edc 100644 --- a/opt/ahenk/base/plugin/PluginQueue.py +++ b/opt/ahenk/base/plugin/PluginQueue.py @@ -2,9 +2,9 @@ # -*- coding: utf-8 -*- # Author: İsmail BAŞARAN -import Queue +from queue import Queue -class PluginQueue(Queue.PriorityQueue): +class PluginQueue(Queue): def __contains__(self, item): with self.mutex: return item in self.queue diff --git a/opt/ahenk/base/task/TaskManager.py b/opt/ahenk/base/task/TaskManager.py index 874ce07..82ab0bd 100644 --- a/opt/ahenk/base/task/TaskManager.py +++ b/opt/ahenk/base/task/TaskManager.py @@ -19,6 +19,7 @@ class TaskManager(object): self.pluginManager.processTask(task) except Exception as e: # TODO error log here + pass def saveTask(self,task): # TODO not implemented yet From 3956c0539bb2f7bacabe99e531f8fbe24eb19dc4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C4=B0smail=20Ba=C5=9Faran?= Date: Wed, 2 Mar 2016 17:20:42 +0200 Subject: [PATCH 5/5] message response queue and added classes to ahenkd service --- opt/ahenk/ahenkd.py | 47 ++++++++++++++----- opt/ahenk/base/Scope.py | 7 +++ .../base/messaging/MessageResponseQueue.py | 2 +- opt/ahenk/base/plugin/PluginManager.py | 4 ++ 4 files changed, 46 insertions(+), 14 deletions(-) diff --git a/opt/ahenk/ahenkd.py b/opt/ahenk/ahenkd.py index de8de8a..8bcdac8 100644 --- a/opt/ahenk/ahenkd.py +++ b/opt/ahenk/ahenkd.py @@ -7,10 +7,12 @@ from base.deamon.BaseDeamon import BaseDaemon from base.logger.AhenkLogger import Logger from base.Scope import Scope from base.messaging.Messaging import Messaging +from base.messaging.MessageResponseQueue import MessageResponseQueue +from base.plugin.PluginManager import PluginManager +from base.task.TaskManager import TaskManager from multiprocessing import Process from threading import Thread -import sys,logging -import time +import sys,logging,queue,time class AhenkDeamon(BaseDaemon): @@ -29,22 +31,41 @@ class AhenkDeamon(BaseDaemon): config = configManager.read() globalscope.setConfigurationManager(config) + # Logger must be second logger = Logger() - logger.info("obaraaa") + logger.info("this is info log") globalscope.setLogger(logger) + pluginManager = PluginManager() + pluginManager.loadPlugins() + globalscope.setPluginManager(pluginManager) - xmpp = Messaging() - print("xmpp is created") - p = Process(target=xmpp.connect_to_server) - print("Process thread starting") - p.start() - print("Process tread started") - print("waiting 5sn ") - time.sleep(5) - print("sleep is over ") - xmpp.send_direct_message("asdasdas")# not working ->connection error + taskManger = TaskManager() + globalscope.setTaskManager(taskManger) + # add services after this line + + """ + xmpp = Messaging() + print("xmpp is created") + p = Process(target=xmpp.connect_to_server) + print("Process thread starting") + p.start() + print("Process tread started") + print("waiting 5sn ") + time.sleep(5) + print("sleep is over ") + xmpp.send_direct_message("asdasdas")# not working ->connection error + """ + + """ + this is must be created after message services + responseQueue = queue.Queue() + messageResponseQueue = MessageResponseQueue(responseQueue) + messageResponseQueue.setDaemon(True) + messageResponseQueue.start() + globalscope.setResponseQueue(responseQueue) + """ if __name__ == '__main__': diff --git a/opt/ahenk/base/Scope.py b/opt/ahenk/base/Scope.py index 41259f0..b5e98f9 100644 --- a/opt/ahenk/base/Scope.py +++ b/opt/ahenk/base/Scope.py @@ -16,6 +16,7 @@ class Scope(object): self.logger=None self.pluginManager=None self.taskManager=None + self.responseQueue=None @staticmethod def getInstance(): @@ -64,3 +65,9 @@ class Scope(object): def setTaskManager(self,taskManager): self.taskManager = taskManager + + def getResponseQueue(self): + return self.responseQueue + + def setResponseQueue(self,responseQueue): + self.responseQueue=responseQueue diff --git a/opt/ahenk/base/messaging/MessageResponseQueue.py b/opt/ahenk/base/messaging/MessageResponseQueue.py index d382b71..e8b316b 100644 --- a/opt/ahenk/base/messaging/MessageResponseQueue.py +++ b/opt/ahenk/base/messaging/MessageResponseQueue.py @@ -24,6 +24,6 @@ class MessageResponseQueue(threading.Thread): print(item) # Call message manager for response self.messageManager.sendResponse(responseMessage) - self.outQueue.task_done() + #self.outQueue.task_done() except: pass diff --git a/opt/ahenk/base/plugin/PluginManager.py b/opt/ahenk/base/plugin/PluginManager.py index 5e8180c..4a0fd1e 100644 --- a/opt/ahenk/base/plugin/PluginManager.py +++ b/opt/ahenk/base/plugin/PluginManager.py @@ -18,6 +18,7 @@ class PluginManager(object): self.logger = scope.getLogger() def loadPlugins(self): + print("loading") self.plugins = [] possibleplugins = os.listdir(self.configManager.get("PLUGIN", "pluginFolderPath")) for pname in possibleplugins: @@ -72,3 +73,6 @@ class PluginManager(object): def checkCommandExist(self,pluginName,commandId): # Not implemented yet pass + + def printQueueSize(self): + print("size " + str(len(self.pluginQueueDict)))