diff --git a/opt/ahenk/base/execution/ExecutionManager.py b/opt/ahenk/base/execution/ExecutionManager.py index 99a08ec..ec2ffdf 100644 --- a/opt/ahenk/base/execution/ExecutionManager.py +++ b/opt/ahenk/base/execution/ExecutionManager.py @@ -8,13 +8,16 @@ import os import shutil import stat import subprocess +import uuid + from base.Scope import Scope -from base.model.MessageType import MessageType +from base.messaging.ssh_file_transfer import FileTransfer from base.model.PluginBean import PluginBean from base.model.PolicyBean import PolicyBean from base.model.ProfileBean import ProfileBean from base.model.TaskBean import TaskBean +from base.model.enum.MessageType import MessageType class ExecutionManager(object): @@ -27,26 +30,70 @@ class ExecutionManager(object): self.config_manager = scope.getConfigurationManager() self.event_manager = scope.getEventManager() self.task_manager = scope.getTaskManager() - self.messager = scope.getMessager() + self.messenger = scope.getMessager() self.logger = scope.getLogger() self.db_service = scope.getDbService() + self.message_manager = scope.getMessageManager() + self.plugin_manager = scope.getPluginManager() self.event_manager.register_event(MessageType.EXECUTE_SCRIPT.value, self.execute_script) self.event_manager.register_event(MessageType.REQUEST_FILE.value, self.request_file) self.event_manager.register_event(MessageType.MOVE_FILE.value, self.move_file) self.event_manager.register_event(MessageType.EXECUTE_TASK.value, self.execute_task) self.event_manager.register_event(MessageType.EXECUTE_POLICY.value, self.execute_policy) + self.event_manager.register_event(MessageType.INSTALL_PLUGIN.value, self.install_plugin) + + def install_plugin(self, arg): + plugin = json.loads(arg) + self.logger.debug('[ExecutionManager] Installing missing plugin') + try: + plugin_name = plugin['pluginName'] + plugin_version = plugin['pluginVersion'] + parameter_map = json.loads(json.dumps(plugin['parameterMap'])) + + temp_file = self.config_manager.get('CONNECTION', 'receivefileparam') + str(uuid.uuid4()) + '.deb' + + if str(plugin['protocol']).lower() == 'ssh': + self.logger.debug('[ExecutionManager] Distribution protocol is {}'.format(str(plugin['protocol']).lower())) + host = parameter_map['host'] + username = parameter_map['username'] + password = parameter_map['password'] + port = parameter_map['port'] + path = parameter_map['path'] + + transfer = FileTransfer(host, port, username, password) + transfer.connect() + transfer.get_file(temp_file, path) + + elif plugin['protocol'].lower() == 'http': + self.logger.debug('[ExecutionManager] Distribution protocol is {}.'.format(str(plugin['protocol']).lower())) + #TODO + #wget.download(parameter_map['url'], temp_file) + pass + + self.logger.debug('[ExecutionManager] Plugin package downloaded via {}.'.format(str(plugin['protocol']).lower())) + self.install_deb(temp_file) + self.logger.debug('[ExecutionManager] Plugin installed.') + self.remove_file(temp_file) + self.logger.debug('[ExecutionManager] Temp files were removed.') + self.plugin_manager.loadSinglePlugin(plugin_name) + + except Exception as e: + self.logger.error('[ExecutionManager] A problem occurred while installing new ahenk plugin. Error Message:{}'.format(str(e))) def execute_policy(self, arg): + + ## + scope = Scope().getInstance() + self.messenger = scope.getMessager() + ## + self.logger.debug('[ExecutionManager] Updating policies...') policy = self.json_to_PolicyBean(json.loads(arg)) machine_uid = self.db_service.select_one_result('registration', 'jid', 'registered=1') ahenk_policy_ver = self.db_service.select_one_result('policy', 'version', 'type = \'A\'') user_policy_version = self.db_service.select_one_result('policy', 'version', 'type = \'U\' and name = \'' + policy.get_username() + '\'') - # TODO get installed plugins - # installed_plugins = self.get_installed_plugins() - # missing_plugins = [] profile_columns = ['id', 'create_date', 'modify_date', 'label', 'description', 'overridable', 'active', 'deleted', 'profile_data', 'plugin'] plugin_columns = ['active', 'create_date', 'deleted', 'description', 'machine_oriented', 'modify_date', 'name', 'policy_plugin', 'user_oriented', 'version'] @@ -69,11 +116,6 @@ class ExecutionManager(object): profile_args = [str(ahenk_policy_id), str(profile.get_create_date()), str(profile.get_modify_date()), str(profile.get_label()), str(profile.get_description()), str(profile.get_overridable()), str(profile.get_active()), str(profile.get_deleted()), str(profile.get_profile_data()), plugin_id] self.db_service.update('profile', profile_columns, profile_args) - """ - if profile.plugin.name not in installed_plugins and profile.plugin.name not in missing_plugins: - missing_plugins.append(profile.plugin.name) - """ - else: self.logger.debug('[ExecutionManager] Already there is ahenk policy') @@ -97,16 +139,9 @@ class ExecutionManager(object): profile_args = [str(user_policy_id), str(profile.get_create_date()), str(profile.get_modify_date()), str(profile.get_label()), str(profile.get_description()), str(profile.get_overridable()), str(profile.get_active()), str(profile.get_deleted()), str(profile.get_profile_data()), plugin_id] self.db_service.update('profile', profile_columns, profile_args) - """ - if profile.get_plugin()['name'] not in installed_plugins and profile.get_plugin()['name'] not in missing_plugins: - missing_plugins.append(profile.get_plugin()['name']) - """ else: self.logger.debug('[ExecutionManager] Already there is user policy') - # TODO check plugins - # print("but first need these plugins:" + str(missing_plugins)) - policy = self.get_active_policies(policy.get_username()) self.task_manager.addPolicy(policy) @@ -161,8 +196,7 @@ class ExecutionManager(object): json_task = json.loads(str_task) task = self.json_to_TaskBean(json_task) - print(task.get_command_cls_id()) - self.logger.debug('[ExecutionManager] Adding new task...') + self.logger.debug('[ExecutionManager] Adding new task...Task is:{}'.format(task.get_command_cls_id())) self.task_manager.addTask(task) self.logger.debug('[ExecutionManager] Task added') @@ -200,7 +234,7 @@ class ExecutionManager(object): file_path = str(j['filePath']).lower() time_stamp = str(j['timestamp']).lower() self.logger.debug('[ExecutionManager] Requested file is ' + file_path) - self.messager.send_file(file_path) + self.messenger.send_file(file_path) def get_md5_file(self, fname): self.logger.debug('[ExecutionManager] md5 hashing') @@ -231,3 +265,17 @@ class ExecutionManager(object): user_prof_arr.append(ProfileBean(prof['id'], prof['createDate'], prof['label'], prof['description'], prof['overridable'], prof['active'], prof['deleted'], json.dumps(prof['profileData']), prof['modifyDate'], plugin, username)) return PolicyBean(ahenk_policy_version=json_data['agentPolicyVersion'], user_policy_version=json_data['userPolicyVersion'], ahenk_profiles=ahenk_prof_arr, user_profiles=user_prof_arr, timestamp=json_data['timestamp'], username=json_data['username'], agent_execution_id=json_data['agentCommandExecutionId'], user_execution_id=json_data['userCommandExecutionId']) + + def install_deb(self, full_path): + try: + process = subprocess.Popen('gdebi -n ' + full_path, shell=True) + process.wait() + except Exception as e: + self.logger.error('[ExecutionManager] Deb package couldn\'t install properly. Error Message: {}'.format(str(e))) + + def remove_file(self, full_path): + try: + subprocess.Popen('rm ' + full_path, shell=True) + self.logger.debug('[ExecutionManager] Removed file is {}'.format(full_path)) + except Exception as e: + self.logger.debug('[ExecutionManager] File couldn\'t removed. Error Message: {}'.format(str(e))) diff --git a/opt/ahenk/base/plugin/Plugin.py b/opt/ahenk/base/plugin/Plugin.py index e8ab6eb..70992ef 100644 --- a/opt/ahenk/base/plugin/Plugin.py +++ b/opt/ahenk/base/plugin/Plugin.py @@ -5,8 +5,8 @@ import subprocess import threading from base.Scope import Scope -from base.model.MessageType import MessageType from base.model.Response import Response +from base.model.enum.MessageType import MessageType class Context(object): @@ -59,44 +59,55 @@ class Plugin(threading.Thread): self.response_queue = scope.getResponseQueue() self.messaging = scope.getMessageManager() self.db_service = scope.getDbService() - self.messager = None + # self.messager = None self.keep_run = True self.context = Context() def run(self): - while self.keep_run: try: - item_obj = self.InQueue.get(block=True) - obj_name = item_obj.obj_name + try: + item_obj = self.InQueue.get(block=True) + obj_name = item_obj.obj_name + except Exception as e: + self.logger.error('[Plugin] A problem occurred while executing process. Error Message: {}'.format()) + if obj_name == "TASK": + self.logger.debug('[Plugin] Executing task') command = Scope.getInstance().getPluginManager().findCommand(self.getName(), item_obj.get_command_cls_id().lower()) command.handle_task(item_obj, self.context) # TODO create response message from context and item_obj. item_obj is task response = Response(type=MessageType.TASK_STATUS.value, id=item_obj.get_id(), code=self.context.get('responseCode'), message=self.context.get('responseMessage'), data=self.context.get('responseData'), content_type=self.context.get('contentType')) # self.response_queue.put(self.messaging.response_msg(response)) #TODO DEBUG - Scope.getInstance().getMessager().send_direct_message(self.messaging.task_status_msg(response)) # TODO REMOVE + # Scope.getInstance().getMessager().send_direct_message(self.messaging.task_status_msg(response)) # TODO REMOVE # Empty context for next use self.context.empty_data() elif obj_name == "PROFILE": + self.logger.debug('[Plugin] Executing profile') profile_data = item_obj.get_profile_data() - policy_module = Scope.getInstance().getPluginManager().findPolicyModule(item_obj.get_plugin().get_name()) - self.context.put('username', item_obj.get_username()) - policy_module.handle_policy(profile_data, self.context) - execution_id = self.get_execution_id(item_obj.get_id()) - policy_ver = self.get_policy_version(item_obj.get_id()) + try: + policy_module = Scope.getInstance().getPluginManager().findPolicyModule(item_obj.get_plugin().get_name()) + except Exception as e: + self.logger.error('[Plugin] A problem occurred while getting module. Error Message: {}'.format(str(e))) - response = Response(type=MessageType.POLICY_STATUS.value, id=item_obj.get_id(), code=self.context.get('responseCode'), message=self.context.get('responseMessage'), data=self.context.get('responseData'), content_type=self.context.get('contentType'), execution_id=execution_id, policy_version=policy_ver) - # self.response_queue.put(self.messaging.response_msg(response)) #TODO DEBUG - Scope.getInstance().getMessager().send_direct_message(self.messaging.policy_status_msg(response)) # TODO REMOVE + if policy_module is not None: + self.context.put('username', item_obj.get_username()) + policy_module.handle_policy(profile_data, self.context) - # Empty context for next use - self.context.empty_data() + execution_id = self.get_execution_id(item_obj.get_id()) + policy_ver = self.get_policy_version(item_obj.get_id()) + + response = Response(type=MessageType.POLICY_STATUS.value, id=item_obj.get_id(), code=self.context.get('responseCode'), message=self.context.get('responseMessage'), data=self.context.get('responseData'), content_type=self.context.get('contentType'), execution_id=execution_id, policy_version=policy_ver) + # self.response_queue.put(self.messaging.response_msg(response)) #TODO DEBUG + # Scope.getInstance().getMessager().send_direct_message(self.messaging.policy_status_msg(response)) # TODO REMOVE + + # Empty context for next use + self.context.empty_data() elif obj_name == "KILL_SIGNAL": self.keep_run = False diff --git a/opt/ahenk/base/plugin/PluginManager.py b/opt/ahenk/base/plugin/PluginManager.py index 344d35a..53a8a77 100644 --- a/opt/ahenk/base/plugin/PluginManager.py +++ b/opt/ahenk/base/plugin/PluginManager.py @@ -24,6 +24,7 @@ class PluginManager(object): self.plugins = [] self.pluginQueueDict = dict() self.logger = self.scope.getLogger() + #TODO version? def loadPlugins(self): @@ -94,7 +95,6 @@ class PluginManager(object): return None def processPolicy(self, policy): - #TODO do you need username in profile? username = policy.get_username() ahenk_profiles = policy.get_ahenk_profiles() diff --git a/opt/ahenk/base/plugin/plugin_manager.py b/opt/ahenk/base/plugin/plugin_manager.py index 7cf085a..bc9c718 100644 --- a/opt/ahenk/base/plugin/plugin_manager.py +++ b/opt/ahenk/base/plugin/plugin_manager.py @@ -1,6 +1,7 @@ #!/usr/bin/python3 # -*- coding: utf-8 -*- # Author: İsmail BAŞARAN +# Author: Volkan Şahin import imp import os @@ -8,6 +9,8 @@ from base.Scope import Scope from base.plugin.Plugin import Plugin from base.plugin.PluginQueue import PluginQueue from base.model.PluginKillSignal import PluginKillSignal +from base.model.PluginBean import PluginBean + # TODO create base abstract class class PluginManager(object): @@ -22,8 +25,11 @@ class PluginManager(object): self.plugins = [] self.pluginQueueDict = dict() self.logger = self.scope.getLogger() + self.message_manager = self.scope.getMessageManager() + self.delayed_profiles = {} + self.delayed_tasks = {} - #TODO version? + # TODO version? def loadPlugins(self): """ This method loads plugins @@ -41,17 +47,26 @@ class PluginManager(object): try: self.loadSinglePlugin(pname) except Exception as e: - self.logger.error('Exception occured when loading plugin ! Plugin name : ' + str(pname) + ' Exception : ' + str(e)) + self.logger.error('[PluginManager] Exception occured when loading plugin ! Plugin name : ' + str(pname) + ' Exception : ' + str(e)) self.logger.info('[PluginManager] Loaded plugins successfully.') - def loadSinglePlugin(self, pluginName): + def loadSinglePlugin(self, plugin_name): # TODO check already loaded plugin - self.pluginQueueDict[pluginName] = PluginQueue() - plugin = Plugin(pluginName, self.pluginQueueDict[pluginName]) + self.pluginQueueDict[plugin_name] = PluginQueue() + plugin = Plugin(plugin_name, self.pluginQueueDict[plugin_name]) plugin.setDaemon(True) plugin.start() self.plugins.append(plugin) + self.logger.debug('[PluginManager] New plugin was loaded.') + + if len(self.delayed_profiles) > 0: + self.pluginQueueDict[plugin_name].put(self.delayed_profiles[plugin_name], 1) + self.logger.debug('[PluginManager] Delayed profile was found for this plugin. It will be run.') + if len(self.delayed_tasks) > 0: + self.pluginQueueDict[plugin_name].put(self.delayed_tasks[plugin_name], 1) + self.logger.debug('[PluginManager] Delayed task was found for this plugin. It will be run.') + def findCommand(self, pluginName, commandId): location = os.path.join(self.configManager.get("PLUGIN", "pluginFolderPath"), pluginName) if os.path.isdir(location) and commandId + ".py" in os.listdir(location): @@ -63,15 +78,25 @@ class PluginManager(object): def processTask(self, task): + ## + scope = Scope().getInstance() + self.messenger = scope.getMessager() + ## + try: - if task.get_plugin().get_name().lower() in self.pluginQueueDict: - self.pluginQueueDict[task.get_plugin().get_name().lower()].put(task, 1) + plugin_name = task.get_plugin().get_name().lower() + plugin_ver = task.get_plugin().get_version() + if plugin_name in self.pluginQueueDict: + self.pluginQueueDict[plugin_name].put(task, 1) + else: + self.logger.warning('[PluginManager] {} plugin not found. Task was delayed. Ahenk will request plugin from Lider if distribution available'.format(plugin_name)) + self.delayed_tasks[plugin_name] = task + msg = self.message_manager.missing_plugin_message(PluginBean(name=plugin_name, version=plugin_ver)) + self.messenger.send_direct_message(msg) except Exception as e: - # TODO update task - status to not found command - self.logger.error("[PluginManager] Exception occurred when processing task " + str(e)) + self.logger.error('[PluginManager] Exception occurred while processing task. Error Message: {}'.format(str(e))) def reloadPlugins(self): - #TODO try: self.logger.info('[PluginManager] Reloading plugins... ') kill_sgnl = PluginKillSignal() @@ -93,35 +118,47 @@ class PluginManager(object): return None def processPolicy(self, policy): - #TODO do you need username in profile? + self.logger.info('[PluginManager] Processing policies...') username = policy.username ahenk_profiles = policy.ahenk_profiles user_profiles = policy.user_profiles if ahenk_profiles is not None: + self.logger.info('[PluginManager] Working on Ahenk profiles...') for profile in ahenk_profiles: profile.set_username(None) self.process_profile(profile) if user_profiles is not None: + self.logger.info('[PluginManager] Working on User profiles...') for profile in user_profiles: profile.set_username(username) self.process_profile(profile) def process_profile(self, profile): + + ## + scope = Scope().getInstance() + self.messenger = scope.getMessager() + ## try: plugin = profile.get_plugin() plugin_name = plugin.get_name() + plugin_ver = plugin.get_version() if plugin_name in self.pluginQueueDict: self.pluginQueueDict[plugin_name].put(profile, 1) + else: + self.logger.warning('[PluginManager] {} plugin not found. Profile was delayed. Ahenk will request plugin from Lider if distribution available'.format(plugin_name)) + self.delayed_profiles[plugin_name] = profile + msg = self.message_manager.missing_plugin_message(PluginBean(name=plugin_name, version=plugin_ver)) + self.messenger.send_direct_message(msg) except Exception as e: - print("Exception occured..") - self.logger.error("Policy profile not processed " + str(profile.plugin.name)) + self.logger.error('[PluginManager] Exception occurred while processing profile. Error Message: {}'.format(str(e))) def checkPluginExists(self, plugin_name, version=None): - criteria = ' name=\''+plugin_name+'\'' + criteria = ' name=\'' + plugin_name + '\'' if version is not None: criteria += ' and version=\'' + str(version) + '\'' result = self.db_service.select('plugin', 'name', criteria) @@ -142,4 +179,3 @@ class PluginManager(object): def printQueueSize(self): print("size " + str(len(self.pluginQueueDict))) -