mirror of
https://github.com/Pardus-LiderAhenk/ahenk
synced 2024-11-22 10:52:17 +03:00
installing missing plugins on running ahenk
This commit is contained in:
parent
46fbff6144
commit
f167d9c8c3
4 changed files with 147 additions and 52 deletions
|
@ -8,13 +8,16 @@ import os
|
|||
import shutil
|
||||
import stat
|
||||
import subprocess
|
||||
import uuid
|
||||
|
||||
|
||||
from base.Scope import Scope
|
||||
from base.model.MessageType import MessageType
|
||||
from base.messaging.ssh_file_transfer import FileTransfer
|
||||
from base.model.PluginBean import PluginBean
|
||||
from base.model.PolicyBean import PolicyBean
|
||||
from base.model.ProfileBean import ProfileBean
|
||||
from base.model.TaskBean import TaskBean
|
||||
from base.model.enum.MessageType import MessageType
|
||||
|
||||
|
||||
class ExecutionManager(object):
|
||||
|
@ -27,26 +30,70 @@ class ExecutionManager(object):
|
|||
self.config_manager = scope.getConfigurationManager()
|
||||
self.event_manager = scope.getEventManager()
|
||||
self.task_manager = scope.getTaskManager()
|
||||
self.messager = scope.getMessager()
|
||||
self.messenger = scope.getMessager()
|
||||
self.logger = scope.getLogger()
|
||||
self.db_service = scope.getDbService()
|
||||
self.message_manager = scope.getMessageManager()
|
||||
self.plugin_manager = scope.getPluginManager()
|
||||
|
||||
self.event_manager.register_event(MessageType.EXECUTE_SCRIPT.value, self.execute_script)
|
||||
self.event_manager.register_event(MessageType.REQUEST_FILE.value, self.request_file)
|
||||
self.event_manager.register_event(MessageType.MOVE_FILE.value, self.move_file)
|
||||
self.event_manager.register_event(MessageType.EXECUTE_TASK.value, self.execute_task)
|
||||
self.event_manager.register_event(MessageType.EXECUTE_POLICY.value, self.execute_policy)
|
||||
self.event_manager.register_event(MessageType.INSTALL_PLUGIN.value, self.install_plugin)
|
||||
|
||||
def install_plugin(self, arg):
|
||||
plugin = json.loads(arg)
|
||||
self.logger.debug('[ExecutionManager] Installing missing plugin')
|
||||
try:
|
||||
plugin_name = plugin['pluginName']
|
||||
plugin_version = plugin['pluginVersion']
|
||||
parameter_map = json.loads(json.dumps(plugin['parameterMap']))
|
||||
|
||||
temp_file = self.config_manager.get('CONNECTION', 'receivefileparam') + str(uuid.uuid4()) + '.deb'
|
||||
|
||||
if str(plugin['protocol']).lower() == 'ssh':
|
||||
self.logger.debug('[ExecutionManager] Distribution protocol is {}'.format(str(plugin['protocol']).lower()))
|
||||
host = parameter_map['host']
|
||||
username = parameter_map['username']
|
||||
password = parameter_map['password']
|
||||
port = parameter_map['port']
|
||||
path = parameter_map['path']
|
||||
|
||||
transfer = FileTransfer(host, port, username, password)
|
||||
transfer.connect()
|
||||
transfer.get_file(temp_file, path)
|
||||
|
||||
elif plugin['protocol'].lower() == 'http':
|
||||
self.logger.debug('[ExecutionManager] Distribution protocol is {}.'.format(str(plugin['protocol']).lower()))
|
||||
#TODO
|
||||
#wget.download(parameter_map['url'], temp_file)
|
||||
pass
|
||||
|
||||
self.logger.debug('[ExecutionManager] Plugin package downloaded via {}.'.format(str(plugin['protocol']).lower()))
|
||||
self.install_deb(temp_file)
|
||||
self.logger.debug('[ExecutionManager] Plugin installed.')
|
||||
self.remove_file(temp_file)
|
||||
self.logger.debug('[ExecutionManager] Temp files were removed.')
|
||||
self.plugin_manager.loadSinglePlugin(plugin_name)
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error('[ExecutionManager] A problem occurred while installing new ahenk plugin. Error Message:{}'.format(str(e)))
|
||||
|
||||
def execute_policy(self, arg):
|
||||
|
||||
##
|
||||
scope = Scope().getInstance()
|
||||
self.messenger = scope.getMessager()
|
||||
##
|
||||
|
||||
self.logger.debug('[ExecutionManager] Updating policies...')
|
||||
policy = self.json_to_PolicyBean(json.loads(arg))
|
||||
machine_uid = self.db_service.select_one_result('registration', 'jid', 'registered=1')
|
||||
ahenk_policy_ver = self.db_service.select_one_result('policy', 'version', 'type = \'A\'')
|
||||
user_policy_version = self.db_service.select_one_result('policy', 'version', 'type = \'U\' and name = \'' + policy.get_username() + '\'')
|
||||
|
||||
# TODO get installed plugins
|
||||
# installed_plugins = self.get_installed_plugins()
|
||||
# missing_plugins = []
|
||||
profile_columns = ['id', 'create_date', 'modify_date', 'label', 'description', 'overridable', 'active', 'deleted', 'profile_data', 'plugin']
|
||||
plugin_columns = ['active', 'create_date', 'deleted', 'description', 'machine_oriented', 'modify_date', 'name', 'policy_plugin', 'user_oriented', 'version']
|
||||
|
||||
|
@ -69,11 +116,6 @@ class ExecutionManager(object):
|
|||
profile_args = [str(ahenk_policy_id), str(profile.get_create_date()), str(profile.get_modify_date()), str(profile.get_label()), str(profile.get_description()), str(profile.get_overridable()), str(profile.get_active()), str(profile.get_deleted()), str(profile.get_profile_data()), plugin_id]
|
||||
self.db_service.update('profile', profile_columns, profile_args)
|
||||
|
||||
"""
|
||||
if profile.plugin.name not in installed_plugins and profile.plugin.name not in missing_plugins:
|
||||
missing_plugins.append(profile.plugin.name)
|
||||
"""
|
||||
|
||||
else:
|
||||
self.logger.debug('[ExecutionManager] Already there is ahenk policy')
|
||||
|
||||
|
@ -97,16 +139,9 @@ class ExecutionManager(object):
|
|||
profile_args = [str(user_policy_id), str(profile.get_create_date()), str(profile.get_modify_date()), str(profile.get_label()), str(profile.get_description()), str(profile.get_overridable()), str(profile.get_active()), str(profile.get_deleted()), str(profile.get_profile_data()), plugin_id]
|
||||
self.db_service.update('profile', profile_columns, profile_args)
|
||||
|
||||
"""
|
||||
if profile.get_plugin()['name'] not in installed_plugins and profile.get_plugin()['name'] not in missing_plugins:
|
||||
missing_plugins.append(profile.get_plugin()['name'])
|
||||
"""
|
||||
else:
|
||||
self.logger.debug('[ExecutionManager] Already there is user policy')
|
||||
|
||||
# TODO check plugins
|
||||
# print("but first need these plugins:" + str(missing_plugins))
|
||||
|
||||
policy = self.get_active_policies(policy.get_username())
|
||||
self.task_manager.addPolicy(policy)
|
||||
|
||||
|
@ -161,8 +196,7 @@ class ExecutionManager(object):
|
|||
json_task = json.loads(str_task)
|
||||
task = self.json_to_TaskBean(json_task)
|
||||
|
||||
print(task.get_command_cls_id())
|
||||
self.logger.debug('[ExecutionManager] Adding new task...')
|
||||
self.logger.debug('[ExecutionManager] Adding new task...Task is:{}'.format(task.get_command_cls_id()))
|
||||
|
||||
self.task_manager.addTask(task)
|
||||
self.logger.debug('[ExecutionManager] Task added')
|
||||
|
@ -200,7 +234,7 @@ class ExecutionManager(object):
|
|||
file_path = str(j['filePath']).lower()
|
||||
time_stamp = str(j['timestamp']).lower()
|
||||
self.logger.debug('[ExecutionManager] Requested file is ' + file_path)
|
||||
self.messager.send_file(file_path)
|
||||
self.messenger.send_file(file_path)
|
||||
|
||||
def get_md5_file(self, fname):
|
||||
self.logger.debug('[ExecutionManager] md5 hashing')
|
||||
|
@ -231,3 +265,17 @@ class ExecutionManager(object):
|
|||
user_prof_arr.append(ProfileBean(prof['id'], prof['createDate'], prof['label'], prof['description'], prof['overridable'], prof['active'], prof['deleted'], json.dumps(prof['profileData']), prof['modifyDate'], plugin, username))
|
||||
|
||||
return PolicyBean(ahenk_policy_version=json_data['agentPolicyVersion'], user_policy_version=json_data['userPolicyVersion'], ahenk_profiles=ahenk_prof_arr, user_profiles=user_prof_arr, timestamp=json_data['timestamp'], username=json_data['username'], agent_execution_id=json_data['agentCommandExecutionId'], user_execution_id=json_data['userCommandExecutionId'])
|
||||
|
||||
def install_deb(self, full_path):
|
||||
try:
|
||||
process = subprocess.Popen('gdebi -n ' + full_path, shell=True)
|
||||
process.wait()
|
||||
except Exception as e:
|
||||
self.logger.error('[ExecutionManager] Deb package couldn\'t install properly. Error Message: {}'.format(str(e)))
|
||||
|
||||
def remove_file(self, full_path):
|
||||
try:
|
||||
subprocess.Popen('rm ' + full_path, shell=True)
|
||||
self.logger.debug('[ExecutionManager] Removed file is {}'.format(full_path))
|
||||
except Exception as e:
|
||||
self.logger.debug('[ExecutionManager] File couldn\'t removed. Error Message: {}'.format(str(e)))
|
||||
|
|
|
@ -5,8 +5,8 @@ import subprocess
|
|||
import threading
|
||||
|
||||
from base.Scope import Scope
|
||||
from base.model.MessageType import MessageType
|
||||
from base.model.Response import Response
|
||||
from base.model.enum.MessageType import MessageType
|
||||
|
||||
|
||||
class Context(object):
|
||||
|
@ -59,44 +59,55 @@ class Plugin(threading.Thread):
|
|||
self.response_queue = scope.getResponseQueue()
|
||||
self.messaging = scope.getMessageManager()
|
||||
self.db_service = scope.getDbService()
|
||||
self.messager = None
|
||||
# self.messager = None
|
||||
|
||||
self.keep_run = True
|
||||
self.context = Context()
|
||||
|
||||
def run(self):
|
||||
|
||||
while self.keep_run:
|
||||
try:
|
||||
item_obj = self.InQueue.get(block=True)
|
||||
obj_name = item_obj.obj_name
|
||||
try:
|
||||
item_obj = self.InQueue.get(block=True)
|
||||
obj_name = item_obj.obj_name
|
||||
except Exception as e:
|
||||
self.logger.error('[Plugin] A problem occurred while executing process. Error Message: {}'.format())
|
||||
|
||||
if obj_name == "TASK":
|
||||
self.logger.debug('[Plugin] Executing task')
|
||||
command = Scope.getInstance().getPluginManager().findCommand(self.getName(), item_obj.get_command_cls_id().lower())
|
||||
command.handle_task(item_obj, self.context)
|
||||
# TODO create response message from context and item_obj. item_obj is task
|
||||
|
||||
response = Response(type=MessageType.TASK_STATUS.value, id=item_obj.get_id(), code=self.context.get('responseCode'), message=self.context.get('responseMessage'), data=self.context.get('responseData'), content_type=self.context.get('contentType'))
|
||||
# self.response_queue.put(self.messaging.response_msg(response)) #TODO DEBUG
|
||||
Scope.getInstance().getMessager().send_direct_message(self.messaging.task_status_msg(response)) # TODO REMOVE
|
||||
# Scope.getInstance().getMessager().send_direct_message(self.messaging.task_status_msg(response)) # TODO REMOVE
|
||||
|
||||
# Empty context for next use
|
||||
self.context.empty_data()
|
||||
|
||||
elif obj_name == "PROFILE":
|
||||
self.logger.debug('[Plugin] Executing profile')
|
||||
profile_data = item_obj.get_profile_data()
|
||||
policy_module = Scope.getInstance().getPluginManager().findPolicyModule(item_obj.get_plugin().get_name())
|
||||
self.context.put('username', item_obj.get_username())
|
||||
policy_module.handle_policy(profile_data, self.context)
|
||||
|
||||
execution_id = self.get_execution_id(item_obj.get_id())
|
||||
policy_ver = self.get_policy_version(item_obj.get_id())
|
||||
try:
|
||||
policy_module = Scope.getInstance().getPluginManager().findPolicyModule(item_obj.get_plugin().get_name())
|
||||
except Exception as e:
|
||||
self.logger.error('[Plugin] A problem occurred while getting module. Error Message: {}'.format(str(e)))
|
||||
|
||||
response = Response(type=MessageType.POLICY_STATUS.value, id=item_obj.get_id(), code=self.context.get('responseCode'), message=self.context.get('responseMessage'), data=self.context.get('responseData'), content_type=self.context.get('contentType'), execution_id=execution_id, policy_version=policy_ver)
|
||||
# self.response_queue.put(self.messaging.response_msg(response)) #TODO DEBUG
|
||||
Scope.getInstance().getMessager().send_direct_message(self.messaging.policy_status_msg(response)) # TODO REMOVE
|
||||
if policy_module is not None:
|
||||
self.context.put('username', item_obj.get_username())
|
||||
policy_module.handle_policy(profile_data, self.context)
|
||||
|
||||
# Empty context for next use
|
||||
self.context.empty_data()
|
||||
execution_id = self.get_execution_id(item_obj.get_id())
|
||||
policy_ver = self.get_policy_version(item_obj.get_id())
|
||||
|
||||
response = Response(type=MessageType.POLICY_STATUS.value, id=item_obj.get_id(), code=self.context.get('responseCode'), message=self.context.get('responseMessage'), data=self.context.get('responseData'), content_type=self.context.get('contentType'), execution_id=execution_id, policy_version=policy_ver)
|
||||
# self.response_queue.put(self.messaging.response_msg(response)) #TODO DEBUG
|
||||
# Scope.getInstance().getMessager().send_direct_message(self.messaging.policy_status_msg(response)) # TODO REMOVE
|
||||
|
||||
# Empty context for next use
|
||||
self.context.empty_data()
|
||||
|
||||
elif obj_name == "KILL_SIGNAL":
|
||||
self.keep_run = False
|
||||
|
|
|
@ -24,6 +24,7 @@ class PluginManager(object):
|
|||
self.plugins = []
|
||||
self.pluginQueueDict = dict()
|
||||
self.logger = self.scope.getLogger()
|
||||
|
||||
|
||||
#TODO version?
|
||||
def loadPlugins(self):
|
||||
|
@ -94,7 +95,6 @@ class PluginManager(object):
|
|||
return None
|
||||
|
||||
def processPolicy(self, policy):
|
||||
#TODO do you need username in profile?
|
||||
|
||||
username = policy.get_username()
|
||||
ahenk_profiles = policy.get_ahenk_profiles()
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
#!/usr/bin/python3
|
||||
# -*- coding: utf-8 -*-
|
||||
# Author: İsmail BAŞARAN <ismail.basaran@tubitak.gov.tr> <basaran.ismaill@gmail.com>
|
||||
# Author: Volkan Şahin <volkansah.in> <bm.volkansahin@gmail.com>
|
||||
import imp
|
||||
import os
|
||||
|
||||
|
@ -8,6 +9,8 @@ from base.Scope import Scope
|
|||
from base.plugin.Plugin import Plugin
|
||||
from base.plugin.PluginQueue import PluginQueue
|
||||
from base.model.PluginKillSignal import PluginKillSignal
|
||||
from base.model.PluginBean import PluginBean
|
||||
|
||||
|
||||
# TODO create base abstract class
|
||||
class PluginManager(object):
|
||||
|
@ -22,8 +25,11 @@ class PluginManager(object):
|
|||
self.plugins = []
|
||||
self.pluginQueueDict = dict()
|
||||
self.logger = self.scope.getLogger()
|
||||
self.message_manager = self.scope.getMessageManager()
|
||||
self.delayed_profiles = {}
|
||||
self.delayed_tasks = {}
|
||||
|
||||
#TODO version?
|
||||
# TODO version?
|
||||
def loadPlugins(self):
|
||||
"""
|
||||
This method loads plugins
|
||||
|
@ -41,17 +47,26 @@ class PluginManager(object):
|
|||
try:
|
||||
self.loadSinglePlugin(pname)
|
||||
except Exception as e:
|
||||
self.logger.error('Exception occured when loading plugin ! Plugin name : ' + str(pname) + ' Exception : ' + str(e))
|
||||
self.logger.error('[PluginManager] Exception occured when loading plugin ! Plugin name : ' + str(pname) + ' Exception : ' + str(e))
|
||||
self.logger.info('[PluginManager] Loaded plugins successfully.')
|
||||
|
||||
def loadSinglePlugin(self, pluginName):
|
||||
def loadSinglePlugin(self, plugin_name):
|
||||
# TODO check already loaded plugin
|
||||
self.pluginQueueDict[pluginName] = PluginQueue()
|
||||
plugin = Plugin(pluginName, self.pluginQueueDict[pluginName])
|
||||
self.pluginQueueDict[plugin_name] = PluginQueue()
|
||||
plugin = Plugin(plugin_name, self.pluginQueueDict[plugin_name])
|
||||
plugin.setDaemon(True)
|
||||
plugin.start()
|
||||
self.plugins.append(plugin)
|
||||
|
||||
self.logger.debug('[PluginManager] New plugin was loaded.')
|
||||
|
||||
if len(self.delayed_profiles) > 0:
|
||||
self.pluginQueueDict[plugin_name].put(self.delayed_profiles[plugin_name], 1)
|
||||
self.logger.debug('[PluginManager] Delayed profile was found for this plugin. It will be run.')
|
||||
if len(self.delayed_tasks) > 0:
|
||||
self.pluginQueueDict[plugin_name].put(self.delayed_tasks[plugin_name], 1)
|
||||
self.logger.debug('[PluginManager] Delayed task was found for this plugin. It will be run.')
|
||||
|
||||
def findCommand(self, pluginName, commandId):
|
||||
location = os.path.join(self.configManager.get("PLUGIN", "pluginFolderPath"), pluginName)
|
||||
if os.path.isdir(location) and commandId + ".py" in os.listdir(location):
|
||||
|
@ -63,15 +78,25 @@ class PluginManager(object):
|
|||
|
||||
def processTask(self, task):
|
||||
|
||||
##
|
||||
scope = Scope().getInstance()
|
||||
self.messenger = scope.getMessager()
|
||||
##
|
||||
|
||||
try:
|
||||
if task.get_plugin().get_name().lower() in self.pluginQueueDict:
|
||||
self.pluginQueueDict[task.get_plugin().get_name().lower()].put(task, 1)
|
||||
plugin_name = task.get_plugin().get_name().lower()
|
||||
plugin_ver = task.get_plugin().get_version()
|
||||
if plugin_name in self.pluginQueueDict:
|
||||
self.pluginQueueDict[plugin_name].put(task, 1)
|
||||
else:
|
||||
self.logger.warning('[PluginManager] {} plugin not found. Task was delayed. Ahenk will request plugin from Lider if distribution available'.format(plugin_name))
|
||||
self.delayed_tasks[plugin_name] = task
|
||||
msg = self.message_manager.missing_plugin_message(PluginBean(name=plugin_name, version=plugin_ver))
|
||||
self.messenger.send_direct_message(msg)
|
||||
except Exception as e:
|
||||
# TODO update task - status to not found command
|
||||
self.logger.error("[PluginManager] Exception occurred when processing task " + str(e))
|
||||
self.logger.error('[PluginManager] Exception occurred while processing task. Error Message: {}'.format(str(e)))
|
||||
|
||||
def reloadPlugins(self):
|
||||
#TODO
|
||||
try:
|
||||
self.logger.info('[PluginManager] Reloading plugins... ')
|
||||
kill_sgnl = PluginKillSignal()
|
||||
|
@ -93,35 +118,47 @@ class PluginManager(object):
|
|||
return None
|
||||
|
||||
def processPolicy(self, policy):
|
||||
#TODO do you need username in profile?
|
||||
|
||||
self.logger.info('[PluginManager] Processing policies...')
|
||||
username = policy.username
|
||||
ahenk_profiles = policy.ahenk_profiles
|
||||
user_profiles = policy.user_profiles
|
||||
|
||||
if ahenk_profiles is not None:
|
||||
self.logger.info('[PluginManager] Working on Ahenk profiles...')
|
||||
for profile in ahenk_profiles:
|
||||
profile.set_username(None)
|
||||
self.process_profile(profile)
|
||||
|
||||
if user_profiles is not None:
|
||||
self.logger.info('[PluginManager] Working on User profiles...')
|
||||
for profile in user_profiles:
|
||||
profile.set_username(username)
|
||||
self.process_profile(profile)
|
||||
|
||||
def process_profile(self, profile):
|
||||
|
||||
##
|
||||
scope = Scope().getInstance()
|
||||
self.messenger = scope.getMessager()
|
||||
##
|
||||
try:
|
||||
plugin = profile.get_plugin()
|
||||
plugin_name = plugin.get_name()
|
||||
plugin_ver = plugin.get_version()
|
||||
if plugin_name in self.pluginQueueDict:
|
||||
self.pluginQueueDict[plugin_name].put(profile, 1)
|
||||
else:
|
||||
self.logger.warning('[PluginManager] {} plugin not found. Profile was delayed. Ahenk will request plugin from Lider if distribution available'.format(plugin_name))
|
||||
self.delayed_profiles[plugin_name] = profile
|
||||
msg = self.message_manager.missing_plugin_message(PluginBean(name=plugin_name, version=plugin_ver))
|
||||
self.messenger.send_direct_message(msg)
|
||||
except Exception as e:
|
||||
print("Exception occured..")
|
||||
self.logger.error("Policy profile not processed " + str(profile.plugin.name))
|
||||
self.logger.error('[PluginManager] Exception occurred while processing profile. Error Message: {}'.format(str(e)))
|
||||
|
||||
def checkPluginExists(self, plugin_name, version=None):
|
||||
|
||||
criteria = ' name=\''+plugin_name+'\''
|
||||
criteria = ' name=\'' + plugin_name + '\''
|
||||
if version is not None:
|
||||
criteria += ' and version=\'' + str(version) + '\''
|
||||
result = self.db_service.select('plugin', 'name', criteria)
|
||||
|
@ -142,4 +179,3 @@ class PluginManager(object):
|
|||
|
||||
def printQueueSize(self):
|
||||
print("size " + str(len(self.pluginQueueDict)))
|
||||
|
||||
|
|
Loading…
Reference in a new issue