This commit is contained in:
Volkan Şahin 2016-03-02 18:35:59 +02:00
commit 092cb69d43
9 changed files with 170 additions and 24 deletions

View file

@ -6,14 +6,15 @@ from base.config.ConfigManager import ConfigManager
from base.deamon.BaseDeamon import BaseDaemon
from base.logger.AhenkLogger import Logger
from base.Scope import Scope
from base.messaging.Messaging import Messaging
from base.messaging.MessageReceiver import MessageReceiver
from base.messaging.MessageSender import MessageSender
from base.registration.Registration import Registration
from base.messaging.MessageResponseQueue import MessageResponseQueue
from base.plugin.PluginManager import PluginManager
from base.task.TaskManager import TaskManager
from multiprocessing import Process
from threading import Thread
import sys,logging
import time
import sys,logging,queue,time
class AhenkDeamon(BaseDaemon):
@ -33,26 +34,44 @@ class AhenkDeamon(BaseDaemon):
config = configManager.read()
globalscope.setConfigurationManager(config)
# Logger must be second
logger = Logger()
logger.debug("[AhenkDeamon]logging")
logger.info("this is info log")
globalscope.setLogger(logger)
pluginManager = PluginManager()
pluginManager.loadPlugins()
globalscope.setPluginManager(pluginManager)
taskManger = TaskManager()
globalscope.setTaskManager(taskManger)
registration = Registration()
registration.register()
"""
#TODO send register message according to register status
print("sending registration message")
message_sender = MessageSender(registration.get_registration_message())
message_sender.connect_to_server()
print("registration message were sent")
#TODO add sender to scope
#TODO add sender to scope
message_receiver = MessageReceiver()
rec_process = Process(target=message_receiver.connect_to_server)
rec_process.start()
print("receiver online")
#set parameters which will use for message sending
"""
"""
this is must be created after message services
responseQueue = queue.Queue()
messageResponseQueue = MessageResponseQueue(responseQueue)
messageResponseQueue.setDaemon(True)
messageResponseQueue.start()
globalscope.setResponseQueue(responseQueue)
"""
if __name__ == '__main__':

View file

@ -14,6 +14,9 @@ class Scope(object):
self.configurationManager=None
self.messageManager=None
self.logger=None
self.pluginManager=None
self.taskManager=None
self.responseQueue=None
@staticmethod
def getInstance():
@ -50,3 +53,21 @@ class Scope(object):
def setMessageManager(self,messageManager):
self.messageManager = messageManager
def getPluginManager(self):
return self.pluginManager
def setPluginManager(self,pluginManager):
self.pluginManager = pluginManager
def getTaskManager(self):
return self.TaskManager
def setTaskManager(self,taskManager):
self.taskManager = taskManager
def getResponseQueue(self):
return self.responseQueue
def setResponseQueue(self,responseQueue):
self.responseQueue=responseQueue

View file

@ -0,0 +1,11 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Author: İsmail BAŞARAN <ismail.basaran@tubitak.gov.tr> <basaran.ismaill@gmail.com>
class AhenkDao(object):
"""
Sqlite manager for ahenk
"""
def __init__(self):
super(AhenkDao, self).__init__()

View file

@ -24,6 +24,6 @@ class MessageResponseQueue(threading.Thread):
print(item)
# Call message manager for response
self.messageManager.sendResponse(responseMessage)
self.outQueue.task_done()
#self.outQueue.task_done()
except:
pass

View file

@ -0,0 +1,16 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Author: İsmail BAŞARAN <ismail.basaran@tubitak.gov.tr> <basaran.ismaill@gmail.com>
class Task(object):
"""docstring for Task"""
def __init__(self,message):
super(Task, self).__init__()
def getPluginId(self):
# Not implemented yet
pass
def getcommandId(self):
# Not implemented yet
pass

View file

@ -16,11 +16,12 @@ class Plugin(threading.Thread):
def run():
try:
task=self.InQueue.get()
command = self.pluginManager.findCommand(task.getCommandId())
command = self.pluginManager.findCommand(self.getName(),task.getCommandId())
command.handle_task(task)
# TODO add result to response queue
except Exception as e:
#TODO error log here
print("exception occured when executing plugin")
def getName(self):

View file

@ -2,38 +2,77 @@
# -*- coding: utf-8 -*-
# Author: İsmail BAŞARAN <ismail.basaran@tubitak.gov.tr> <basaran.ismaill@gmail.com>
from base.plugin.Plugin import Plugin
from base.plugin.PluginQueue import PluginQueue
from base.Scope import Scope
import imp,os
class PluginManager(object):
"""docstring for PluginManager"""
#implement logger
def __init__(self, configManager):
def __init__(self):
super(PluginManager, self).__init__()
self.configManager = configManager
scope = Scope.getInstance()
self.configManager = scope.getConfigurationManager()
self.plugins = []
self.pluginQueueDict = dict()
self.logger = scope.getLogger()
def loadPlugins():
def loadPlugins(self):
print("loading")
self.plugins = []
possibleplugins = os.listdir(self.configManager.get("PLUGIN", "pluginFolderPath"))
for pname in possibleplugins:
location = os.path.join(self.configManager.get("PLUGIN", "pluginFolderPath"), pname)
if not os.path.isdir(location) or not self.configManager.get("PLUGIN", "mainModuleName") + ".py" in os.listdir(location):
self.logger.debug('It is not a plugin location - - ')
continue
info = imp.find_module(self.configManager.get("PLUGIN", "mainModuleName"), [location])
#mainModule = self.loadSinglePlugin(info):
self.plugins.append(Plugin(pname,mainModule))
try:
self.loadSinglePlugin(pname)
except Exception as e:
# TODO error log
pass
def createQueueForPlugin(self):
def loadSinglePlugin(self,pluginName):
# TODO check already loaded plugin
self.pluginQueueDict[pluginName]=PluginQueue()
plugin = Plugin(pluginName,self.pluginQueueDict[pluginName])
plugin.setDaemon(True)
plugin.start()
self.plugins.append(plugin)
def findCommand(self,pluginName,commandId):
location = os.path.join(self.configManager.get("PLUGIN", "pluginFolderPath"), pluginName)
if os.path.dir(location) and commandId + ".py" in os.listdir(location):
info = imp.find_module(commandId, [location])
return imp.load_module(commandId, *info)
else:
self.logger.warning('Command id - - not found')
return None
def loadSinglePlugin(self,pluginInfo):
return imp.load_module(self.configManager.get("PLUGIN", "mainModuleName"), *pluginInfo)
def processTask(self,task):
try:
if task.getPluginId().lower() in self.pluginQueueDict :
self.pluginQueueDict[task.getPluginId().lower()].put(task,task.priority)
except Exception as e:
# TODO error log here
# TODO update task - status to not found command
pass
def findSinglePlugin(self,pluginName):
for plugin in self.getPlugins():
if plugin["name"] == self.plugins:
return self.loadSinglePlugin(plugin)
def findCommand(self,comamndName):
def reloadPlugins(self):
# Not implemented yet
pass
def checkPluginExists(self,pluginName):
# Not implemented yet
pass
def reloadSinglePlugin(self,pluginName):
# Not implemented yet
pass
def checkCommandExist(self,pluginName,commandId):
# Not implemented yet
pass
def printQueueSize(self):
print("size " + str(len(self.pluginQueueDict)))

View file

@ -0,0 +1,10 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Author: İsmail BAŞARAN <ismail.basaran@tubitak.gov.tr> <basaran.ismaill@gmail.com>
from queue import Queue
class PluginQueue(Queue):
def __contains__(self, item):
with self.mutex:
return item in self.queue

View file

@ -1,9 +1,38 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Author: İsmail BAŞARAN <ismail.basaran@tubitak.gov.tr> <basaran.ismaill@gmail.com>
from base.Scope import Scope
class TaskManager(object):
"""docstring for TaskManager"""
def __init__(self):
super(TaskManager, self).__init__()
scope = Scope.getInstance()
self.pluginManager = scope.getPluginManager()
self.logger= scope.getLogger()
def addTask(self,task):
try:
# TODO add log
# TODO save task to database
# TODO send task received message
self.pluginManager.processTask(task)
except Exception as e:
# TODO error log here
pass
def saveTask(self,task):
# TODO not implemented yet
# task reveiced to ahenk save to db firstly.
# if user close before processing task you can load from db for process
pass
def updateTask(self,task):
# TODO not implemented yet
# This is updates task status processing - processed ...
pass
def deleteTask(self,task):
# TODO not implemented yet
# remove task if it is processed
pass