diff --git a/opt/ahenk/base/execution/ExecutionManager.py b/opt/ahenk/base/execution/ExecutionManager.py index 3a89a49..d4722b0 100644 --- a/opt/ahenk/base/execution/ExecutionManager.py +++ b/opt/ahenk/base/execution/ExecutionManager.py @@ -8,21 +8,22 @@ import os import shutil import stat import subprocess -import urllib.request -import uuid from base.Scope import Scope -from base.messaging.ssh_file_transfer import FileTransfer +from base.util.util import Util from base.model.PluginBean import PluginBean from base.model.PolicyBean import PolicyBean from base.model.ProfileBean import ProfileBean from base.model.TaskBean import TaskBean from base.model.enum.MessageType import MessageType +from base.file.file_transfer_manager import FileTransferManager +from base.system.system import System class ExecutionManager(object): """docstring for FileTransferManager""" + # TODO more logs def __init__(self): super(ExecutionManager, self).__init__() @@ -40,7 +41,7 @@ class ExecutionManager(object): # send file ahenk to lider self.event_manager.register_event(MessageType.REQUEST_FILE.value, self.request_file) # send file lider to ahenk - self.event_manager.register_event(MessageType.RETRIVE_FILE.value, self.retrive_file) + self.event_manager.register_event(MessageType.RETRIVE_FILE.value, self.retrieve_file) self.event_manager.register_event(MessageType.MOVE_FILE.value, self.move_file) self.event_manager.register_event(MessageType.EXECUTE_TASK.value, self.execute_task) self.event_manager.register_event(MessageType.EXECUTE_POLICY.value, self.execute_policy) @@ -48,9 +49,21 @@ class ExecutionManager(object): self.event_manager.register_event(MessageType.RESPONSE_AGREEMENT.value, self.agreement_update) def agreement_update(self, arg): - plugin = json.loads(arg) - if plugin['content'] != 'null' and plugin['content'] != '': - self.db_service.update('contract', self.db_service.get_cols('contract'), [plugin['content'], plugin['title'], plugin['timestamp']]) + + try: + json_data = json.loads(arg) + transfer_manager = FileTransferManager(json_data['protocol'], json_data['parameterMap']) + + transfer_manager.transporter.connect() + file_name = transfer_manager.transporter.get_file() + transfer_manager.transporter.disconnect() + + agreement_content = Util.read_file(System.Ahenk.received_dir_path() + file_name) + + if agreement_content is not None and agreement_content != '': + self.db_service.update('contract', self.db_service.get_cols('contract'), [agreement_content, json_data['title'], json_data['timestamp']]) + except Exception as e: + self.logger.error('[ExecutionManager] A problem occurred while updating agreement. Error Message : {}'.format(str(e))) def install_plugin(self, arg): plugin = json.loads(arg) @@ -58,43 +71,27 @@ class ExecutionManager(object): try: plugin_name = plugin['pluginName'] plugin_version = plugin['pluginVersion'] - parameter_map = json.loads(json.dumps(plugin['parameterMap'])) - temp_file = self.config_manager.get('CONNECTION', 'receivefileparam') + str(uuid.uuid4()) + '.deb' - - if str(plugin['protocol']).lower() == 'ssh': - try: - self.logger.debug('[ExecutionManager] Distribution protocol is {}'.format(str(plugin['protocol']).lower())) - host = parameter_map['host'] - username = parameter_map['username'] - password = parameter_map['password'] - port = parameter_map['port'] - path = parameter_map['path'] - - transfer = FileTransfer(host, port, username, password) - transfer.connect() - transfer.get_file(temp_file, path) - except Exception as e: - self.logger.error('[ExecutionManager] Plugin package could not fetch. Error Message: {}.'.format(str(e))) - self.logger.error('[ExecutionManager] Plugin Installation is cancelling') - return - - elif plugin['protocol'].lower() == 'http': - self.logger.debug('[ExecutionManager] Distribution protocol is {}.'.format(str(plugin['protocol']).lower())) - urllib.request.urlretrieve(parameter_map['url'], temp_file) - else: - self.logger.debug('[ExecutionManager] Unsupported protocol is {}.'.format(str(plugin['protocol']).lower())) - - self.logger.debug('[ExecutionManager] Plugin package downloaded via {}.'.format(str(plugin['protocol']).lower())) try: - self.install_deb(temp_file) + transfer_manager = FileTransferManager(plugin['protocol'], plugin['parameterMap']) + transfer_manager.transporter.connect() + file_name = transfer_manager.transporter.get_file() + transfer_manager.transporter.disconnect() + downloaded_file = Util.read_file(System.Ahenk.received_dir_path() + file_name) + except Exception as e: + self.logger.error('[ExecutionManager] Plugin package could not fetch. Error Message: {}.'.format(str(e))) + self.logger.error('[ExecutionManager] Plugin Installation is cancelling') + return + + try: + self.install_deb(downloaded_file) self.logger.debug('[ExecutionManager] Plugin installed.') except Exception as e: self.logger.error('[ExecutionManager] Could not install plugin. Error Message: {}'.format(str(e))) return try: - self.remove_file(temp_file) + self.remove_file(downloaded_file) self.logger.debug('[ExecutionManager] Temp files were removed.') except Exception as e: self.logger.error('[ExecutionManager] Could not remove temp file. Error Message: {}'.format(str(e))) @@ -102,7 +99,7 @@ class ExecutionManager(object): self.plugin_manager.loadSinglePlugin(plugin_name) except Exception as e: - self.logger.error('[ExecutionManager] A problem occurred while installing new ahenk plugin. Error Message:{}'.format(str(e))) + self.logger.error('[ExecutionManager] A problem occurred while installing new Ahenk plugin. Error Message:{}'.format(str(e))) def execute_policy(self, arg): @@ -228,7 +225,7 @@ class ExecutionManager(object): return TaskBean(_id=json_data['id'], create_date=json_data['createDate'], modify_date=json_data['modifyDate'], command_cls_id=json_data['commandClsId'], parameter_map=json_data['parameterMap'], deleted=json_data['deleted'], plugin=plugin, cron_str=json_data['cronExpression']) def move_file(self, arg): - default_file_path = self.config_manager.get('CONNECTION', 'receiveFileParam') + default_file_path = System.Ahenk.received_dir_path() j = json.loads(arg) # msg_id =str(j['id']).lower() target_file_path = str(j['filePath']).lower() @@ -246,33 +243,22 @@ class ExecutionManager(object): os.chmod(file_path, st.st_mode | stat.S_IEXEC) subprocess.call("/bin/sh " + file_path, shell=True) - def retrive_file(self, arg): - j = json.loads(arg) - parameter_map = json.loads(json.dumps(j['parameterMap'])) - temp_file_path = self.config_manager.get('CONNECTION', 'receivefileparam') - temp_file_name = str(uuid.uuid4()) - - if str(j['protocol']).lower() == 'ssh': - self.logger.debug('[ExecutionManager] Retrive file protocol is {}'.format(str(j['protocol']).lower())) - host = parameter_map['host'] - username = parameter_map['username'] - password = parameter_map['password'] - port = parameter_map['port'] - path = parameter_map['path'] - - transfer = FileTransfer(host, port, username, password) - transfer.connect() - transfer.get_file(temp_file_path + temp_file_name, path) - elif str(j['protocol']).lower() == 'http': - self.logger.debug('[ExecutionManager] Retrive file protocol is {}.'.format(str(j['protocol']).lower())) - urllib.request.urlretrieve(parameter_map['url'], temp_file_path + temp_file_name) - else: - self.logger.debug('[ExecutionManager] Unsupported protocol is {}.'.format(str(j['protocol']).lower())) - - md5_hash = self.get_md5_file(temp_file_path + temp_file_name) - os.rename(temp_file_path + temp_file_name, temp_file_path + md5_hash) + def retrieve_file(self, arg): + self.logger.debug('[ExecutionManager] Retrieving file ...') + try: + json_data = json.loads(arg) + self.logger.debug('[ExecutionManager] Retrieving file protocol is {}'.format(str(json_data['protocol']))) + transfer_manager = FileTransferManager(json_data['protocol'], json_data['parameterMap']) + transfer_manager.transporter.connect() + file_name = transfer_manager.transporter.get_file() + transfer_manager.transporter.disconnect() + downloaded_file = Util.read_file(System.Ahenk.received_dir_path() + file_name) + self.logger.debug('[ExecutionManager] Retrieving file is completed successfully. Full path of file is {}'.format(downloaded_file)) + except Exception as e: + self.logger.debug('[ExecutionManager] A problem occurred while retrieving file. Error Message: {}'.format(str(e))) def request_file(self, arg): + # TODO change to new transfer way j = json.loads(arg) # msg_id =str(j['id']).lower() file_path = str(j['filePath']).lower()