This commit is contained in:
Volkan Şahin 2016-04-06 14:28:38 +03:00
commit 94d00c420c
13 changed files with 348 additions and 85 deletions

View file

@ -24,6 +24,7 @@ from base.messaging.Messaging import Messaging
from base.plugin.PluginManager import PluginManager
from base.registration.Registration import Registration
from base.task.TaskManager import TaskManager
from base.scheduler.scheduler_factory import SchedulerFactory
pidfilePath = '/var/run/ahenk.pid'
@ -31,10 +32,96 @@ pidfilePath = '/var/run/ahenk.pid'
class AhenkDeamon(BaseDaemon):
"""docstring for AhenkDeamon"""
def reload(self, msg):
def reload(self):
# reload service here
pass
def init_logger(self):
logger = Logger()
logger.info('[AhenkDeamon] Log was set')
Scope.getInstance().setLogger(logger)
return logger
def init_config_manager(self, configFilePath, configfileFolderPath):
configManager = ConfigManager(configFilePath, configfileFolderPath)
config = configManager.read()
Scope.getInstance().setConfigurationManager(config)
return config
def init_scheduler(self):
scheduler_ins = SchedulerFactory.get_intstance()
scheduler_ins.initialize()
Scope.getInstance().set_scheduler(scheduler_ins)
sc_thread = threading.Thread(target=scheduler_ins.run)
sc_thread.setDaemon(True)
sc_thread.start()
return scheduler_ins
def init_event_manager(self):
eventManager = EventManager()
Scope.getInstance().setEventManager(eventManager)
return eventManager
def init_ahenk_db(self):
db_service = AhenkDbService()
db_service.connect()
db_service.initialize_table()
Scope.getInstance().setDbService(db_service)
return db_service
def init_messaging(self):
messageManager = Messaging()
Scope.getInstance().setMessageManager(messageManager)
return messageManager
def init_plugin_manager(self):
pluginManager = PluginManager()
pluginManager.loadPlugins()
Scope.getInstance().setPluginManager(pluginManager)
return pluginManager
def init_task_manager(self):
taskManager = TaskManager()
Scope.getInstance().setTaskManager(taskManager)
return taskManager
def init_registration(self):
registration = Registration()
Scope.getInstance().setRegistration(registration)
return registration
def init_execution_manager(self):
execution_manager = ExecutionManager()
Scope.getInstance().setExecutionManager(execution_manager)
return execution_manager
def init_messager(self):
messager = Messager()
messanger_thread = threading.Thread(target=messager.connect_to_server)
messanger_thread.start()
while messager.is_connected() is False:
time.sleep(1)
time.sleep(5)
Scope.getInstance().setMessager(messager)
return messager
def init_message_response_queue(self):
responseQueue = queue.Queue()
messageResponseQueue = MessageResponseQueue(responseQueue)
messageResponseQueue.setDaemon(True)
messageResponseQueue.start()
Scope.getInstance().setResponseQueue(responseQueue)
return responseQueue
def check_registration(self):
# TODO restrict number of attemption
while Scope.getInstance().getRegistration().is_registered() is False:
print('registration need')
Scope.getInstance().getLogger().debug('[AhenkDeamon] Attempting to register')
Scope.getInstance().getRegistration().registration_request()
def run(self):
print('Ahenk running...')
@ -45,65 +132,41 @@ class AhenkDeamon(BaseDaemon):
configfileFolderPath = '/etc/ahenk/config.d/'
# configuration manager must be first load
configManager = ConfigManager(configFilePath, configfileFolderPath)
config = configManager.read()
globalscope.setConfigurationManager(config)
self.init_config_manager(configFilePath, configfileFolderPath)
# Logger must be second
logger = Logger()
logger.info('[AhenkDeamon] Log was set')
globalscope.setLogger(logger)
logger = self.init_logger()
eventManager = EventManager()
globalscope.setEventManager(eventManager)
self.init_event_manager()
logger.info('[AhenkDeamon] Event Manager was set')
db_service = AhenkDbService()
db_service.connect()
db_service.initialize_table()
globalscope.setDbService(db_service)
logger.info('[AhenkDeamon] Data Base Service was set')
self.init_ahenk_db()
logger.info('[AhenkDeamon] DataBase Service was set')
messageManager = Messaging()
globalscope.setMessageManager(messageManager)
self.init_messaging()
logger.info('[AhenkDeamon] Message Manager was set')
pluginManager = PluginManager()
pluginManager.loadPlugins()
globalscope.setPluginManager(pluginManager)
self.init_plugin_manager()
logger.info('[AhenkDeamon] Plugin Manager was set')
taskManager = TaskManager()
globalscope.setTaskManager(taskManager)
self.init_task_manager()
logger.info('[AhenkDeamon] Task Manager was set')
registration = Registration()
globalscope.setRegistration(registration)
#self.init_registration()
logger.info('[AhenkDeamon] Registration was set')
execution_manager = ExecutionManager()
globalscope.setExecutionManager(execution_manager)
self.init_execution_manager()
logger.info('[AhenkDeamon] Execution Manager was set')
# TODO restrict number of attemption
while registration.is_registered() is False:
print('registration need')
logger.debug('[AhenkDeamon] Attempting to register')
registration.registration_request()
#self.check_registration()
logger.info('[AhenkDeamon] Ahenk is registered')
messager = Messager()
messanger_thread = threading.Thread(target=messager.connect_to_server)
messanger_thread.start()
while messager.is_connected() is False:
time.sleep(1)
time.sleep(5)
globalscope.setMessager(messager)
messager = self.init_messager()
logger.info('[AhenkDeamon] Messager was set')
self.init_message_response_queue()
# if registration.is_ldap_registered() is False:
# logger.debug('[AhenkDeamon] Attempting to registering ldap')
# registration.ldap_registration_request() #TODO work on message
@ -127,14 +190,6 @@ class AhenkDeamon(BaseDaemon):
messager.send_direct_message('test')
responseQueue = queue.Queue()
messageResponseQueue = MessageResponseQueue(responseQueue)
messageResponseQueue.setDaemon(True)
messageResponseQueue.start()
globalscope.setResponseQueue(responseQueue)
while True:
time.sleep(1)

View file

@ -22,6 +22,7 @@ class Scope(object):
self.executionManager = None
self.dbService = None
self.messager = None
self.scheduler = None
@staticmethod
def getInstance():
@ -106,3 +107,9 @@ class Scope(object):
def setMessager(self, messager):
self.messager = messager
def set_scheduler(self, scheduler):
self.scheduler = scheduler
def get_scheduler(self):
return self.scheduler

View file

@ -11,6 +11,9 @@ class Task(object):
"""docstring for Task"""
def __init__(self, message):
#TODO message must be json !!! otherwise we can not use task json methods!
#Remove if condition and change message param to json task type.
if message:
self.task = message['task']
@property
@ -41,12 +44,25 @@ class Task(object):
def plugin(self):
return Plugin(self.task['plugin'])
@property
def cron_str(self):
return '1 * * * *' #TODO update when task cron field added.
def to_string(self):
return str(self.task)
def to_json(self):
return json.load(self.task)
return json.dumps(self.task)
def from_json(self,json_value):
self.task = json.load(json_value)
@property
def obj_name(self):
return "TASK"
def cols(self):
return ['id', 'create_date', 'modify_date', 'command_cls_id', 'parameter_map', 'deleted', 'plugin']
def values(self):
return [str(self.id), str(self.create_date), str(self.modify_date), str(self.command_cls_id), str(self.parameter_map), str(self.deleted), self.plugin.to_string()]

View file

@ -1,28 +0,0 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Author: İsmail BAŞARAN <ismail.basaran@tubitak.gov.tr> <basaran.ismaill@gmail.com>
from base.scheduler.BaseScheduler import BaseScheduler
class APSchedulerImpl(BaseScheduler):
def initialize(self):
# Not implemented yet
pass
def add_job(self):
# Not implemented yet
pass
def add_job_by_hour(self):
# Not implemented yet
pass
def add_job_by_mount(self):
# Not implemented yet
pass
def add_job_by_minute(self):
# Not implemented yet
pass

View file

View file

@ -0,0 +1,7 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Some utility classes / functions first
class AllMatch(set):
"""Universal set - match everything"""
def __contains__(self, item): return True

View file

@ -0,0 +1,64 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Author: İsmail BAŞARAN <ismail.basaran@tubitak.gov.tr> <basaran.ismaill@gmail.com>
from base.scheduler.base_scheduler import BaseScheduler
from base.Scope import Scope
from base.scheduler.custom.scheduledb import ScheduleTaskDB
from base.scheduler.custom.schedule_job import ScheduleTaskJob
from datetime import datetime, timedelta
import time
class CustomScheduler(BaseScheduler):
def __init__(self):
self.events = []
self.keep_run = True
self.logger = Scope.getInstance().getLogger()
self.scheduledb = ScheduleTaskDB()
def initialize(self):
self.scheduledb.initialize()
tasks = self.scheduledb.load()
for task in tasks:
self.add_job(ScheduleTaskJob(task))
def add_job(self, job):
self.events.append(job)
def save_and_add_job(self, task):
self.scheduledb.save(task)
self.events.append(ScheduleTaskJob(task))
def remove_job(self, task):
for event in self.events:
if event.task.id == task.id:
self.scheduledb.delete(task)
self.events.remove(event)
def remove_job_via_task_id(self,task_id):
for event in self.events:
if event.task.id == task_id:
self.scheduledb.delete(event.task)
self.events.remove(event)
def stop(self):
self.keep_run = False
def list_schedule_tasks(self):
return self.scheduledb.load()
def run(self):
t = datetime(*datetime.now().timetuple()[:5])
while 1 and self.keep_run:
for e in self.events:
e.check(t)
t += timedelta(minutes=1)
while datetime.now() < t:
time.sleep((t - datetime.now()).seconds)

View file

@ -0,0 +1,87 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Author: İsmail BAŞARAN <ismail.basaran@tubitak.gov.tr> <basaran.ismaill@gmail.com>
from base.scheduler.custom.all_match import AllMatch
from base.Scope import Scope
class ScheduleTaskJob(object):
def __init__(self, task):
self.task = task
cron_sj = self.parse_cron_str(task.cron_str)
if cron_sj:
self.mins = self.conv_to_set(cron_sj[0])
self.hours= self.conv_to_set(cron_sj[1])
self.days = self.conv_to_set(cron_sj[2])
self.months = self.conv_to_set(cron_sj[3])
self.dow = self.conv_to_set(cron_sj[4])
self.action = self.processTask
scope = Scope.getInstance()
self.logger = scope.getLogger()
self.task_manager = scope.getTaskManager()
def processTask(self):
try:
self.task_manager.addTask(self.task)
if self.is_single_shot():
Scope.getInstance().get_scheduler().remove_job(self.task)
except Exception as e:
self.logger.error(e)
def parse_cron_str(self, cron_str):
try:
cron_exp_arr = cron_str.split(" ")
cron_sj = []
count = 0
for exp in cron_exp_arr:
if exp.isdigit():
cron_sj.append(exp)
elif '*' == exp:
cron_sj.append(AllMatch())
elif '/' in exp:
range_val = int(exp.split("/")[1])
if count == 0:
cron_sj.append(range(0, 60, range_val))
elif count == 1:
cron_sj.append(range(0, 24, range_val))
elif count == 2:
cron_sj.append(range(0, 7, range_val))
elif count == 3:
cron_sj.append(range(0, 12, range_val))
elif count == 3:
cron_sj.append(range(0, 7, range_val))
else:
print("it is not supported")
elif ',' in exp:
cron_sj.append("(" + str(exp) + ")")
else:
print("it is not supported")
count = count + 1
return cron_sj
except Exception as e:
self.logger.error(str(e))
def conv_to_set(obj):
if isinstance(obj, (int)):
return set([obj])
if not isinstance(obj, set):
obj = set(obj)
return obj
def is_single_shot(self):
if '*' in self.task.cron_str:
return True
else:
return False
def matchtime(self, t):
"""Return True if this event should trigger at the specified datetime"""
return ((t.minute in self.mins) and
(t.hour in self.hours) and
(t.day in self.days) and
(t.month in self.months) and
(t.weekday() in self.dow))
def check(self, t):
if self.matchtime(t):
self.action(*self.args, **self.kwargs)

View file

@ -0,0 +1,50 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Author: İsmail BAŞARAN <ismail.basaran@tubitak.gov.tr> <basaran.ismaill@gmail.com>
from base.Scope import Scope
from base.model.Task import Task
class ScheduleTaskDB(object):
def __init__(self):
scope = Scope.getInstance()
self.logger = scope.getLogger()
self.db_service = scope.getDbService()
def initialize(self):
self.logger.debug('[ScheduleTaskDB] Initializing scheduler database...')
self.db_service.check_and_create_table('schedule_task', ['id INTEGER', 'task_json TEXT'])
self.logger.debug('[ScheduleTaskDB] Scheduler database is ok.')
def save(self, task):
self.logger.debug('[ScheduleTaskDB] Preparing schedule task for save operation... creating columns and values...')
cols = ['id', 'task_json']
values = [str(task.id), str(task.to_json())]
self.logger.debug('[ScheduleTaskDB] Saving scheduler task to db... ')
self.db_service.update('schedule_task', cols, values, None)
self.logger.debug('[ScheduleTaskDB] Scheduler task saved.')
def delete(self, task):
try:
self.logger.debug('[ScheduleTaskDB] Deleting schedule task. Task id=' + str(task.id))
self.db_service.delete('schedule_task', 'id=' + str(task.id))
self.logger.debug('[ScheduleTaskDB] Deleting schedule task deleted successfully. task id=' + str(task.id))
except Exception as e:
self.logger.error('[ScheduleTaskDB] Exception occur when deleting schedule task ' + str(e))
def load(self):
try:
self.logger.debug('[ScheduleTaskDB] Loading schedule tasks...')
rows = self.db_service.select('schedule_task')
tasks = []
for row in rows:
task_json = row['task_json']
task = Task(None)
task.from_json(task_json)
tasks.append(task)
self.logger.debug(
'[ScheduleTaskDB] Schedule tasks loaded successfully. Schedule Tasks size=' + str(len(tasks)))
return tasks
except Exception as e:
self.logger.error('[ScheduleTaskDB] Exception occur when loading schedule tasks! ' + str(e))

View file

@ -2,9 +2,10 @@
# -*- coding: utf-8 -*-
# Author: İsmail BAŞARAN <ismail.basaran@tubitak.gov.tr> <basaran.ismaill@gmail.com>
from base.scheduler.APSchedulerImpl import APSchedulerImpl
from base.scheduler.custom.custom_scheduler import CustomScheduler
class SchedulerFactory():
def get_intstance(self):
return APSchedulerImpl()
return CustomScheduler()
get_intstance = staticmethod(get_intstance)

View file

@ -15,14 +15,18 @@ class TaskManager(object):
self.pluginManager = scope.getPluginManager()
self.logger = scope.getLogger()
self.db_service = scope.getDbService()
self.scheduler = scope.get_scheduler()
def addTask(self, task):
try:
if not task.cron_str == None or task.cron_str == '':
self.logger.debug('Adding task ... ')
self.saveTask(task)
self.logger.info('Task saved ')
# TODO send task received message
self.pluginManager.processTask(task)
else:
self.scheduler.save_and_add_job(task)
except Exception as e:
# TODO error log here