Ahenk/opt/ahenk/base/execution/ExecutionManager.py

317 lines
19 KiB
Python
Raw Normal View History

2016-03-08 18:05:00 +02:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Author: Volkan Şahin <volkansah.in> <bm.volkansahin@gmail.com>
import hashlib
import json
import os
import shutil
import stat
import subprocess
2016-03-08 18:05:00 +02:00
from base.Scope import Scope
2016-06-29 12:42:43 +03:00
from base.util.util import Util
2016-04-06 14:28:29 +03:00
from base.model.PluginBean import PluginBean
from base.model.PolicyBean import PolicyBean
from base.model.ProfileBean import ProfileBean
from base.model.TaskBean import TaskBean
from base.model.enum.MessageType import MessageType
2016-06-29 12:42:43 +03:00
from base.file.file_transfer_manager import FileTransferManager
from base.system.system import System
2016-03-08 18:05:00 +02:00
class ExecutionManager(object):
    """Handle messages delivered through the event manager.

    The constructor pulls its collaborators from the shared ``Scope`` and
    registers one handler method per ``MessageType``.  Every public handler
    receives the raw message as a JSON string (``arg``).
    """

    # TODO more logs

    # Column lists handed to ``db_service``.  They are kept as tuples and
    # copied with ``list()`` at each call so no caller can mutate them.
    _PLUGIN_COLUMNS = ('active', 'create_date', 'deleted', 'description',
                       'machine_oriented', 'modify_date', 'name',
                       'policy_plugin', 'user_oriented', 'version',
                       'task_plugin', 'x_based')
    # Order used when writing profiles (matches ``_store_profiles``).
    _PROFILE_STORE_COLUMNS = ('id', 'create_date', 'modify_date', 'label',
                              'description', 'overridable', 'active',
                              'deleted', 'profile_data', 'plugin')
    # Order used when reading profiles (matches ProfileBean's positional
    # arguments as used in ``json_to_PolicyBean``).
    _PROFILE_LOAD_COLUMNS = ('id', 'create_date', 'label', 'description',
                             'overridable', 'active', 'deleted',
                             'profile_data', 'modify_date', 'plugin')

    def __init__(self):
        """Collect collaborators from ``Scope`` and register the handlers."""
        super(ExecutionManager, self).__init__()

        scope = Scope.getInstance()
        self.config_manager = scope.getConfigurationManager()
        self.event_manager = scope.getEventManager()
        self.task_manager = scope.getTaskManager()
        self.messenger = scope.getMessenger()
        self.logger = scope.getLogger()
        self.db_service = scope.getDbService()
        self.message_manager = scope.getMessageManager()
        self.plugin_manager = scope.getPluginManager()

        self.event_manager.register_event(MessageType.EXECUTE_SCRIPT.value, self.execute_script)
        # send file ahenk to lider
        self.event_manager.register_event(MessageType.REQUEST_FILE.value, self.request_file)
        # send file lider to ahenk
        self.event_manager.register_event(MessageType.RETRIVE_FILE.value, self.retrieve_file)
        self.event_manager.register_event(MessageType.MOVE_FILE.value, self.move_file)
        self.event_manager.register_event(MessageType.EXECUTE_TASK.value, self.execute_task)
        self.event_manager.register_event(MessageType.EXECUTE_POLICY.value, self.execute_policy)
        self.event_manager.register_event(MessageType.INSTALL_PLUGIN.value, self.install_plugin)
        self.event_manager.register_event(MessageType.RESPONSE_AGREEMENT.value, self.agreement_update)

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _sql_literal(value):
        """Return *value* as a single-quoted SQL string literal.

        Embedded single quotes are doubled so a value such as ``o'brien``
        cannot terminate the literal early inside a criteria string.
        """
        return "'" + str(value).replace("'", "''") + "'"

    def _fetch_file(self, protocol, parameter_map):
        """Download one file and return the path it is expected to live at.

        The transporter is always disconnected, even when the download
        fails.  The returned path is the received-files directory joined
        with the name reported by ``get_file()``.
        """
        transporter = FileTransferManager(protocol, parameter_map).transporter
        transporter.connect()
        try:
            file_name = transporter.get_file()
        finally:
            transporter.disconnect()
        return System.Ahenk.received_dir_path() + file_name

    @staticmethod
    def _json_to_plugin_bean(plu):
        """Build a ``PluginBean`` from the plugin object of a JSON message."""
        return PluginBean(p_id=plu['id'],
                          active=plu['active'],
                          create_date=plu['createDate'],
                          deleted=plu['deleted'],
                          description=plu['description'],
                          machine_oriented=plu['machineOriented'],
                          modify_date=plu['modifyDate'],
                          name=plu['name'],
                          policy_plugin=plu['policyPlugin'],
                          user_oriented=plu['userOriented'],
                          version=plu['version'],
                          task_plugin=plu['taskPlugin'],
                          x_based=plu['xBased'])

    def _json_to_profiles(self, json_profiles, username):
        """Convert a JSON profile array (or ``None``) to ``ProfileBean`` objects."""
        if json_profiles is None:
            return []
        return [ProfileBean(prof['id'], prof['createDate'], prof['label'],
                            prof['description'], prof['overridable'],
                            prof['active'], prof['deleted'],
                            json.dumps(prof['profileData']),
                            prof['modifyDate'],
                            self._json_to_plugin_bean(prof['plugin']),
                            username)
                for prof in json_profiles]

    def _replace_policy(self, policy_type, criteria, version, name, execution_id):
        """Store a new policy version and return the id of its policy row.

        When a row matching *criteria* exists, its old profiles are deleted
        and version plus execution id are overwritten; otherwise a new row
        is inserted and its id is looked up afterwards.
        """
        policy_id = self.db_service.select_one_result('policy', 'id', criteria)
        if policy_id is not None:
            # TODO remove profiles' plugins
            self.db_service.delete('profile', 'id=' + str(policy_id))
            # NOTE(review): this filters the plugin table by the *policy* id,
            # kept as in the original - confirm it removes the intended rows.
            self.db_service.delete('plugin', 'id=' + str(policy_id))
            # The execution id is refreshed together with the version; the
            # insert branch and the unchanged-version path both store it too.
            self.db_service.update('policy', ['version', 'execution_id'],
                                   [str(version), execution_id], criteria)
        else:
            self.db_service.update('policy', ['type', 'version', 'name', 'execution_id'],
                                   [policy_type, str(version), name, execution_id])
            policy_id = self.db_service.select_one_result('policy', 'id', criteria)
        return policy_id

    def _store_profiles(self, policy_id, profiles):
        """Write every profile (and its plugin) of a policy to the database.

        The profile's ``id`` column receives *policy_id*; the ``plugin``
        column receives whatever ``db_service.update`` returns for the
        plugin row.
        """
        for profile in profiles:
            plugin = profile.get_plugin()
            plugin_args = [str(plugin.get_active()),
                           str(plugin.get_create_date()),
                           str(plugin.get_deleted()),
                           str(plugin.get_description()),
                           str(plugin.get_machine_oriented()),
                           str(plugin.get_modify_date()),
                           str(plugin.get_name()),
                           str(plugin.get_policy_plugin()),
                           str(plugin.get_user_oriented()),
                           str(plugin.get_version()),
                           str(plugin.get_task_plugin()),
                           str(plugin.get_x_based())]
            plugin_id = self.db_service.update('plugin', list(self._PLUGIN_COLUMNS), plugin_args)

            profile_args = [str(policy_id),
                            str(profile.get_create_date()),
                            str(profile.get_modify_date()),
                            str(profile.get_label()),
                            str(profile.get_description()),
                            str(profile.get_overridable()),
                            str(profile.get_active()),
                            str(profile.get_deleted()),
                            str(profile.get_profile_data()),
                            plugin_id]
            self.db_service.update('profile', list(self._PROFILE_STORE_COLUMNS), profile_args)

    def _load_profiles(self, policy_id, username):
        """Read the profiles stored for *policy_id* as ``ProfileBean`` objects."""
        plugin_columns = ['id'] + list(self._PLUGIN_COLUMNS)
        rows = self.db_service.select('profile', list(self._PROFILE_LOAD_COLUMNS),
                                      ' id=' + str(policy_id) + ' ')
        profiles = []
        for row in rows:
            # row[9] is the 'plugin' column; it is quoted through
            # _sql_literal so a non-string id no longer raises TypeError.
            plu = self.db_service.select('plugin', plugin_columns,
                                         ' id=' + self._sql_literal(row[9]))[0]
            # NOTE(review): only the first eleven columns are passed on, so
            # task_plugin and x_based are selected but unused (as before).
            plugin = PluginBean(*(plu[i] for i in range(11)))
            profiles.append(ProfileBean(row[0], row[1], row[2], row[3], row[4],
                                        row[5], row[6], row[7], row[8],
                                        plugin, username))
        return profiles

    # ------------------------------------------------------------------
    # event handlers
    # ------------------------------------------------------------------

    def agreement_update(self, arg):
        """Download the agreement text and store it in the ``contract`` table.

        *arg* is a JSON string with ``protocol``, ``parameterMap`` and
        ``timestamp``.  Nothing is stored when the downloaded content is
        ``None`` or empty; any failure is logged and swallowed.
        """
        try:
            json_data = json.loads(arg)
            agreement_path = self._fetch_file(json_data['protocol'], json_data['parameterMap'])
            agreement_content = Util.read_file(agreement_path)
            # TODO
            title = 'Ahenk Agreement'
            if agreement_content is not None and agreement_content != '':
                self.db_service.update('contract', self.db_service.get_cols('contract'),
                                       [agreement_content, title, json_data['timestamp']])
        except Exception as e:
            self.logger.error('[ExecutionManager] A problem occurred while updating agreement. Error Message : {}'.format(str(e)))

    def install_plugin(self, arg):
        """Download a plugin package, install it and load the plugin.

        *arg* is a JSON string with ``pluginName``, ``protocol`` and
        ``parameterMap``.  The method stops (after logging) as soon as the
        download or the installation fails; it never raises.
        """
        self.logger.debug('[ExecutionManager] Installing missing plugin')
        try:
            plugin = json.loads(arg)
            plugin_name = plugin['pluginName']

            try:
                # The package *path* is needed below, not the file content.
                downloaded_file = self._fetch_file(plugin['protocol'], plugin['parameterMap'])
            except Exception as e:
                self.logger.error('[ExecutionManager] Plugin package could not fetch. Error Message: {}.'.format(str(e)))
                self.logger.error('[ExecutionManager] Plugin Installation is cancelling')
                return

            try:
                installed = self.install_deb(downloaded_file)
            except Exception as e:
                self.logger.error('[ExecutionManager] Could not install plugin. Error Message: {}'.format(str(e)))
                return
            # ``is False`` keeps an override that returns None working.
            if installed is False:
                self.logger.error('[ExecutionManager] Could not install plugin. Plugin Installation is cancelling')
                return
            self.logger.debug('[ExecutionManager] Plugin installed.')

            # remove_file logs its own success or failure.
            self.remove_file(downloaded_file)

            self.plugin_manager.loadSinglePlugin(plugin_name)
        except Exception as e:
            self.logger.error('[ExecutionManager] A problem occurred while installing new Ahenk plugin. Error Message:{}'.format(str(e)))

    def execute_policy(self, arg):
        """Persist an incoming policy and hand the active policy to the task manager.

        For the agent policy and the user policy separately: when the
        incoming version differs from the stored one, the stored profiles
        are replaced; otherwise only the execution id is refreshed.
        """
        self.logger.debug('[ExecutionManager] Updating policies...')

        policy = self.json_to_PolicyBean(json.loads(arg))
        username = policy.get_username()
        agent_criteria = 'type = \'A\''
        user_criteria = 'type = \'U\' and name = ' + self._sql_literal(username)

        machine_uid = self.db_service.select_one_result('registration', 'jid', 'registered=1')
        ahenk_policy_ver = self.db_service.select_one_result('policy', 'version', agent_criteria)
        user_policy_version = self.db_service.select_one_result('policy', 'version', user_criteria)

        if policy.get_ahenk_policy_version() != ahenk_policy_ver:
            ahenk_policy_id = self._replace_policy('A', agent_criteria,
                                                   policy.get_ahenk_policy_version(),
                                                   machine_uid,
                                                   policy.get_agent_execution_id())
            self._store_profiles(ahenk_policy_id, policy.get_ahenk_profiles())
        else:
            self.logger.debug('[ExecutionManager] Already there is ahenk policy. Command Execution Id is updating')
            self.db_service.update('policy', ['execution_id'],
                                   [policy.get_agent_execution_id()], agent_criteria)

        if policy.get_user_policy_version() != user_policy_version:
            user_policy_id = self._replace_policy('U', user_criteria,
                                                  policy.get_user_policy_version(),
                                                  username,
                                                  policy.get_user_execution_id())
            self._store_profiles(user_policy_id, policy.get_user_profiles())
        else:
            self.logger.debug('[ExecutionManager] Already there is user policy. . Command Execution Id is updating')
            # Restricted to this user's row; a bare type='U' filter would
            # overwrite the execution id of every other user's policy.
            self.db_service.update('policy', ['execution_id'],
                                   [policy.get_user_execution_id()], user_criteria)

        policy = self.get_active_policies(username)
        self.task_manager.addPolicy(policy)

    def get_active_policies(self, username):
        """Return a ``PolicyBean`` built from the stored user and agent policies.

        Profiles are attached only when at least one profile row exists for
        the policy.
        """
        user_policy = self.db_service.select('policy', ['id', 'version', 'name'],
                                             ' type=\'U\' and name=' + self._sql_literal(username))
        ahenk_policy = self.db_service.select('policy', ['id', 'version'], ' type=\'A\' ')

        policy = PolicyBean(username=username)

        if user_policy:
            # NOTE(review): column 0 is the policy *id*; it is stored as the
            # bean's "version" exactly as before - confirm this is intended.
            user_policy_id = user_policy[0][0]
            policy.set_user_policy_version(user_policy_id)
            user_profiles = self._load_profiles(user_policy_id, policy.get_username())
            if user_profiles:
                policy.set_user_profiles(user_profiles)

        if ahenk_policy:
            ahenk_policy_id = ahenk_policy[0][0]
            policy.set_ahenk_policy_version(ahenk_policy_id)
            ahenk_profiles = self._load_profiles(ahenk_policy_id, policy.get_username())
            if ahenk_profiles:
                policy.set_ahenk_profiles(ahenk_profiles)

        return policy

    # from db
    def get_installed_plugins(self):
        """Return ``'<name>-<version>'`` for every row of the plugin table."""
        plugins = self.db_service.select('plugin', ['name', 'version'])
        return [str(p[0]) + '-' + str(p[1]) for p in plugins]

    def execute_task(self, arg):
        """Parse a task message and queue the task on the task manager.

        The ``task`` field of the message is itself a JSON string.
        """
        str_task = json.loads(arg)['task']
        json_task = json.loads(str_task)
        file_server_conf = None

        task = self.json_to_task_bean(json_task, file_server_conf)
        self.logger.debug('[ExecutionManager] Adding new task...Task is:{}'.format(task.get_command_cls_id()))

        self.task_manager.addTask(task)
        self.logger.debug('[ExecutionManager] Task added')

    def json_to_task_bean(self, json_data, file_server_conf=None):
        """Build a ``TaskBean`` from a decoded task message.

        *file_server_conf* is serialised with ``json.dumps`` before it is
        stored on the bean (``None`` becomes the string ``'null'``).
        """
        plugin = self._json_to_plugin_bean(json_data['plugin'])
        return TaskBean(_id=json_data['id'],
                        create_date=json_data['createDate'],
                        modify_date=json_data['modifyDate'],
                        command_cls_id=json_data['commandClsId'],
                        parameter_map=json_data['parameterMap'],
                        deleted=json_data['deleted'],
                        plugin=plugin,
                        cron_str=json_data['cronExpression'],
                        file_server=json.dumps(file_server_conf))

    def move_file(self, arg):
        """Move a received file from the received-files directory to ``filePath``."""
        default_file_path = System.Ahenk.received_dir_path()
        j = json.loads(arg)
        # NOTE(review): both values are lower-cased as in the original, which
        # breaks mixed-case paths on case-sensitive filesystems - confirm.
        target_file_path = str(j['filePath']).lower()
        file_name = str(j['filename']).lower()
        self.logger.debug('[ExecutionManager] ' + file_name + ' will be moved to ' + target_file_path)
        shutil.move(default_file_path + file_name, target_file_path + file_name)

    def execute_script(self, arg):
        """Mark the script at ``filePath`` executable and run it with ``/bin/sh``."""
        j = json.loads(arg)
        # NOTE(review): the path is lower-cased as in the original - confirm.
        file_path = str(j['filePath']).lower()
        self.logger.debug('[ExecutionManager] Making executable file (%s) for execution' % file_path)
        st = os.stat(file_path)
        os.chmod(file_path, st.st_mode | stat.S_IEXEC)
        # Argument list instead of a shell string: the path comes from a
        # message and must not be interpreted by a shell.
        subprocess.call(['/bin/sh', file_path])

    def retrieve_file(self, arg):
        """Download the file described by the message; failures are logged."""
        self.logger.debug('[ExecutionManager] Retrieving file ...')
        try:
            json_data = json.loads(arg)
            self.logger.debug('[ExecutionManager] Retrieving file protocol is {}'.format(str(json_data['protocol'])))
            downloaded_file = self._fetch_file(json_data['protocol'], json_data['parameterMap'])
            self.logger.debug('[ExecutionManager] Retrieving file is completed successfully. Full path of file is {}'.format(downloaded_file))
        except Exception as e:
            self.logger.error('[ExecutionManager] A problem occurred while retrieving file. Error Message: {}'.format(str(e)))

    def request_file(self, arg):
        """Send the file named by ``filePath`` through the messenger."""
        # TODO change to new transfer way
        j = json.loads(arg)
        # NOTE(review): the path is lower-cased as in the original - confirm.
        file_path = str(j['filePath']).lower()
        self.logger.debug('[ExecutionManager] Requested file is ' + file_path)
        self.messenger.send_file(file_path)

    def get_md5_file(self, fname):
        """Return the hexadecimal MD5 digest of the file at *fname*."""
        self.logger.debug('[ExecutionManager] md5 hashing')
        hash_md5 = hashlib.md5()
        with open(fname, 'rb') as f:
            # Read in 4 KiB chunks so large files are not loaded at once.
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        return str(hash_md5.hexdigest())

    def json_to_PolicyBean(self, json_data):
        """Build a ``PolicyBean`` from a decoded policy message.

        ``agentPolicyProfiles`` and ``userPolicyProfiles`` may be ``None``;
        each then yields an empty profile list.
        """
        username = json_data['username']
        ahenk_prof_arr = self._json_to_profiles(json_data['agentPolicyProfiles'], username)
        user_prof_arr = self._json_to_profiles(json_data['userPolicyProfiles'], username)

        return PolicyBean(ahenk_policy_version=json_data['agentPolicyVersion'],
                          user_policy_version=json_data['userPolicyVersion'],
                          ahenk_profiles=ahenk_prof_arr,
                          user_profiles=user_prof_arr,
                          timestamp=json_data['timestamp'],
                          username=username,
                          agent_execution_id=json_data['agentCommandExecutionId'],
                          user_execution_id=json_data['userCommandExecutionId'])

    def install_deb(self, full_path):
        """Install the package at *full_path* with ``gdebi -n``.

        Returns ``True`` when gdebi exits with status 0 and ``False``
        otherwise (including when gdebi cannot be started).  Never raises.
        """
        try:
            exit_code = subprocess.call(['gdebi', '-n', full_path])
        except Exception as e:
            self.logger.error('[ExecutionManager] Deb package couldn\'t install properly. Error Message: {}'.format(str(e)))
            return False
        if exit_code != 0:
            self.logger.error('[ExecutionManager] Deb package couldn\'t install properly. Exit code: {}'.format(exit_code))
            return False
        return True

    def remove_file(self, full_path):
        """Delete the file at *full_path*; a failure is logged, not raised."""
        try:
            os.remove(full_path)
            self.logger.debug('[ExecutionManager] Removed file is {}'.format(full_path))
        except Exception as e:
            self.logger.error('[ExecutionManager] File couldn\'t removed. Error Message: {}'.format(str(e)))