diff --git a/opt/ahenk/base/database/ahenk_db_service.py b/opt/ahenk/base/database/ahenk_db_service.py index bb4758b..4d4c14d 100644 --- a/opt/ahenk/base/database/ahenk_db_service.py +++ b/opt/ahenk/base/database/ahenk_db_service.py @@ -28,13 +28,29 @@ class AhenkDbService(object): def initialize_table(self): - self.check_and_create_table('task', ['id INTEGER', 'create_date TEXT', 'modify_date TEXT', 'command_cls_id TEXT', 'parameter_map BLOB', 'deleted INTEGER', 'plugin TEXT', 'cron_expr TEXT','file_server TEXT']) - self.check_and_create_table('policy', ['id INTEGER PRIMARY KEY AUTOINCREMENT', 'type TEXT', 'version TEXT', 'name TEXT', 'execution_id TEXT']) - self.check_and_create_table('profile', ['id INTEGER', 'create_date TEXT', 'label TEXT', 'description TEXT', 'overridable INTEGER', 'active TEXT', 'deleted TEXT', 'profile_data TEXT', 'modify_date TEXT', 'plugin TEXT']) - self.check_and_create_table('plugin', ['id INTEGER PRIMARY KEY AUTOINCREMENT', 'active TEXT', 'create_date TEXT', 'deleted TEXT', 'description TEXT', 'machine_oriented TEXT', 'modify_date TEXT', 'name TEXT', 'policy_plugin TEXT', 'user_oriented TEXT', 'version TEXT', 'task_plugin TEXT', 'x_based TEXT']) - self.check_and_create_table('registration', ['jid TEXT', 'password TEXT', 'registered INTEGER', 'dn TEXT', 'params TEXT', 'timestamp TEXT']) - self.check_and_create_table('contract', ['id INTEGER PRIMARY KEY AUTOINCREMENT', 'content BLOB', 'title TEXT', 'timestamp TEXT']) - self.check_and_create_table('agreement', ['id INTEGER PRIMARY KEY AUTOINCREMENT', 'contract_id TEXT', 'username TEXT', 'timestamp TEXT', 'choice TEXT']) + self.check_and_create_table('task', + ['id INTEGER', 'create_date TEXT', 'modify_date TEXT', 'command_cls_id TEXT', + 'parameter_map BLOB', 'deleted INTEGER', 'plugin TEXT', 'cron_expr TEXT', + 'file_server TEXT']) + self.check_and_create_table('policy', + ['id INTEGER PRIMARY KEY AUTOINCREMENT', 'type TEXT', 'version TEXT', 'name TEXT', + 'execution_id TEXT']) + self.check_and_create_table('profile', ['id INTEGER', 'create_date TEXT', 'label TEXT', 'description TEXT', + 'overridable INTEGER', 'active TEXT', 'deleted TEXT', + 'profile_data TEXT', 'modify_date TEXT', 'plugin TEXT']) + self.check_and_create_table('plugin', + ['id INTEGER PRIMARY KEY AUTOINCREMENT', 'active TEXT', 'create_date TEXT', + 'deleted TEXT', 'description TEXT', 'machine_oriented TEXT', 'modify_date TEXT', + 'name TEXT', 'policy_plugin TEXT', 'user_oriented TEXT', 'version TEXT', + 'task_plugin TEXT', 'x_based TEXT']) + self.check_and_create_table('registration', + ['jid TEXT', 'password TEXT', 'registered INTEGER', 'dn TEXT', 'params TEXT', + 'timestamp TEXT']) + self.check_and_create_table('contract', ['id INTEGER PRIMARY KEY AUTOINCREMENT', 'content BLOB', 'title TEXT', + 'timestamp TEXT']) + self.check_and_create_table('agreement', + ['id INTEGER PRIMARY KEY AUTOINCREMENT', 'contract_id TEXT', 'username TEXT', + 'timestamp TEXT', 'choice TEXT']) self.check_and_create_table('session', ['username TEXT', 'display TEXT', 'desktop TEXT', 'timestamp TEXT']) def get_cols(self, table_name): @@ -44,6 +60,14 @@ class AhenkDbService(object): return ['content', 'title', 'timestamp'] elif table_name == 'session': return ['username', 'display', 'desktop', 'timestamp'] + elif table_name == 'task': + return ['id', 'create_date', 'modify_date', 'command_cls_id', 'parameter_map', 'deleted', 'plugin', + 'cron_expr', 'file_server'] + elif table_name == 'plugin': + return ['id', 'active', 'create_date', 'deleted', 'description', 'machine_oriented', 'modify_date', 'name', + 'policy_plugin', 'user_oriented', 'version', 'task_plugin', 'x_based'] + else: + return None def connect(self): try: @@ -94,7 +118,8 @@ class AhenkDbService(object): self.logger.warning('Could not update table cursor is None! Table Name : {0}'.format(str(table_name))) return None except Exception as e: - self.logger.error('Updating table error ! Table Name : {0} Error Mesage: {1}'.format(str(table_name),str(e))) + self.logger.error( + 'Updating table error ! Table Name : {0} Error Mesage: {1}'.format(str(table_name), str(e))) finally: self.lock.release() diff --git a/opt/ahenk/base/execution/execution_manager.py b/opt/ahenk/base/execution/execution_manager.py index a625ae3..af5315a 100644 --- a/opt/ahenk/base/execution/execution_manager.py +++ b/opt/ahenk/base/execution/execution_manager.py @@ -4,16 +4,17 @@ import json -from base.scope import Scope from base.file.file_transfer_manager import FileTransferManager +from base.model.enum.content_type import ContentType +from base.model.enum.message_code import MessageCode +from base.model.enum.message_type import MessageType from base.model.plugin_bean import PluginBean from base.model.policy_bean import PolicyBean from base.model.profile_bean import ProfileBean from base.model.response import Response from base.model.task_bean import TaskBean -from base.model.enum.message_code import MessageCode -from base.model.enum.message_type import MessageType -from base.model.enum.content_type import ContentType +from base.scheduler.custom.schedule_job import ScheduleTaskJob +from base.scope import Scope from base.system.system import System from base.util.util import Util @@ -41,6 +42,7 @@ class ExecutionManager(object): self.event_manager.register_event(MessageType.EXECUTE_POLICY.value, self.execute_policy) self.event_manager.register_event(MessageType.INSTALL_PLUGIN.value, self.install_plugin) self.event_manager.register_event(MessageType.RESPONSE_AGREEMENT.value, self.agreement_update) + self.event_manager.register_event(MessageType.UPDATE_SCHEDULED_TASK.value, self.update_scheduled_task) def agreement_update(self, arg): @@ -170,6 +172,44 @@ class ExecutionManager(object): self.logger.debug('Executing active policies for {0} user...'.format(username)) self.task_manager.addPolicy(self.get_active_policies(username)) + def update_scheduled_task(self, arg): + self.logger.debug('Working on scheduled task ...') + update_scheduled_json = json.loads(arg) + scheduler = Scope.get_instance().get_scheduler() + + if str(update_scheduled_json['cronExpression']).lower() == 'none' or update_scheduled_json[ + 'cronExpression'] is None: + self.logger.debug('Scheduled task will be removed') + scheduler.remove_job(int(update_scheduled_json['taskId'])) + self.logger.debug('Task removed from scheduled database') + self.db_service.update('task', ['deleted'], ['True'], + 'id={0}'.format(update_scheduled_json['taskId'])) + self.logger.debug('Task table updated.') + else: + self.logger.debug('Scheduled task cron expression will be updated.') + self.db_service.update('task', ['cron_expr'], [str(update_scheduled_json['cronExpression'])], + 'id={0}'.format(update_scheduled_json['taskId'])) + self.logger.debug('Task table updated.') + scheduler.remove_job(str(update_scheduled_json['taskId'])) + self.logger.debug('Previous scheduled task removed.') + scheduler.add_job(ScheduleTaskJob(self.get_task_bean_by_id(update_scheduled_json['taskId']))) + self.logger.debug('New scheduled task added') + + def get_task_bean_by_id(self, task_id): + + task_row = self.db_service.select('task', self.db_service.get_cols('task'), 'id={0}'.format(task_id))[0] + task = TaskBean(task_row[0], task_row[1], task_row[2], task_row[3], task_row[4], task_row[5], + self.get_plugin_bean_by_id(task_row[6]), + task_row[7], task_row[8]) + return task + + def get_plugin_bean_by_id(self, plugin_id): + plugin_row = self.db_service.select('plugin', self.db_service.get_cols('plugin'), 'id={0}'.format(plugin_id))[0] + plugin = PluginBean(plugin_row[0], plugin_row[1], plugin_row[2], plugin_row[3], plugin_row[4], plugin_row[5], + plugin_row[6], plugin_row[7], plugin_row[8], plugin_row[11], plugin_row[9], plugin_row[10], + plugin_row[12]) + return plugin + def execute_policy(self, arg): self.logger.debug('Updating policies...') policy = self.json_to_PolicyBean(json.loads(arg)) diff --git a/opt/ahenk/base/mail/mail_manager.py b/opt/ahenk/base/mail/mail_manager.py index 60ad6dd..ef339e9 100644 --- a/opt/ahenk/base/mail/mail_manager.py +++ b/opt/ahenk/base/mail/mail_manager.py @@ -46,7 +46,10 @@ class Mail: self.server.quit() self.logger.debug('Disconnected') - def send_mail(self, subject, message, files=[]): + def send_mail(self, subject, message, files=None): + + if files is None: + files = [] msg = MIMEMultipart() msg['Date'] = formatdate(localtime=True) @@ -54,7 +57,7 @@ class Mail: msg.attach(MIMEText(message)) - #TODO files attachment max size + # TODO files attachment max size if files is not None: for f in files: part = MIMEBase('application', "octet-stream") diff --git a/opt/ahenk/base/model/enum/message_type.py b/opt/ahenk/base/model/enum/message_type.py index 61b1ed3..46085b4 100644 --- a/opt/ahenk/base/model/enum/message_type.py +++ b/opt/ahenk/base/model/enum/message_type.py @@ -22,3 +22,4 @@ class MessageType(Enum): UNREGISTER = 'UNREGISTER' TASK_STATUS = 'TASK_STATUS' RESPONSE_AGREEMENT = 'RESPONSE_AGREEMENT' + UPDATE_SCHEDULED_TASK = 'UPDATE_SCHEDULED_TASK' diff --git a/opt/ahenk/base/plugin/plugin.py b/opt/ahenk/base/plugin/plugin.py index 6d5f78b..f07f70d 100644 --- a/opt/ahenk/base/plugin/plugin.py +++ b/opt/ahenk/base/plugin/plugin.py @@ -60,10 +60,10 @@ class Plugin(threading.Thread): Plugin class responsible for processing TASK or USER PLUGIN PROFILE. """ - def __init__(self, name, InQueue): + def __init__(self, name, in_ueue): threading.Thread.__init__(self) self.name = name - self.InQueue = InQueue + self.in_queue = in_ueue scope = Scope.get_instance() self.logger = scope.get_logger() @@ -79,7 +79,7 @@ class Plugin(threading.Thread): while self.keep_run: try: try: - item_obj = self.InQueue.get(block=True) + item_obj = self.in_queue.get(block=True) obj_name = item_obj.obj_name except Exception as e: self.logger.error( @@ -88,6 +88,7 @@ class Plugin(threading.Thread): if obj_name == "TASK": self.logger.debug('[Plugin] Executing task') command = Scope.get_instance().get_plugin_manager().find_command(self.getName(), + item_obj.get_plugin().get_version(), item_obj.get_command_cls_id().lower()) self.context.put('task_id', item_obj.get_id()) @@ -158,7 +159,7 @@ class Plugin(threading.Thread): self.logger.debug('[Plugin] Executing profile') profile_data = item_obj.get_profile_data() policy_module = Scope.get_instance().get_plugin_manager().find_policy_module( - item_obj.get_plugin().get_name()) + item_obj.get_plugin().get_name(), item_obj.get_plugin().get_version()) self.context.put('username', item_obj.get_username()) execution_id = self.get_execution_id(item_obj.get_id()) diff --git a/opt/ahenk/base/plugin/plugin_manager.py b/opt/ahenk/base/plugin/plugin_manager.py index b653d50..c2dc6d2 100644 --- a/opt/ahenk/base/plugin/plugin_manager.py +++ b/opt/ahenk/base/plugin/plugin_manager.py @@ -25,13 +25,13 @@ class PluginManager(object): def __init__(self): super(PluginManager, self).__init__() self.scope = Scope.get_instance() - self.configManager = self.scope.get_configuration_manager() + self.config_manager = self.scope.get_configuration_manager() self.db_service = self.scope.get_db_service() self.message_manager = self.scope.get_message_manager() self.logger = self.scope.get_logger() self.plugins = [] - self.pluginQueueDict = dict() + self.plugin_queue_dict = dict() # self.listener = \ self.install_listener() @@ -44,7 +44,7 @@ class PluginManager(object): self.plugins = [] self.logger.debug('Lookup for possible plugins...') try: - possible_plugins = os.listdir(self.configManager.get("PLUGIN", "pluginFolderPath")) + possible_plugins = os.listdir(self.config_manager.get("PLUGIN", "pluginFolderPath")) self.logger.debug('Possible plugins: {0} '.format(str(possible_plugins))) for plugin_name in possible_plugins: try: @@ -59,8 +59,8 @@ class PluginManager(object): def load_single_plugin(self, plugin_name): # TODO check already loaded plugin - location = os.path.join(self.configManager.get("PLUGIN", "pluginFolderPath"), plugin_name) - if not os.path.isdir(location) or not self.configManager.get("PLUGIN", "mainModuleName") + ".py" in os.listdir( + location = os.path.join(self.config_manager.get("PLUGIN", "pluginFolderPath"), plugin_name) + if not os.path.isdir(location) or not self.config_manager.get("PLUGIN", "mainModuleName") + ".py" in os.listdir( location): self.logger.debug( 'It is not a plugin location ! There is no main module - {0}'.format(str(location))) @@ -70,8 +70,8 @@ class PluginManager(object): '{0} plugin was already loaded. Reloading {0} plugin'.format(plugin_name)) # self.reload_single_plugin(plugin_name) else: - self.pluginQueueDict[plugin_name] = PluginQueue() - plugin = Plugin(plugin_name, self.pluginQueueDict[plugin_name]) + self.plugin_queue_dict[plugin_name] = PluginQueue() + plugin = Plugin(plugin_name, self.plugin_queue_dict[plugin_name]) plugin.setDaemon(True) plugin.start() self.plugins.append(plugin) @@ -79,22 +79,22 @@ class PluginManager(object): # active init mode mode = InitMode() - self.pluginQueueDict[plugin_name].put(mode, 1) + self.plugin_queue_dict[plugin_name].put(mode, 1) if plugin_name in self.delayed_profiles: - self.pluginQueueDict[plugin_name].put(self.delayed_profiles[plugin_name], 1) + self.plugin_queue_dict[plugin_name].put(self.delayed_profiles[plugin_name], 1) del self.delayed_profiles[plugin_name] self.logger.debug('Delayed profile was found for this plugin. It will be run.') if plugin_name in self.delayed_tasks: - self.pluginQueueDict[plugin_name].put(self.delayed_tasks[plugin_name], 1) + self.plugin_queue_dict[plugin_name].put(self.delayed_tasks[plugin_name], 1) del self.delayed_tasks[plugin_name] self.logger.debug('Delayed task was found for this plugin. It will be run.') def reload_plugins(self): try: self.logger.info('Reloading plugins...') - for p_queue in self.pluginQueueDict: - self.pluginQueueDict[p_queue].put(ShutdownMode(), 1) + for p_queue in self.plugin_queue_dict: + self.plugin_queue_dict[p_queue].put(ShutdownMode(), 1) self.plugins = [] self.load_plugins() self.logger.info('Plugin reloaded successfully.') @@ -116,10 +116,10 @@ class PluginManager(object): def remove_plugins(self): try: self.logger.debug('Removing all plugins...') - for p_queue in self.pluginQueueDict: - self.pluginQueueDict[p_queue].put(ShutdownMode(), 1) + for p_queue in self.plugin_queue_dict: + self.plugin_queue_dict[p_queue].put(ShutdownMode(), 1) self.plugins = [] - self.pluginQueueDict = dict() + self.plugin_queue_dict = dict() self.logger.debug('All plugins were removed successfully.') except Exception as e: self.logger.error( @@ -130,8 +130,8 @@ class PluginManager(object): self.logger.debug('Trying to remove {0} plugin...'.format(plugin_name)) if self.is_plugin_loaded(plugin_name): self.logger.debug('{0} plugin is killing...'.format(plugin_name)) - self.pluginQueueDict[plugin_name].put(ShutdownMode(), 1) - del self.pluginQueueDict[plugin_name] + self.plugin_queue_dict[plugin_name].put(ShutdownMode(), 1) + del self.plugin_queue_dict[plugin_name] for plugin in self.plugins: if plugin.name == plugin_name: @@ -144,13 +144,13 @@ class PluginManager(object): 'A problem occurred while removing {0} plugin. Error Message :{1}.'.format(plugin_name, str(e))) - def find_command(self, pluginName, commandId): - location = os.path.join(self.configManager.get("PLUGIN", "pluginFolderPath"), pluginName) - if os.path.isdir(location) and commandId + ".py" in os.listdir(location): - info = imp.find_module(commandId, [location]) - return imp.load_module(commandId, *info) + def find_command(self, plugin_name, version, command_id): + location = os.path.join(self.config_manager.get("PLUGIN", "pluginFolderPath"), plugin_name) + if os.path.isdir(location) and command_id + ".py" in os.listdir(location): + info = imp.find_module(command_id, [location]) + return imp.load_module(command_id, *info) else: - self.logger.warning('Command id -' + commandId + ' - not found') + self.logger.warning('Command id -' + command_id + ' - not found') return None def process_task(self, task): @@ -163,12 +163,13 @@ class PluginManager(object): try: plugin_name = task.get_plugin().get_name().lower() plugin_ver = task.get_plugin().get_version() - if plugin_name in self.pluginQueueDict: - self.pluginQueueDict[plugin_name].put(task, 1) + + if self.does_plugin_exist(plugin_name, plugin_ver) and plugin_name in self.plugin_queue_dict: + self.plugin_queue_dict[plugin_name].put(task, 1) else: self.logger.warning( '{0} plugin not found. Task was delayed. Ahenk will request plugin from Lider if distribution available'.format( - plugin_name)) + plugin_name, plugin_ver)) self.delayed_tasks[plugin_name] = task msg = self.message_manager.missing_plugin_message(PluginBean(name=plugin_name, version=plugin_ver)) self.messenger.send_direct_message(msg) @@ -177,7 +178,7 @@ class PluginManager(object): 'Exception occurred while processing task. Error Message: {0}'.format(str(e))) def find_policy_module(self, plugin_name): - location = os.path.join(self.configManager.get("PLUGIN", "pluginFolderPath"), plugin_name) + location = os.path.join(self.config_manager.get("PLUGIN", "pluginFolderPath"), plugin_name) if os.path.isdir(location) and "policy.py" in os.listdir(location): info = imp.find_module("policy", [location]) return imp.load_module("policy", *info) @@ -229,12 +230,12 @@ class PluginManager(object): plugin = profile.get_plugin() plugin_name = plugin.get_name() plugin_ver = plugin.get_version() - if plugin_name in self.pluginQueueDict: - self.pluginQueueDict[plugin_name].put(profile, 1) + if self.does_plugin_exist(plugin_name, plugin_ver) and plugin_name in self.plugin_queue_dict: + self.plugin_queue_dict[plugin_name].put(profile, 1) else: self.logger.warning( - '{0} plugin not found. Profile was delayed. Ahenk will request plugin from Lider if distribution available'.format( - plugin_name)) + '{0} plugin {1} version not found. Profile was delayed. Ahenk will request plugin from Lider if distribution available'.format( + plugin_name, plugin_ver)) self.delayed_profiles[plugin_name] = profile msg = self.message_manager.missing_plugin_message(PluginBean(name=plugin_name, version=plugin_ver)) self.scope.get_messenger().send_direct_message(msg) @@ -242,17 +243,15 @@ class PluginManager(object): self.logger.error( 'Exception occurred while processing profile. Error Message: {0}'.format(str(e))) - def check_plugin_exists(self, plugin_name, version=None): - - criteria = ' name=\'' + plugin_name + '\'' - if version is not None: - criteria += ' and version=\'' + str(version) + '\'' - result = self.db_service.select('plugin', 'name', criteria) - - if result is None: - return False - else: - return True + def does_plugin_exist(self, name, version): + location = os.path.join(self.config_manager.get("PLUGIN", "pluginFolderPath"), name) + main = self.config_manager.get("PLUGIN", "mainModuleName") + if os.path.isdir(location) and main + ".py" in os.listdir(location): + info = imp.find_module(main, [location]) + main_py = imp.load_module(main, *info) + if main_py.info() is None or main_py.info()['version'] == version: + return True + return False def process_mode(self, mode_type, username=None): mode = None @@ -271,9 +270,9 @@ class PluginManager(object): if mode is not None: self.logger.info('{0} mode is running'.format(mode_type)) - for plugin_name in self.pluginQueueDict: + for plugin_name in self.plugin_queue_dict: try: - self.pluginQueueDict[plugin_name].put(mode, 1) + self.plugin_queue_dict[plugin_name].put(mode, 1) except Exception as e: self.logger.error( 'Exception occurred while switching safe mode. Error Message : {0}'.format( @@ -281,7 +280,7 @@ class PluginManager(object): def find_module(self, mode, plugin_name): mode = mode.lower().replace('_mode', '') - location = os.path.join(self.configManager.get("PLUGIN", "pluginFolderPath"), plugin_name) + location = os.path.join(self.config_manager.get("PLUGIN", "pluginFolderPath"), plugin_name) if os.path.isdir(location) and (mode + ".py") in os.listdir(location): info = imp.find_module(mode, [location]) @@ -297,7 +296,7 @@ class PluginManager(object): def is_plugin_loaded(self, plugin_name): try: - if self.pluginQueueDict[plugin_name] is not None: + if self.plugin_queue_dict[plugin_name] is not None: return True else: return False @@ -309,4 +308,4 @@ class PluginManager(object): pass def printQueueSize(self): - print("size " + str(len(self.pluginQueueDict))) + print("size " + str(len(self.plugin_queue_dict))) diff --git a/opt/ahenk/base/scheduler/custom/schedule_job.py b/opt/ahenk/base/scheduler/custom/schedule_job.py index 6a19e49..c6f5b1c 100644 --- a/opt/ahenk/base/scheduler/custom/schedule_job.py +++ b/opt/ahenk/base/scheduler/custom/schedule_job.py @@ -23,26 +23,26 @@ class ScheduleTaskJob(object): self.months = self.conv_to_set(cron_sj[3]) self.dow = self.conv_to_set(cron_sj[4]) self.action = self.process_task - self.logger.debug('[ScheduleTaskJob] Instance created.') + self.logger.debug('Instance created.') except Exception as e: self.logger.error( - '[ScheduleTaskJob] A problem occurred while creating instance of ScheduleTaskJob. Error Message : {0}'.format( + 'A problem occurred while creating instance of ScheduleTaskJob. Error Message : {0}'.format( str(e))) def process_task(self): try: - self.logger.debug('[ScheduleTaskJob] Running scheduled task now...') + self.logger.debug('Running scheduled task now...') self.plugin_manager.process_task(self.task) - self.logger.debug('[ScheduleTaskJob] Scheduled Task was executed.') + self.logger.debug('Scheduled Task was executed.') # There is no any single shot task # if self.is_single_shot(): # Scope.getInstance().get_scheduler().remove_job(self.task.get_id()) except Exception as e: self.logger.error( - '[ScheduleTaskJob] A problem occurred while running scheduled task. Error Message: {0}'.format(str(e))) + 'A problem occurred while running scheduled task. Error Message: {0}'.format(str(e))) def parse_cron_str(self, cron_str): - self.logger.debug('[ScheduleTaskJob] Parsing cron string...') + self.logger.debug('Parsing cron string...') try: cron_exp_arr = cron_str.split(" ") cron_sj = [] @@ -65,19 +65,19 @@ class ScheduleTaskJob(object): elif count == 3: cron_sj.append(range(0, 7, range_val)) else: - self.logger.warning('[ScheduleTaskJob] Unsupported expression.') + self.logger.warning('Unsupported expression.') elif ',' in exp: cron_sj.append("(" + str(exp) + ")") else: - self.logger.warning('[ScheduleTaskJob] Unsupported expression.') - count = count + 1 + self.logger.warning('Unsupported expression.') + count += 1 return cron_sj except Exception as e: self.logger.error( - '[ScheduleTaskJob] A problem occurred while parsing cron expression. Error Message: {0}'.format(str(e))) + 'A problem occurred while parsing cron expression. Error Message: {0}'.format(str(e))) def conv_to_set(self, obj): - self.logger.debug('[ScheduleTaskJob] Converting {0} to set'.format(str(obj))) + self.logger.debug('Converting {0} to set'.format(str(obj))) if str(obj).isdigit(): return set([int(obj)]) diff --git a/opt/ahenk/base/task/task_in_queue.py b/opt/ahenk/base/task/task_in_queue.py index 6621947..0aa0d5a 100644 --- a/opt/ahenk/base/task/task_in_queue.py +++ b/opt/ahenk/base/task/task_in_queue.py @@ -10,15 +10,15 @@ from base.task.task_job import TaskJob class TaskInQueue(threading.Thread): """docstring for TaskInQueue""" - def __init__(self, inQueue): + def __init__(self, in_queue): super(TaskInQueue, self).__init__() - self.inQueue = inQueue + self.in_queue = in_queue def run(self): # Add task to db. Adding task to db important because task can be lost when processing. # Call plugin manager and process message inside task job try: - task = self.inQueue.get() + task = self.in_queue.get() print(task) # Can be validate task before processing job = TaskJob(task)