From 49ad8610ecd80b6e81413751be2d9576df02ddba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Volkan=20=C5=9Eahin?= Date: Tue, 18 Oct 2016 18:12:27 +0300 Subject: [PATCH] updating scheduled task feature added --- opt/ahenk/base/execution/execution_manager.py | 43 ++++++++++++++++++- 1 file changed, 42 insertions(+), 1 deletion(-) diff --git a/opt/ahenk/base/execution/execution_manager.py b/opt/ahenk/base/execution/execution_manager.py index a625ae3..23c5873 100644 --- a/opt/ahenk/base/execution/execution_manager.py +++ b/opt/ahenk/base/execution/execution_manager.py @@ -3,7 +3,7 @@ # Author: Volkan Şahin import json - +import threading from base.scope import Scope from base.file.file_transfer_manager import FileTransferManager from base.model.plugin_bean import PluginBean @@ -16,6 +16,8 @@ from base.model.enum.message_type import MessageType from base.model.enum.content_type import ContentType from base.system.system import System from base.util.util import Util +from base.scheduler.custom.schedule_job import ScheduleTaskJob +from base.scheduler.scheduler_factory import SchedulerFactory class ExecutionManager(object): @@ -41,6 +43,7 @@ class ExecutionManager(object): self.event_manager.register_event(MessageType.EXECUTE_POLICY.value, self.execute_policy) self.event_manager.register_event(MessageType.INSTALL_PLUGIN.value, self.install_plugin) self.event_manager.register_event(MessageType.RESPONSE_AGREEMENT.value, self.agreement_update) + self.event_manager.register_event(MessageType.UPDATE_SCHEDULED_TASK.value, self.update_scheduled_task) def agreement_update(self, arg): @@ -170,6 +173,44 @@ class ExecutionManager(object): self.logger.debug('Executing active policies for {0} user...'.format(username)) self.task_manager.addPolicy(self.get_active_policies(username)) + def update_scheduled_task(self, arg): + self.logger.debug('Working on scheduled task ...') + update_scheduled_json = json.loads(arg) + scheduler = Scope.get_instance().get_scheduler() + + if str(update_scheduled_json['cronExpression']).lower() == 'none' or update_scheduled_json[ + 'cronExpression'] is None: + self.logger.debug('Scheduled task will be removed') + scheduler.remove_job(int(update_scheduled_json['taskId'])) + self.logger.debug('Task removed from scheduled database') + self.db_service.update('task', ['deleted'], ['True'], + 'id={0}'.format(update_scheduled_json['taskId'])) + self.logger.debug('Task table updated.') + else: + self.logger.debug('Scheduled task cron expression will be updated.') + self.db_service.update('task', ['cron_expr'], [str(update_scheduled_json['cronExpression'])], + 'id={0}'.format(update_scheduled_json['taskId'])) + self.logger.debug('Task table updated.') + scheduler.remove_job(str(update_scheduled_json['taskId'])) + self.logger.debug('Previous scheduled task removed.') + scheduler.add_job(ScheduleTaskJob(self.get_task_bean_by_id(update_scheduled_json['taskId']))) + self.logger.debug('New scheduled task added') + + def get_task_bean_by_id(self, task_id): + + task_row = self.db_service.select('task', self.db_service.get_cols('task'), 'id={0}'.format(task_id))[0] + task = TaskBean(task_row[0], task_row[1], task_row[2], task_row[3], task_row[4], task_row[5], + self.get_plugin_bean_by_id(task_row[6]), + task_row[7], task_row[8]) + return task + + def get_plugin_bean_by_id(self, plugin_id): + plugin_row = self.db_service.select('plugin', self.db_service.get_cols('plugin'), 'id={0}'.format(plugin_id))[0] + plugin = PluginBean(plugin_row[0], plugin_row[1], plugin_row[2], plugin_row[3], plugin_row[4], plugin_row[5], + plugin_row[6], plugin_row[7], plugin_row[8], plugin_row[11], plugin_row[9], plugin_row[10], + plugin_row[12]) + return plugin + def execute_policy(self, arg): self.logger.debug('Updating policies...') policy = self.json_to_PolicyBean(json.loads(arg))