diff --git a/opt/ahenk/base/plugin/plugin.py b/opt/ahenk/base/plugin/plugin.py index 6d5f78b..f07f70d 100644 --- a/opt/ahenk/base/plugin/plugin.py +++ b/opt/ahenk/base/plugin/plugin.py @@ -60,10 +60,10 @@ class Plugin(threading.Thread): Plugin class responsible for processing TASK or USER PLUGIN PROFILE. """ - def __init__(self, name, InQueue): + def __init__(self, name, in_ueue): threading.Thread.__init__(self) self.name = name - self.InQueue = InQueue + self.in_queue = in_ueue scope = Scope.get_instance() self.logger = scope.get_logger() @@ -79,7 +79,7 @@ class Plugin(threading.Thread): while self.keep_run: try: try: - item_obj = self.InQueue.get(block=True) + item_obj = self.in_queue.get(block=True) obj_name = item_obj.obj_name except Exception as e: self.logger.error( @@ -88,6 +88,7 @@ class Plugin(threading.Thread): if obj_name == "TASK": self.logger.debug('[Plugin] Executing task') command = Scope.get_instance().get_plugin_manager().find_command(self.getName(), + item_obj.get_plugin().get_version(), item_obj.get_command_cls_id().lower()) self.context.put('task_id', item_obj.get_id()) @@ -158,7 +159,7 @@ class Plugin(threading.Thread): self.logger.debug('[Plugin] Executing profile') profile_data = item_obj.get_profile_data() policy_module = Scope.get_instance().get_plugin_manager().find_policy_module( - item_obj.get_plugin().get_name()) + item_obj.get_plugin().get_name(), item_obj.get_plugin().get_version()) self.context.put('username', item_obj.get_username()) execution_id = self.get_execution_id(item_obj.get_id()) diff --git a/opt/ahenk/base/plugin/plugin_manager.py b/opt/ahenk/base/plugin/plugin_manager.py index b653d50..c2dc6d2 100644 --- a/opt/ahenk/base/plugin/plugin_manager.py +++ b/opt/ahenk/base/plugin/plugin_manager.py @@ -25,13 +25,13 @@ class PluginManager(object): def __init__(self): super(PluginManager, self).__init__() self.scope = Scope.get_instance() - self.configManager = self.scope.get_configuration_manager() + self.config_manager = self.scope.get_configuration_manager() self.db_service = self.scope.get_db_service() self.message_manager = self.scope.get_message_manager() self.logger = self.scope.get_logger() self.plugins = [] - self.pluginQueueDict = dict() + self.plugin_queue_dict = dict() # self.listener = \ self.install_listener() @@ -44,7 +44,7 @@ class PluginManager(object): self.plugins = [] self.logger.debug('Lookup for possible plugins...') try: - possible_plugins = os.listdir(self.configManager.get("PLUGIN", "pluginFolderPath")) + possible_plugins = os.listdir(self.config_manager.get("PLUGIN", "pluginFolderPath")) self.logger.debug('Possible plugins: {0} '.format(str(possible_plugins))) for plugin_name in possible_plugins: try: @@ -59,8 +59,8 @@ class PluginManager(object): def load_single_plugin(self, plugin_name): # TODO check already loaded plugin - location = os.path.join(self.configManager.get("PLUGIN", "pluginFolderPath"), plugin_name) - if not os.path.isdir(location) or not self.configManager.get("PLUGIN", "mainModuleName") + ".py" in os.listdir( + location = os.path.join(self.config_manager.get("PLUGIN", "pluginFolderPath"), plugin_name) + if not os.path.isdir(location) or not self.config_manager.get("PLUGIN", "mainModuleName") + ".py" in os.listdir( location): self.logger.debug( 'It is not a plugin location ! There is no main module - {0}'.format(str(location))) @@ -70,8 +70,8 @@ class PluginManager(object): '{0} plugin was already loaded. Reloading {0} plugin'.format(plugin_name)) # self.reload_single_plugin(plugin_name) else: - self.pluginQueueDict[plugin_name] = PluginQueue() - plugin = Plugin(plugin_name, self.pluginQueueDict[plugin_name]) + self.plugin_queue_dict[plugin_name] = PluginQueue() + plugin = Plugin(plugin_name, self.plugin_queue_dict[plugin_name]) plugin.setDaemon(True) plugin.start() self.plugins.append(plugin) @@ -79,22 +79,22 @@ class PluginManager(object): # active init mode mode = InitMode() - self.pluginQueueDict[plugin_name].put(mode, 1) + self.plugin_queue_dict[plugin_name].put(mode, 1) if plugin_name in self.delayed_profiles: - self.pluginQueueDict[plugin_name].put(self.delayed_profiles[plugin_name], 1) + self.plugin_queue_dict[plugin_name].put(self.delayed_profiles[plugin_name], 1) del self.delayed_profiles[plugin_name] self.logger.debug('Delayed profile was found for this plugin. It will be run.') if plugin_name in self.delayed_tasks: - self.pluginQueueDict[plugin_name].put(self.delayed_tasks[plugin_name], 1) + self.plugin_queue_dict[plugin_name].put(self.delayed_tasks[plugin_name], 1) del self.delayed_tasks[plugin_name] self.logger.debug('Delayed task was found for this plugin. It will be run.') def reload_plugins(self): try: self.logger.info('Reloading plugins...') - for p_queue in self.pluginQueueDict: - self.pluginQueueDict[p_queue].put(ShutdownMode(), 1) + for p_queue in self.plugin_queue_dict: + self.plugin_queue_dict[p_queue].put(ShutdownMode(), 1) self.plugins = [] self.load_plugins() self.logger.info('Plugin reloaded successfully.') @@ -116,10 +116,10 @@ class PluginManager(object): def remove_plugins(self): try: self.logger.debug('Removing all plugins...') - for p_queue in self.pluginQueueDict: - self.pluginQueueDict[p_queue].put(ShutdownMode(), 1) + for p_queue in self.plugin_queue_dict: + self.plugin_queue_dict[p_queue].put(ShutdownMode(), 1) self.plugins = [] - self.pluginQueueDict = dict() + self.plugin_queue_dict = dict() self.logger.debug('All plugins were removed successfully.') except Exception as e: self.logger.error( @@ -130,8 +130,8 @@ class PluginManager(object): self.logger.debug('Trying to remove {0} plugin...'.format(plugin_name)) if self.is_plugin_loaded(plugin_name): self.logger.debug('{0} plugin is killing...'.format(plugin_name)) - self.pluginQueueDict[plugin_name].put(ShutdownMode(), 1) - del self.pluginQueueDict[plugin_name] + self.plugin_queue_dict[plugin_name].put(ShutdownMode(), 1) + del self.plugin_queue_dict[plugin_name] for plugin in self.plugins: if plugin.name == plugin_name: @@ -144,13 +144,13 @@ class PluginManager(object): 'A problem occurred while removing {0} plugin. Error Message :{1}.'.format(plugin_name, str(e))) - def find_command(self, pluginName, commandId): - location = os.path.join(self.configManager.get("PLUGIN", "pluginFolderPath"), pluginName) - if os.path.isdir(location) and commandId + ".py" in os.listdir(location): - info = imp.find_module(commandId, [location]) - return imp.load_module(commandId, *info) + def find_command(self, plugin_name, version, command_id): + location = os.path.join(self.config_manager.get("PLUGIN", "pluginFolderPath"), plugin_name) + if os.path.isdir(location) and command_id + ".py" in os.listdir(location): + info = imp.find_module(command_id, [location]) + return imp.load_module(command_id, *info) else: - self.logger.warning('Command id -' + commandId + ' - not found') + self.logger.warning('Command id -' + command_id + ' - not found') return None def process_task(self, task): @@ -163,12 +163,13 @@ class PluginManager(object): try: plugin_name = task.get_plugin().get_name().lower() plugin_ver = task.get_plugin().get_version() - if plugin_name in self.pluginQueueDict: - self.pluginQueueDict[plugin_name].put(task, 1) + + if self.does_plugin_exist(plugin_name, plugin_ver) and plugin_name in self.plugin_queue_dict: + self.plugin_queue_dict[plugin_name].put(task, 1) else: self.logger.warning( '{0} plugin not found. Task was delayed. Ahenk will request plugin from Lider if distribution available'.format( - plugin_name)) + plugin_name, plugin_ver)) self.delayed_tasks[plugin_name] = task msg = self.message_manager.missing_plugin_message(PluginBean(name=plugin_name, version=plugin_ver)) self.messenger.send_direct_message(msg) @@ -177,7 +178,7 @@ class PluginManager(object): 'Exception occurred while processing task. Error Message: {0}'.format(str(e))) def find_policy_module(self, plugin_name): - location = os.path.join(self.configManager.get("PLUGIN", "pluginFolderPath"), plugin_name) + location = os.path.join(self.config_manager.get("PLUGIN", "pluginFolderPath"), plugin_name) if os.path.isdir(location) and "policy.py" in os.listdir(location): info = imp.find_module("policy", [location]) return imp.load_module("policy", *info) @@ -229,12 +230,12 @@ class PluginManager(object): plugin = profile.get_plugin() plugin_name = plugin.get_name() plugin_ver = plugin.get_version() - if plugin_name in self.pluginQueueDict: - self.pluginQueueDict[plugin_name].put(profile, 1) + if self.does_plugin_exist(plugin_name, plugin_ver) and plugin_name in self.plugin_queue_dict: + self.plugin_queue_dict[plugin_name].put(profile, 1) else: self.logger.warning( - '{0} plugin not found. Profile was delayed. Ahenk will request plugin from Lider if distribution available'.format( - plugin_name)) + '{0} plugin {1} version not found. Profile was delayed. Ahenk will request plugin from Lider if distribution available'.format( + plugin_name, plugin_ver)) self.delayed_profiles[plugin_name] = profile msg = self.message_manager.missing_plugin_message(PluginBean(name=plugin_name, version=plugin_ver)) self.scope.get_messenger().send_direct_message(msg) @@ -242,17 +243,15 @@ class PluginManager(object): self.logger.error( 'Exception occurred while processing profile. Error Message: {0}'.format(str(e))) - def check_plugin_exists(self, plugin_name, version=None): - - criteria = ' name=\'' + plugin_name + '\'' - if version is not None: - criteria += ' and version=\'' + str(version) + '\'' - result = self.db_service.select('plugin', 'name', criteria) - - if result is None: - return False - else: - return True + def does_plugin_exist(self, name, version): + location = os.path.join(self.config_manager.get("PLUGIN", "pluginFolderPath"), name) + main = self.config_manager.get("PLUGIN", "mainModuleName") + if os.path.isdir(location) and main + ".py" in os.listdir(location): + info = imp.find_module(main, [location]) + main_py = imp.load_module(main, *info) + if main_py.info() is None or main_py.info()['version'] == version: + return True + return False def process_mode(self, mode_type, username=None): mode = None @@ -271,9 +270,9 @@ class PluginManager(object): if mode is not None: self.logger.info('{0} mode is running'.format(mode_type)) - for plugin_name in self.pluginQueueDict: + for plugin_name in self.plugin_queue_dict: try: - self.pluginQueueDict[plugin_name].put(mode, 1) + self.plugin_queue_dict[plugin_name].put(mode, 1) except Exception as e: self.logger.error( 'Exception occurred while switching safe mode. Error Message : {0}'.format( @@ -281,7 +280,7 @@ class PluginManager(object): def find_module(self, mode, plugin_name): mode = mode.lower().replace('_mode', '') - location = os.path.join(self.configManager.get("PLUGIN", "pluginFolderPath"), plugin_name) + location = os.path.join(self.config_manager.get("PLUGIN", "pluginFolderPath"), plugin_name) if os.path.isdir(location) and (mode + ".py") in os.listdir(location): info = imp.find_module(mode, [location]) @@ -297,7 +296,7 @@ class PluginManager(object): def is_plugin_loaded(self, plugin_name): try: - if self.pluginQueueDict[plugin_name] is not None: + if self.plugin_queue_dict[plugin_name] is not None: return True else: return False @@ -309,4 +308,4 @@ class PluginManager(object): pass def printQueueSize(self): - print("size " + str(len(self.pluginQueueDict))) + print("size " + str(len(self.plugin_queue_dict)))