file transfer operations were updated

This commit is contained in:
Volkan Şahin 2016-06-29 12:42:43 +03:00
parent e53a989cd2
commit edf0d13082

View file

@ -8,21 +8,22 @@ import os
import shutil
import stat
import subprocess
import urllib.request
import uuid
from base.Scope import Scope
from base.messaging.ssh_file_transfer import FileTransfer
from base.util.util import Util
from base.model.PluginBean import PluginBean
from base.model.PolicyBean import PolicyBean
from base.model.ProfileBean import ProfileBean
from base.model.TaskBean import TaskBean
from base.model.enum.MessageType import MessageType
from base.file.file_transfer_manager import FileTransferManager
from base.system.system import System
class ExecutionManager(object):
"""docstring for FileTransferManager"""
# TODO more logs
def __init__(self):
super(ExecutionManager, self).__init__()
@ -40,7 +41,7 @@ class ExecutionManager(object):
# send file ahenk to lider
self.event_manager.register_event(MessageType.REQUEST_FILE.value, self.request_file)
# send file lider to ahenk
self.event_manager.register_event(MessageType.RETRIVE_FILE.value, self.retrive_file)
self.event_manager.register_event(MessageType.RETRIVE_FILE.value, self.retrieve_file)
self.event_manager.register_event(MessageType.MOVE_FILE.value, self.move_file)
self.event_manager.register_event(MessageType.EXECUTE_TASK.value, self.execute_task)
self.event_manager.register_event(MessageType.EXECUTE_POLICY.value, self.execute_policy)
@ -48,9 +49,21 @@ class ExecutionManager(object):
self.event_manager.register_event(MessageType.RESPONSE_AGREEMENT.value, self.agreement_update)
def agreement_update(self, arg):
plugin = json.loads(arg)
if plugin['content'] != 'null' and plugin['content'] != '':
self.db_service.update('contract', self.db_service.get_cols('contract'), [plugin['content'], plugin['title'], plugin['timestamp']])
try:
json_data = json.loads(arg)
transfer_manager = FileTransferManager(json_data['protocol'], json_data['parameterMap'])
transfer_manager.transporter.connect()
file_name = transfer_manager.transporter.get_file()
transfer_manager.transporter.disconnect()
agreement_content = Util.read_file(System.Ahenk.received_dir_path() + file_name)
if agreement_content is not None and agreement_content != '':
self.db_service.update('contract', self.db_service.get_cols('contract'), [agreement_content, json_data['title'], json_data['timestamp']])
except Exception as e:
self.logger.error('[ExecutionManager] A problem occurred while updating agreement. Error Message : {}'.format(str(e)))
def install_plugin(self, arg):
plugin = json.loads(arg)
@ -58,43 +71,27 @@ class ExecutionManager(object):
try:
plugin_name = plugin['pluginName']
plugin_version = plugin['pluginVersion']
parameter_map = json.loads(json.dumps(plugin['parameterMap']))
temp_file = self.config_manager.get('CONNECTION', 'receivefileparam') + str(uuid.uuid4()) + '.deb'
if str(plugin['protocol']).lower() == 'ssh':
try:
self.logger.debug('[ExecutionManager] Distribution protocol is {}'.format(str(plugin['protocol']).lower()))
host = parameter_map['host']
username = parameter_map['username']
password = parameter_map['password']
port = parameter_map['port']
path = parameter_map['path']
transfer = FileTransfer(host, port, username, password)
transfer.connect()
transfer.get_file(temp_file, path)
except Exception as e:
self.logger.error('[ExecutionManager] Plugin package could not fetch. Error Message: {}.'.format(str(e)))
self.logger.error('[ExecutionManager] Plugin Installation is cancelling')
return
elif plugin['protocol'].lower() == 'http':
self.logger.debug('[ExecutionManager] Distribution protocol is {}.'.format(str(plugin['protocol']).lower()))
urllib.request.urlretrieve(parameter_map['url'], temp_file)
else:
self.logger.debug('[ExecutionManager] Unsupported protocol is {}.'.format(str(plugin['protocol']).lower()))
self.logger.debug('[ExecutionManager] Plugin package downloaded via {}.'.format(str(plugin['protocol']).lower()))
try:
self.install_deb(temp_file)
transfer_manager = FileTransferManager(plugin['protocol'], plugin['parameterMap'])
transfer_manager.transporter.connect()
file_name = transfer_manager.transporter.get_file()
transfer_manager.transporter.disconnect()
downloaded_file = Util.read_file(System.Ahenk.received_dir_path() + file_name)
except Exception as e:
self.logger.error('[ExecutionManager] Plugin package could not fetch. Error Message: {}.'.format(str(e)))
self.logger.error('[ExecutionManager] Plugin Installation is cancelling')
return
try:
self.install_deb(downloaded_file)
self.logger.debug('[ExecutionManager] Plugin installed.')
except Exception as e:
self.logger.error('[ExecutionManager] Could not install plugin. Error Message: {}'.format(str(e)))
return
try:
self.remove_file(temp_file)
self.remove_file(downloaded_file)
self.logger.debug('[ExecutionManager] Temp files were removed.')
except Exception as e:
self.logger.error('[ExecutionManager] Could not remove temp file. Error Message: {}'.format(str(e)))
@ -102,7 +99,7 @@ class ExecutionManager(object):
self.plugin_manager.loadSinglePlugin(plugin_name)
except Exception as e:
self.logger.error('[ExecutionManager] A problem occurred while installing new ahenk plugin. Error Message:{}'.format(str(e)))
self.logger.error('[ExecutionManager] A problem occurred while installing new Ahenk plugin. Error Message:{}'.format(str(e)))
def execute_policy(self, arg):
@ -228,7 +225,7 @@ class ExecutionManager(object):
return TaskBean(_id=json_data['id'], create_date=json_data['createDate'], modify_date=json_data['modifyDate'], command_cls_id=json_data['commandClsId'], parameter_map=json_data['parameterMap'], deleted=json_data['deleted'], plugin=plugin, cron_str=json_data['cronExpression'])
def move_file(self, arg):
default_file_path = self.config_manager.get('CONNECTION', 'receiveFileParam')
default_file_path = System.Ahenk.received_dir_path()
j = json.loads(arg)
# msg_id =str(j['id']).lower()
target_file_path = str(j['filePath']).lower()
@ -246,33 +243,22 @@ class ExecutionManager(object):
os.chmod(file_path, st.st_mode | stat.S_IEXEC)
subprocess.call("/bin/sh " + file_path, shell=True)
def retrive_file(self, arg):
j = json.loads(arg)
parameter_map = json.loads(json.dumps(j['parameterMap']))
temp_file_path = self.config_manager.get('CONNECTION', 'receivefileparam')
temp_file_name = str(uuid.uuid4())
if str(j['protocol']).lower() == 'ssh':
self.logger.debug('[ExecutionManager] Retrive file protocol is {}'.format(str(j['protocol']).lower()))
host = parameter_map['host']
username = parameter_map['username']
password = parameter_map['password']
port = parameter_map['port']
path = parameter_map['path']
transfer = FileTransfer(host, port, username, password)
transfer.connect()
transfer.get_file(temp_file_path + temp_file_name, path)
elif str(j['protocol']).lower() == 'http':
self.logger.debug('[ExecutionManager] Retrive file protocol is {}.'.format(str(j['protocol']).lower()))
urllib.request.urlretrieve(parameter_map['url'], temp_file_path + temp_file_name)
else:
self.logger.debug('[ExecutionManager] Unsupported protocol is {}.'.format(str(j['protocol']).lower()))
md5_hash = self.get_md5_file(temp_file_path + temp_file_name)
os.rename(temp_file_path + temp_file_name, temp_file_path + md5_hash)
def retrieve_file(self, arg):
self.logger.debug('[ExecutionManager] Retrieving file ...')
try:
json_data = json.loads(arg)
self.logger.debug('[ExecutionManager] Retrieving file protocol is {}'.format(str(json_data['protocol'])))
transfer_manager = FileTransferManager(json_data['protocol'], json_data['parameterMap'])
transfer_manager.transporter.connect()
file_name = transfer_manager.transporter.get_file()
transfer_manager.transporter.disconnect()
downloaded_file = Util.read_file(System.Ahenk.received_dir_path() + file_name)
self.logger.debug('[ExecutionManager] Retrieving file is completed successfully. Full path of file is {}'.format(downloaded_file))
except Exception as e:
self.logger.debug('[ExecutionManager] A problem occurred while retrieving file. Error Message: {}'.format(str(e)))
def request_file(self, arg):
# TODO change to new transfer way
j = json.loads(arg)
# msg_id =str(j['id']).lower()
file_path = str(j['filePath']).lower()