From 75ce3d84fabd645d80efdaf378799063c8614d98 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C4=B0smail=20Ba=C5=9Faran?= Date: Tue, 5 Apr 2016 14:30:12 +0300 Subject: [PATCH 1/8] custom scheduler impl --- opt/ahenk/base/scheduler/APSchedulerImpl.py | 28 ------- opt/ahenk/base/scheduler/__init__.py | 0 .../{BaseScheduler.py => base_scheduler.py} | 0 opt/ahenk/base/scheduler/custom/__init__.py | 0 opt/ahenk/base/scheduler/custom/all_match.py | 7 ++ .../base/scheduler/custom/custom_scheduler.py | 40 ++++++++++ .../base/scheduler/custom/schedule_job.py | 80 +++++++++++++++++++ opt/ahenk/base/scheduler/custom/scheduledb.py | 18 +++++ ...hedulerFactory.py => scheduler_factory.py} | 4 +- 9 files changed, 147 insertions(+), 30 deletions(-) delete mode 100644 opt/ahenk/base/scheduler/APSchedulerImpl.py create mode 100644 opt/ahenk/base/scheduler/__init__.py rename opt/ahenk/base/scheduler/{BaseScheduler.py => base_scheduler.py} (100%) create mode 100644 opt/ahenk/base/scheduler/custom/__init__.py create mode 100644 opt/ahenk/base/scheduler/custom/all_match.py create mode 100644 opt/ahenk/base/scheduler/custom/custom_scheduler.py create mode 100644 opt/ahenk/base/scheduler/custom/schedule_job.py create mode 100644 opt/ahenk/base/scheduler/custom/scheduledb.py rename opt/ahenk/base/scheduler/{SchedulerFactory.py => scheduler_factory.py} (65%) diff --git a/opt/ahenk/base/scheduler/APSchedulerImpl.py b/opt/ahenk/base/scheduler/APSchedulerImpl.py deleted file mode 100644 index d75eed9..0000000 --- a/opt/ahenk/base/scheduler/APSchedulerImpl.py +++ /dev/null @@ -1,28 +0,0 @@ -#!/usr/bin/python3 -# -*- coding: utf-8 -*- -# Author: İsmail BAŞARAN - -from base.scheduler.BaseScheduler import BaseScheduler - -class APSchedulerImpl(BaseScheduler): - - def initialize(self): - # Not implemented yet - pass - - def add_job(self): - # Not implemented yet - pass - - def add_job_by_hour(self): - # Not implemented yet - pass - - def add_job_by_mount(self): - # Not implemented yet - pass - - def add_job_by_minute(self): - # Not implemented yet - pass - diff --git a/opt/ahenk/base/scheduler/__init__.py b/opt/ahenk/base/scheduler/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/opt/ahenk/base/scheduler/BaseScheduler.py b/opt/ahenk/base/scheduler/base_scheduler.py similarity index 100% rename from opt/ahenk/base/scheduler/BaseScheduler.py rename to opt/ahenk/base/scheduler/base_scheduler.py diff --git a/opt/ahenk/base/scheduler/custom/__init__.py b/opt/ahenk/base/scheduler/custom/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/opt/ahenk/base/scheduler/custom/all_match.py b/opt/ahenk/base/scheduler/custom/all_match.py new file mode 100644 index 0000000..f4c549d --- /dev/null +++ b/opt/ahenk/base/scheduler/custom/all_match.py @@ -0,0 +1,7 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*- + +# Some utility classes / functions first +class AllMatch(set): + """Universal set - match everything""" + def __contains__(self, item): return True \ No newline at end of file diff --git a/opt/ahenk/base/scheduler/custom/custom_scheduler.py b/opt/ahenk/base/scheduler/custom/custom_scheduler.py new file mode 100644 index 0000000..de8603a --- /dev/null +++ b/opt/ahenk/base/scheduler/custom/custom_scheduler.py @@ -0,0 +1,40 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*- +# Author: İsmail BAŞARAN + +from base.scheduler.base_scheduler import BaseScheduler +from base.Scope import Scope +from datetime import datetime, timedelta +import time + + +class CustomScheduler(BaseScheduler): + + def __init__(self): + self.events = [] + self.keep_run = True + self.logger = Scope.getInstance().getLogger() + + def initialize(self): + # Implement this from your implementation class + pass + + def add_job(self,job): + self.events.append(job) + + def stop(self): + self.keep_run = False + + def run(self): + t = datetime(*datetime.now().timetuple()[:5]) + while 1 and self.keep_run: + for e in self.events: + e.check(t) + + t += timedelta(minutes=1) + while datetime.now() < t: + time.sleep((t - datetime.now()).seconds) + + + + diff --git a/opt/ahenk/base/scheduler/custom/schedule_job.py b/opt/ahenk/base/scheduler/custom/schedule_job.py new file mode 100644 index 0000000..b99d9c4 --- /dev/null +++ b/opt/ahenk/base/scheduler/custom/schedule_job.py @@ -0,0 +1,80 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*- +# Author: İsmail BAŞARAN + +from base.scheduler.custom.all_match import AllMatch +from base.Scope import Scope + +class ScheduleTaskJob(object): + def __init__(self, task): + self.task = task + cron_sj = self.parse_cron_str(task.cron_str) + if cron_sj: + self.mins = self.conv_to_set(cron_sj[0]) + self.hours= self.conv_to_set(cron_sj[1]) + self.days = self.conv_to_set(cron_sj[2]) + self.months = self.conv_to_set(cron_sj[3]) + self.dow = self.conv_to_set(cron_sj[4]) + self.action = self.processTask + scope = Scope.getInstance() + self.logger = scope.getLogger() + self.task_manager = scope.getTaskManager() + + def processTask(self): + try: + self.task_manager.addTask(self.task) + except Exception as e: + self.logger.error(e) + + def parse_cron_str(self,cron_str): + try: + cron_exp_arr = cron_str.split(" ") + cron_sj = [] + count = 0 + for exp in cron_exp_arr: + if exp.isdigit(): + cron_sj.append(exp) + elif '*' == exp: + cron_sj.append(AllMatch()) + elif '/' in exp: + range_val = int(exp.split("/")[1]) + if count == 0: + cron_sj.append(range(0, 60, range_val)) + elif count == 1: + cron_sj.append(range(0, 24, range_val)) + elif count == 2: + cron_sj.append(range(0, 7, range_val)) + elif count == 3: + cron_sj.append(range(0, 12, range_val)) + elif count == 3: + cron_sj.append(range(0, 7, range_val)) + else: + print("it is not supported") + elif ',' in exp: + cron_sj.append("(" + str(exp) + ")") + else: + print("it is not supported") + count = count + 1 + return cron_sj + except Exception as e: + self.logger.error(str(e)) + + def conv_to_set(obj): + if isinstance(obj, (int)): + return set([obj]) + if not isinstance(obj, set): + obj = set(obj) + return obj + + + def matchtime(self, t): + """Return True if this event should trigger at the specified datetime""" + return ((t.minute in self.mins) and + (t.hour in self.hours) and + (t.day in self.days) and + (t.month in self.months) and + (t.weekday() in self.dow)) + + def check(self, t): + if self.matchtime(t): + self.action(*self.args, **self.kwargs) \ No newline at end of file diff --git a/opt/ahenk/base/scheduler/custom/scheduledb.py b/opt/ahenk/base/scheduler/custom/scheduledb.py new file mode 100644 index 0000000..e47e6f8 --- /dev/null +++ b/opt/ahenk/base/scheduler/custom/scheduledb.py @@ -0,0 +1,18 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*- +# Author: İsmail BAŞARAN + + +class ScheduleDB(object): + + def save(self): + pass + + def delete(self): + pass + + def load(self): + pass + + def check_table_exists(self): + pass diff --git a/opt/ahenk/base/scheduler/SchedulerFactory.py b/opt/ahenk/base/scheduler/scheduler_factory.py similarity index 65% rename from opt/ahenk/base/scheduler/SchedulerFactory.py rename to opt/ahenk/base/scheduler/scheduler_factory.py index c6fb0b6..799decc 100644 --- a/opt/ahenk/base/scheduler/SchedulerFactory.py +++ b/opt/ahenk/base/scheduler/scheduler_factory.py @@ -2,9 +2,9 @@ # -*- coding: utf-8 -*- # Author: İsmail BAŞARAN -from base.scheduler.APSchedulerImpl import APSchedulerImpl +from base.scheduler.custom_scheduler_impl import CustomScheduler class SchedulerFactory(): def get_intstance(self): - return APSchedulerImpl() \ No newline at end of file + return CustomScheduler() \ No newline at end of file From 61cac2b507ccfe3826de55aeb1683d56a4f8063a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C4=B0smail=20Ba=C5=9Faran?= Date: Tue, 5 Apr 2016 15:23:20 +0300 Subject: [PATCH 2/8] added json fields to task obj --- opt/ahenk/base/model/Task.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/opt/ahenk/base/model/Task.py b/opt/ahenk/base/model/Task.py index a12295b..d47a692 100644 --- a/opt/ahenk/base/model/Task.py +++ b/opt/ahenk/base/model/Task.py @@ -41,12 +41,25 @@ class Task(object): def plugin(self): return Plugin(self.task['plugin']) + @property + def cron_str(self): + return '1 * * * *' #TODO update when task cron field added. + def to_string(self): return str(self.task) def to_json(self): - return json.load(self.task) + return json.dumps(self.task) + + def from_json(self,json_value): + self.task = json.load(json_value) @property def obj_name(self): return "TASK" + + def cols(self): + return ['id', 'create_date', 'modify_date', 'command_cls_id', 'parameter_map', 'deleted', 'plugin'] + + def values(self): + return [str(self.id), str(self.create_date), str(self.modify_date), str(self.command_cls_id), str(self.parameter_map), str(self.deleted), self.plugin.to_string()] From 44891af1729687452279e5a0f130ebe6cb95b969 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C4=B0smail=20Ba=C5=9Faran?= Date: Tue, 5 Apr 2016 17:28:22 +0300 Subject: [PATCH 3/8] scheduler db --- .../base/scheduler/custom/custom_scheduler.py | 11 ++-- opt/ahenk/base/scheduler/custom/scheduledb.py | 50 +++++++++++++++---- opt/ahenk/base/scheduler/scheduler_factory.py | 3 +- 3 files changed, 51 insertions(+), 13 deletions(-) diff --git a/opt/ahenk/base/scheduler/custom/custom_scheduler.py b/opt/ahenk/base/scheduler/custom/custom_scheduler.py index de8603a..05b8dda 100644 --- a/opt/ahenk/base/scheduler/custom/custom_scheduler.py +++ b/opt/ahenk/base/scheduler/custom/custom_scheduler.py @@ -4,6 +4,8 @@ from base.scheduler.base_scheduler import BaseScheduler from base.Scope import Scope +from base.scheduler.custom.scheduledb import ScheduleTaskDB +from base.scheduler.custom.schedule_job import ScheduleTaskJob from datetime import datetime, timedelta import time @@ -14,12 +16,15 @@ class CustomScheduler(BaseScheduler): self.events = [] self.keep_run = True self.logger = Scope.getInstance().getLogger() + self.scheduledb = ScheduleTaskDB() def initialize(self): - # Implement this from your implementation class - pass + self.scheduledb.initialize() + tasks = self.scheduledb.load() + for task in tasks: + self.add_job(ScheduleTaskJob(task)) - def add_job(self,job): + def add_job(self, job): self.events.append(job) def stop(self): diff --git a/opt/ahenk/base/scheduler/custom/scheduledb.py b/opt/ahenk/base/scheduler/custom/scheduledb.py index e47e6f8..fa1bc65 100644 --- a/opt/ahenk/base/scheduler/custom/scheduledb.py +++ b/opt/ahenk/base/scheduler/custom/scheduledb.py @@ -2,17 +2,49 @@ # -*- coding: utf-8 -*- # Author: İsmail BAŞARAN +from base.Scope import Scope +from base.model.Task import Task -class ScheduleDB(object): - def save(self): - pass +class ScheduleTaskDB(object): + def __init__(self): + scope = Scope.getInstance() + self.logger = scope.getLogger() + self.db_service = scope.getDbService() - def delete(self): - pass + def initialize(self): + self.logger.debug('[ScheduleTaskDB] Initializing scheduler database...') + self.db_service.check_and_create_table('schedule_task', ['id INTEGER', 'task_json TEXT']) + self.logger.debug('[ScheduleTaskDB] Scheduler database is ok.') + + def save(self, task): + self.logger.debug('[ScheduleTaskDB] Preparing schedule task for save operation... creating columns and values...') + cols = ['id', 'task_json'] + values = [str(task.id), str(task.to_json())] + self.logger.debug('[ScheduleTaskDB] Saving scheduler task to db... ') + self.db_service.update('schedule_task', cols, values, None) + self.logger.debug('[ScheduleTaskDB] Scheduler task saved.') + + def delete(self, task): + try: + self.logger.debug('[ScheduleTaskDB] Deleting schedule task. Task id=' + str(task.id)) + self.db_service.delete('schedule_task', 'id=' + str(task.id)) + self.logger.debug('[ScheduleTaskDB] Deleting schedule task deleted successfully. task id=' + str(task.id)) + except Exception as e: + self.logger.error('[ScheduleTaskDB] Exception occur when deleting schedule task ' + str(e)) def load(self): - pass - - def check_table_exists(self): - pass + try: + self.logger.debug('[ScheduleTaskDB] Loading schedule tasks...') + rows = self.db_service.select('schedule_task') + tasks = [] + for row in rows: + task_json = row['task_json'] + task = Task(None) + task.from_json(task_json) + tasks.append(task) + self.logger.debug( + '[ScheduleTaskDB] Schedule tasks loaded successfully. Schedule Tasks size=' + str(len(tasks))) + return tasks + except Exception as e: + self.logger.error('[ScheduleTaskDB] Exception occur when loading schedule tasks! ' + str(e)) diff --git a/opt/ahenk/base/scheduler/scheduler_factory.py b/opt/ahenk/base/scheduler/scheduler_factory.py index 799decc..a550059 100644 --- a/opt/ahenk/base/scheduler/scheduler_factory.py +++ b/opt/ahenk/base/scheduler/scheduler_factory.py @@ -7,4 +7,5 @@ from base.scheduler.custom_scheduler_impl import CustomScheduler class SchedulerFactory(): def get_intstance(self): - return CustomScheduler() \ No newline at end of file + return CustomScheduler() + get_intstance = staticmethod(get_intstance) \ No newline at end of file From 85a9bcd5a18591ce599eddf6ece18a30d1c5dae3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C4=B0smail=20Ba=C5=9Faran?= Date: Tue, 5 Apr 2016 17:55:15 +0300 Subject: [PATCH 4/8] ahenk service init methots --- opt/ahenk/ahenkd.py | 150 ++++++++++++++++++++++++++++++-------------- 1 file changed, 102 insertions(+), 48 deletions(-) diff --git a/opt/ahenk/ahenkd.py b/opt/ahenk/ahenkd.py index 700d26b..6fed261 100755 --- a/opt/ahenk/ahenkd.py +++ b/opt/ahenk/ahenkd.py @@ -24,6 +24,7 @@ from base.messaging.Messaging import Messaging from base.plugin.PluginManager import PluginManager from base.registration.Registration import Registration from base.task.TaskManager import TaskManager +from base.scheduler.scheduler_factory import SchedulerFactory pidfilePath = '/var/run/ahenk.pid' @@ -31,10 +32,95 @@ pidfilePath = '/var/run/ahenk.pid' class AhenkDeamon(BaseDaemon): """docstring for AhenkDeamon""" - def reload(self, msg): + def reload(self): # reload service here pass + def init_logger(self): + logger = Logger() + logger.info('[AhenkDeamon] Log was set') + Scope.getInstance().setLogger(logger) + return logger + + def init_config_manager(self, configFilePath, configfileFolderPath): + configManager = ConfigManager(configFilePath, configfileFolderPath) + config = configManager.read() + Scope.getInstance().setConfigurationManager(config) + return config + + def init_scheduler(self): + scheduler_ins = SchedulerFactory.get_intstance() + scheduler_ins.initialize() + Scope.getInstance().set_scheduler(scheduler_ins) + sc_thread = threading.Thread(target=scheduler_ins.run) + sc_thread.setDaemon(True) + sc_thread.start() + + def init_event_manager(self): + eventManager = EventManager() + Scope.getInstance().setEventManager(eventManager) + return eventManager + + def init_ahenk_db(self): + db_service = AhenkDbService() + db_service.connect() + db_service.initialize_table() + Scope.getInstance().setDbService(db_service) + return db_service + + def init_messaging(self): + messageManager = Messaging() + Scope.getInstance().setMessageManager(messageManager) + return messageManager + + def init_plugin_manager(self): + pluginManager = PluginManager() + pluginManager.loadPlugins() + Scope.getInstance().setPluginManager(pluginManager) + return pluginManager + + def init_task_manager(self): + taskManager = TaskManager() + Scope.getInstance().setTaskManager(taskManager) + return taskManager + + def init_registration(self): + registration = Registration() + Scope.getInstance().setRegistration(registration) + return registration + + def init_execution_manager(self): + execution_manager = ExecutionManager() + Scope.getInstance().setExecutionManager(execution_manager) + return execution_manager + + def init_messager(self): + messager = Messager() + messanger_thread = threading.Thread(target=messager.connect_to_server) + messanger_thread.start() + + while messager.is_connected() is False: + time.sleep(1) + time.sleep(5) + + Scope.getInstance().setMessager(messager) + return messager + + def init_message_response_queue(self): + responseQueue = queue.Queue() + messageResponseQueue = MessageResponseQueue(responseQueue) + messageResponseQueue.setDaemon(True) + messageResponseQueue.start() + Scope.getInstance().setResponseQueue(responseQueue) + return responseQueue + + def check_registration(self): + # TODO restrict number of attemption + while Scope.getInstance().getRegistration().is_registered() is False: + print('registration need') + Scope.getInstance().getLogger().debug('[AhenkDeamon] Attempting to register') + Scope.getInstance().getRegistration().registration_request() + def run(self): print('Ahenk running...') @@ -45,65 +131,41 @@ class AhenkDeamon(BaseDaemon): configfileFolderPath = '/etc/ahenk/config.d/' # configuration manager must be first load - configManager = ConfigManager(configFilePath, configfileFolderPath) - config = configManager.read() - globalscope.setConfigurationManager(config) + self.init_config_manager(configFilePath, configfileFolderPath) # Logger must be second - logger = Logger() - logger.info('[AhenkDeamon] Log was set') - globalscope.setLogger(logger) + logger = self.init_logger() - eventManager = EventManager() - globalscope.setEventManager(eventManager) + self.init_event_manager() logger.info('[AhenkDeamon] Event Manager was set') - db_service = AhenkDbService() - db_service.connect() - db_service.initialize_table() - globalscope.setDbService(db_service) - logger.info('[AhenkDeamon] Data Base Service was set') + self.init_ahenk_db() + logger.info('[AhenkDeamon] DataBase Service was set') - messageManager = Messaging() - globalscope.setMessageManager(messageManager) + self.init_messaging() logger.info('[AhenkDeamon] Message Manager was set') - pluginManager = PluginManager() - pluginManager.loadPlugins() - globalscope.setPluginManager(pluginManager) + self.init_plugin_manager() logger.info('[AhenkDeamon] Plugin Manager was set') - taskManager = TaskManager() - globalscope.setTaskManager(taskManager) + self.init_task_manager() logger.info('[AhenkDeamon] Task Manager was set') - registration = Registration() - globalscope.setRegistration(registration) + #self.init_registration() logger.info('[AhenkDeamon] Registration was set') - execution_manager = ExecutionManager() - globalscope.setExecutionManager(execution_manager) + self.init_execution_manager() logger.info('[AhenkDeamon] Execution Manager was set') - # TODO restrict number of attemption - while registration.is_registered() is False: - print('registration need') - logger.debug('[AhenkDeamon] Attempting to register') - registration.registration_request() - + #self.check_registration() logger.info('[AhenkDeamon] Ahenk is registered') - messager = Messager() - messanger_thread = threading.Thread(target=messager.connect_to_server) - messanger_thread.start() - - while messager.is_connected() is False: - time.sleep(1) - time.sleep(5) - - globalscope.setMessager(messager) + messager = self.init_messager() logger.info('[AhenkDeamon] Messager was set') + self.init_message_response_queue() + + # if registration.is_ldap_registered() is False: # logger.debug('[AhenkDeamon] Attempting to registering ldap') # registration.ldap_registration_request() #TODO work on message @@ -127,14 +189,6 @@ class AhenkDeamon(BaseDaemon): messager.send_direct_message('test') - - responseQueue = queue.Queue() - messageResponseQueue = MessageResponseQueue(responseQueue) - messageResponseQueue.setDaemon(True) - messageResponseQueue.start() - globalscope.setResponseQueue(responseQueue) - - while True: time.sleep(1) From 30689c31ada00175d411dc05c5d6e224588dfad1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C4=B0smail=20Ba=C5=9Faran?= Date: Tue, 5 Apr 2016 17:56:54 +0300 Subject: [PATCH 5/8] import statement bugfix --- opt/ahenk/base/scheduler/scheduler_factory.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opt/ahenk/base/scheduler/scheduler_factory.py b/opt/ahenk/base/scheduler/scheduler_factory.py index a550059..047400a 100644 --- a/opt/ahenk/base/scheduler/scheduler_factory.py +++ b/opt/ahenk/base/scheduler/scheduler_factory.py @@ -2,7 +2,7 @@ # -*- coding: utf-8 -*- # Author: İsmail BAŞARAN -from base.scheduler.custom_scheduler_impl import CustomScheduler +from base.scheduler.custom.custom_scheduler import CustomScheduler class SchedulerFactory(): From e35f538387a83939411ea2305230db28f7744480 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C4=B0smail=20Ba=C5=9Faran?= Date: Tue, 5 Apr 2016 17:57:35 +0300 Subject: [PATCH 6/8] Task message init None check --- opt/ahenk/base/model/Task.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/opt/ahenk/base/model/Task.py b/opt/ahenk/base/model/Task.py index d47a692..81e810f 100644 --- a/opt/ahenk/base/model/Task.py +++ b/opt/ahenk/base/model/Task.py @@ -11,7 +11,10 @@ class Task(object): """docstring for Task""" def __init__(self, message): - self.task = message['task'] + #TODO message must be json !!! otherwise we can not use task json methods! + #Remove if condition and change message param to json task type. + if message: + self.task = message['task'] @property def id(self): From a68ea26f7925c2b2850f88b8c7dc79584fcebf51 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C4=B0smail=20Ba=C5=9Faran?= Date: Tue, 5 Apr 2016 18:47:05 +0300 Subject: [PATCH 7/8] updated task manager added function for checking scheduler jobs --- opt/ahenk/base/Scope.py | 7 +++++++ .../base/scheduler/custom/custom_scheduler.py | 10 ++++++++++ opt/ahenk/base/scheduler/custom/schedule_job.py | 9 ++++++++- opt/ahenk/base/task/TaskManager.py | 14 +++++++++----- 4 files changed, 34 insertions(+), 6 deletions(-) diff --git a/opt/ahenk/base/Scope.py b/opt/ahenk/base/Scope.py index f02e1f4..d12fd02 100644 --- a/opt/ahenk/base/Scope.py +++ b/opt/ahenk/base/Scope.py @@ -22,6 +22,7 @@ class Scope(object): self.executionManager = None self.dbService = None self.messager = None + self.scheduler = None @staticmethod def getInstance(): @@ -106,3 +107,9 @@ class Scope(object): def setMessager(self, messager): self.messager = messager + + def set_scheduler(self, scheduler): + self.scheduler = scheduler + + def get_scheduler(self): + return self.scheduler \ No newline at end of file diff --git a/opt/ahenk/base/scheduler/custom/custom_scheduler.py b/opt/ahenk/base/scheduler/custom/custom_scheduler.py index 05b8dda..c14ce18 100644 --- a/opt/ahenk/base/scheduler/custom/custom_scheduler.py +++ b/opt/ahenk/base/scheduler/custom/custom_scheduler.py @@ -27,6 +27,16 @@ class CustomScheduler(BaseScheduler): def add_job(self, job): self.events.append(job) + def save_and_add_job(self, task): + self.scheduledb.save(task) + self.events.append(ScheduleTaskJob(task)) + + def remove_job(self,task): + self.scheduledb.delete(task) + for event in self.events: + if event.task.id == task.id: + self.events.remove(event) + def stop(self): self.keep_run = False diff --git a/opt/ahenk/base/scheduler/custom/schedule_job.py b/opt/ahenk/base/scheduler/custom/schedule_job.py index b99d9c4..92a2881 100644 --- a/opt/ahenk/base/scheduler/custom/schedule_job.py +++ b/opt/ahenk/base/scheduler/custom/schedule_job.py @@ -23,10 +23,12 @@ class ScheduleTaskJob(object): def processTask(self): try: self.task_manager.addTask(self.task) + if self.is_single_shot(): + Scope.getInstance().get_scheduler().remove_job(self.task) except Exception as e: self.logger.error(e) - def parse_cron_str(self,cron_str): + def parse_cron_str(self, cron_str): try: cron_exp_arr = cron_str.split(" ") cron_sj = [] @@ -66,6 +68,11 @@ class ScheduleTaskJob(object): obj = set(obj) return obj + def is_single_shot(self): + if '*' in self.task.cron_str: + return True + else: + return False def matchtime(self, t): """Return True if this event should trigger at the specified datetime""" diff --git a/opt/ahenk/base/task/TaskManager.py b/opt/ahenk/base/task/TaskManager.py index 2c5bcc0..fd3e103 100644 --- a/opt/ahenk/base/task/TaskManager.py +++ b/opt/ahenk/base/task/TaskManager.py @@ -15,14 +15,18 @@ class TaskManager(object): self.pluginManager = scope.getPluginManager() self.logger = scope.getLogger() self.db_service = scope.getDbService() + self.scheduler = scope.get_scheduler() def addTask(self, task): try: - self.logger.debug('Adding task ... ') - self.saveTask(task) - self.logger.info('Task saved ') - # TODO send task received message - self.pluginManager.processTask(task) + if not task.cron_str == None or task.cron_str == '': + self.logger.debug('Adding task ... ') + self.saveTask(task) + self.logger.info('Task saved ') + # TODO send task received message + self.pluginManager.processTask(task) + else: + self.scheduler.save_and_add_job(task) except Exception as e: # TODO error log here From a341deefeee97e918367053e685af0b14f4bdd31 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C4=B0smail=20Ba=C5=9Faran?= Date: Tue, 5 Apr 2016 19:12:40 +0300 Subject: [PATCH 8/8] added remove schedule job via task id method --- opt/ahenk/ahenkd.py | 1 + opt/ahenk/base/scheduler/custom/custom_scheduler.py | 13 +++++++++++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/opt/ahenk/ahenkd.py b/opt/ahenk/ahenkd.py index 6fed261..355048a 100755 --- a/opt/ahenk/ahenkd.py +++ b/opt/ahenk/ahenkd.py @@ -55,6 +55,7 @@ class AhenkDeamon(BaseDaemon): sc_thread = threading.Thread(target=scheduler_ins.run) sc_thread.setDaemon(True) sc_thread.start() + return scheduler_ins def init_event_manager(self): eventManager = EventManager() diff --git a/opt/ahenk/base/scheduler/custom/custom_scheduler.py b/opt/ahenk/base/scheduler/custom/custom_scheduler.py index c14ce18..f911d13 100644 --- a/opt/ahenk/base/scheduler/custom/custom_scheduler.py +++ b/opt/ahenk/base/scheduler/custom/custom_scheduler.py @@ -31,15 +31,24 @@ class CustomScheduler(BaseScheduler): self.scheduledb.save(task) self.events.append(ScheduleTaskJob(task)) - def remove_job(self,task): - self.scheduledb.delete(task) + def remove_job(self, task): for event in self.events: if event.task.id == task.id: + self.scheduledb.delete(task) + self.events.remove(event) + + def remove_job_via_task_id(self,task_id): + for event in self.events: + if event.task.id == task_id: + self.scheduledb.delete(event.task) self.events.remove(event) def stop(self): self.keep_run = False + def list_schedule_tasks(self): + return self.scheduledb.load() + def run(self): t = datetime(*datetime.now().timetuple()[:5]) while 1 and self.keep_run: