diff --git a/opt/ahenk/ahenkd.py b/opt/ahenk/ahenkd.py index 060c561..9de0b65 100755 --- a/opt/ahenk/ahenkd.py +++ b/opt/ahenk/ahenkd.py @@ -33,29 +33,37 @@ from base.timer.setup_timer import SetupTimer from base.timer.timer import Timer from base.util.util import Util -ahenkdaemon = None +ahenk_daemon = None -class AhenkDeamon(BaseDaemon): - """docstring for AhenkDeamon""" +class AhenkDaemon(BaseDaemon): + """Ahenk service base class which initializes services and maintains events/commands""" - def reload(self): + @staticmethod + def reload(): + """ docstring""" # reload service here pass - def init_logger(self): + @staticmethod + def init_logger(): + """ docstring""" logger = Logger() - logger.info('[AhenkDeamon] Log was set') + logger.info('[AhenkDaemon] Log was set') Scope.getInstance().setLogger(logger) return logger - def init_config_manager(self, configFilePath, configfileFolderPath): - configManager = ConfigManager(configFilePath, configfileFolderPath) - config = configManager.read() + @staticmethod + def init_config_manager(config_file_path, configfile_folder_path): + """ docstring""" + config_manager = ConfigManager(config_file_path, configfile_folder_path) + config = config_manager.read() Scope.getInstance().setConfigurationManager(config) return config - def init_scheduler(self): + @staticmethod + def init_scheduler(): + """ docstring""" scheduler_ins = SchedulerFactory.get_intstance() scheduler_ins.initialize() Scope.getInstance().set_scheduler(scheduler_ins) @@ -64,87 +72,113 @@ class AhenkDeamon(BaseDaemon): sc_thread.start() return scheduler_ins - def init_event_manager(self): - eventManager = EventManager() - Scope.getInstance().setEventManager(eventManager) - return eventManager + @staticmethod + def init_event_manager(): + """ docstring""" + event_manager = EventManager() + Scope.getInstance().setEventManager(event_manager) + return event_manager - def init_ahenk_db(self): + @staticmethod + def init_ahenk_db(): + """ docstring""" db_service = AhenkDbService() db_service.connect() db_service.initialize_table() Scope.getInstance().setDbService(db_service) return db_service - def init_messaging(self): - messageManager = Messaging() - Scope.getInstance().setMessageManager(messageManager) - return messageManager + @staticmethod + def init_messaging(): + """ docstring""" + message_manager = Messaging() + Scope.getInstance().setMessageManager(message_manager) + return message_manager - def init_plugin_manager(self): - pluginManager = PluginManagerFactory.get_instance() - Scope.getInstance().setPluginManager(pluginManager) + @staticmethod + def init_plugin_manager(): + """ docstring""" + plugin_manager = PluginManagerFactory.get_instance() + Scope.getInstance().setPluginManager(plugin_manager) # order changed, problem? - pluginManager.load_plugins() - return pluginManager + plugin_manager.load_plugins() + return plugin_manager - def init_task_manager(self): - taskManager = TaskManager() - Scope.getInstance().setTaskManager(taskManager) - return taskManager + @staticmethod + def init_task_manager(): + """ docstring""" + task_manager = TaskManager() + Scope.getInstance().setTaskManager(task_manager) + return task_manager - def init_registration(self): + @staticmethod + def init_registration(): + """ docstring""" registration = Registration() Scope.getInstance().setRegistration(registration) return registration - def init_execution_manager(self): + @staticmethod + def init_execution_manager(): + """ docstring""" execution_manager = ExecutionManager() Scope.getInstance().setExecutionManager(execution_manager) return execution_manager - def init_messenger(self): - messenger = Messenger() - messenger.connect_to_server() - Scope.getInstance().setMessenger(messenger) - return messenger + @staticmethod + def init_messenger(): + """ docstring""" + messenger_ = Messenger() + messenger_.connect_to_server() + Scope.getInstance().setMessenger(messenger_) + return messenger_ - def init_message_response_queue(self): - responseQueue = queue.Queue() - messageResponseQueue = MessageResponseQueue(responseQueue) - messageResponseQueue.setDaemon(True) - messageResponseQueue.start() - Scope.getInstance().setResponseQueue(responseQueue) - return responseQueue + @staticmethod + def init_message_response_queue(): + """ docstring""" + response_queue = queue.Queue() + message_response_queue = MessageResponseQueue(response_queue) + message_response_queue.setDaemon(True) + message_response_queue.start() + Scope.getInstance().setResponseQueue(response_queue) + return response_queue def check_registration(self): - max_attemp_number = int(System.Hardware.Network.interface_size()) * 3 - logger = Scope.getInstance().getLogger() + """ docstring""" + max_attempt_number = int(System.Hardware.Network.interface_size()) * 3 + # self.logger.debug() + # logger = Scope.getInstance().getLogger() registration = Scope.getInstance().getRegistration() try: while registration.is_registered() is False: - max_attemp_number -= 1 - logger.debug('[AhenkDeamon] Ahenk is not registered. Attempting for registration') + max_attempt_number -= 1 + self.logger.debug('[AhenkDaemon] Ahenk is not registered. Attempting for registration') registration.registration_request() - if max_attemp_number < 0: - logger.warning('[AhenkDeamon] Number of Attempting for registration is over') + if max_attempt_number < 0: + self.logger.warning('[AhenkDaemon] Number of Attempting for registration is over') self.registration_failed() break except Exception as e: - logger.error('[AhenkDeamon] Registration failed. Error message: {}'.format(str(e))) + self.logger.error('[AhenkDaemon] Registration failed. Error message: {0}'.format(str(e))) - def shutdown_mode(self): + @staticmethod + def shutdown_mode(): + """ docstring""" scope = Scope().getInstance() plugin_manager = scope.getPluginManager() plugin_manager.process_mode('shutdown') def registration_failed(self): - self.logger.error('[AhenkDeamon] Registration failed. All registration attemps were failed. Ahenk is stopping...') + """ docstring""" + self.logger.error( + '[AhenkDaemon] Registration failed. All registration attempts were failed. Ahenk is stopping...') print('Registration failed. Ahenk is stopping..') - ahenkdaemon.stop() + ahenk_daemon.stop() - def reload_plugins(self): + @staticmethod + def reload_plugins(): + """ docstring""" Scope.getInstance().getPluginManager().reloadPlugins() def reload_configuration(self): @@ -160,90 +194,95 @@ class AhenkDeamon(BaseDaemon): pass def update_plugin_manager(self): + """ docstring""" # TODO destroy plugin manager here self.init_plugin_manager() def init_signal_listener(self): + """ docstring""" try: signal.signal(signal.SIGALRM, self.run_command_from_fifo) - self.logger.info('[AhenkDeamon] Signal handler is set up') + self.logger.info('[AhenkDaemon] Signal handler is set up') except Exception as e: - self.logger.error('[AhenkDeamon] Signal handler could not set up. Error Message: {} '.format(str(e))) + self.logger.error('[AhenkDaemon] Signal handler could not set up. Error Message: {0} '.format(str(e))) - def init_pid_file(self): + @staticmethod + def init_pid_file(): + """ docstring""" with open(System.Ahenk.pid_path(), 'w+') as f: f.write(str(os.getpid())) def run(self): + """ docstring""" print('Ahenk running...') - globalscope = Scope() - globalscope.setInstance(globalscope) + global_scope = Scope() + global_scope.setInstance(global_scope) - configfileFolderPath = '/etc/ahenk/config.d/' + config_file_folder_path = '/etc/ahenk/config.d/' # configuration manager must be first load - self.init_config_manager(System.Ahenk.config_path(), configfileFolderPath) + self.init_config_manager(System.Ahenk.config_path(), config_file_folder_path) # Logger must be second self.logger = self.init_logger() self.init_pid_file() - self.logger.info('[AhenkDeamon] Pid file was created') + self.logger.info('[AhenkDaemon] Pid file was created') self.init_event_manager() - self.logger.info('[AhenkDeamon] Event Manager was set') + self.logger.info('[AhenkDaemon] Event Manager was set') self.init_ahenk_db() - self.logger.info('[AhenkDeamon] DataBase Service was set') + self.logger.info('[AhenkDaemon] DataBase Service was set') self.init_messaging() - self.logger.info('[AhenkDeamon] Message Manager was set') + self.logger.info('[AhenkDaemon] Message Manager was set') self.init_plugin_manager() - self.logger.info('[AhenkDeamon] Plugin Manager was set') + self.logger.info('[AhenkDaemon] Plugin Manager was set') self.init_scheduler() - self.logger.info('[AhenkDeamon] Scheduler was set') + self.logger.info('[AhenkDaemon] Scheduler was set') self.init_task_manager() - self.logger.info('[AhenkDeamon] Task Manager was set') + self.logger.info('[AhenkDaemon] Task Manager was set') self.init_registration() - self.logger.info('[AhenkDeamon] Registration was set') + self.logger.info('[AhenkDaemon] Registration was set') self.init_execution_manager() - self.logger.info('[AhenkDeamon] Execution Manager was set') + self.logger.info('[AhenkDaemon] Execution Manager was set') self.check_registration() - self.logger.info('[AhenkDeamon] Ahenk was registered') + self.logger.info('[AhenkDaemon] Ahenk was registered') self.messenger = self.init_messenger() - self.logger.info('[AhenkDeamon] Messenger was set') + self.logger.info('[AhenkDaemon] Messenger was set') self.init_signal_listener() - self.logger.info('[AhenkDeamon] Signals listeners was set') + self.logger.info('[AhenkDaemon] Signals listeners was set') Agreement().agreement_contract_update() self.init_message_response_queue() # if registration.is_ldap_registered() is False: - # logger.debug('[AhenkDeamon] Attempting to registering ldap') + # logger.debug('[AhenkDaemon] Attempting to registering ldap') # registration.ldap_registration_request() #TODO work on message - self.logger.info('[AhenkDeamon] LDAP registration of Ahenk is completed') + self.logger.info('[AhenkDaemon] LDAP registration of Ahenk is completed') self.messenger.send_direct_message('test') + + while True: - # if messager.is_connected() is False: - # self.logger.debug('reconnecting') - # Scope.getInstance().getLogger().warning('[AhenkDeamon] Connection is lost. Ahenk is trying for reconnection') - # messager = self.init_messager() time.sleep(1) - def running_plugin(self): + @staticmethod + def running_plugin(): + """ docstring""" scope = Scope().getInstance() plugin_manager = scope.getPluginManager() @@ -253,7 +292,7 @@ class AhenkDeamon(BaseDaemon): return True def run_command_from_fifo(self, num, stack): - + """ docstring""" scope = Scope().getInstance() plugin_manager = scope.getPluginManager() message_manager = scope.getMessageManager() @@ -262,164 +301,182 @@ class AhenkDeamon(BaseDaemon): db_service = scope.getDbService() execute_manager = scope.getExecutionManager() - event='' - try: - event=Commander().get_event() - json_data = json.loads(event) - except Exception as e: - self.logger.error('[AhenkDeamon] A problem occurred while loading json. Check json format! Error Message: {0}.Event={1} '.format(str(e),str(event))) - return + while True: + try: + event = Commander().get_event() + if event is None: + break + json_data = json.loads(event) + except Exception as e: + self.logger.error( + '[AhenkDaemon] A problem occurred while loading json. Check json format! Error Message: {0}.' + ' Event = {1}'.format(str(e), str(event))) + return - if json_data is not None: + if json_data is not None: - self.logger.debug('[AhenkDeamon] Signal handled') - self.logger.debug('[AhenkDeamon] Signal is :{}'.format(str(json_data['event']))) + self.logger.debug('[AhenkDaemon] Signal handled') + self.logger.debug('[AhenkDaemon] Signal is :{0}'.format(str(json_data['event']))) - if 'login' == str(json_data['event']): - username = json_data['username'] - display = json_data['display'] - desktop = json_data['desktop'] - self.logger.info('[AhenkDeamon] login event is handled for user: {}'.format(username)) - login_message = message_manager.login_msg(username) - messenger.send_direct_message(login_message) + if str(json_data['event']) == 'login': + username = json_data['username'] + display = json_data['display'] + desktop = json_data['desktop'] + self.logger.info('[AhenkDaemon] login event is handled for user: {0}'.format(username)) + login_message = message_manager.login_msg(username) + messenger.send_direct_message(login_message) - agreement = Agreement() - agreement_choice = None + agreement = Agreement() + agreement_choice = None - if agreement.check_agreement(username) is not True: - self.logger.debug('[AhenkDeamon] User {} has not accepted agreement.'.format(username)) - thread_ask = Process(target=agreement.ask, args=(username, display,)) - thread_ask.start() + if agreement.check_agreement(username) is not True: + self.logger.debug('[AhenkDaemon] User {0} has not accepted agreement.'.format(username)) + thread_ask = Process(target=agreement.ask, args=(username, display,)) + thread_ask.start() - agreement_timeout = conf_manager.get('SESSION', 'agreement_timeout') + agreement_timeout = conf_manager.get('SESSION', 'agreement_timeout') - timeout = int(agreement_timeout) # sec - timer = time.time() - while 1: - if thread_ask.is_alive() is False: - self.logger.warning('[AhenkDeamon] {} was answered the question '.format(username)) - if Agreement().check_agreement(username) is True: - self.logger.warning('[AhenkDeamon] Choice of {} is YES'.format(username)) - agreement_choice = True - break - elif Agreement().check_agreement(username) is False: - self.logger.warning('[AhenkDeamon] Choice of {} is NO'.format(username)) - agreement_choice = False - Util.execute('pkill -9 -u {}'.format(username)) + timeout = int(agreement_timeout) # sec + timer = time.time() + while 1: + if thread_ask.is_alive() is False: + self.logger.warning('[AhenkDaemon] {0} was answered the question '.format(username)) + if Agreement().check_agreement(username) is True: + self.logger.warning('[AhenkDaemon] Choice of {0} is YES'.format(username)) + agreement_choice = True + break + elif Agreement().check_agreement(username) is False: + self.logger.warning('[AhenkDaemon] Choice of {0} is NO'.format(username)) + agreement_choice = False + Util.close_session(username) + break + + if (time.time() - timer) > timeout: + if thread_ask.is_alive(): + thread_ask.terminate() + Util.close_session(username) + self.logger.warning( + '[AhenkDaemon] Session of {0} was ended because of timeout of contract agreement'.format( + username)) break + time.sleep(1) - if (time.time() - timer) > timeout: - if thread_ask.is_alive(): - thread_ask.terminate() - Util.execute('pkill -9 -u {}'.format(username)) - self.logger.warning('[AhenkDeamon] Session of {} was ended because of timeout of contract agreement'.format(username)) - break - time.sleep(1) + if agreement_choice is not None: + messenger.send_direct_message(message_manager.agreement_answer_msg(username, agreement_choice)) + else: + agreement_choice = True - if agreement_choice is not None: - messenger.send_direct_message(message_manager.agreement_answer_msg(username, agreement_choice)) - else: - agreement_choice = True + if agreement_choice is True: + db_service.delete('session', 'username=\'{0}\''.format(username)) - if agreement_choice is True: + self.logger.info( + '[AhenkDaemon] Display is {0}, desktop env is {1} for {2}'.format(display, desktop, username)) + + db_service.update('session', scope.getDbService().get_cols('session'), + [username, display, desktop, Util.timestamp()]) + get_policy_message = message_manager.policy_request_msg(username) + + plugin_manager.process_mode('safe', username) + plugin_manager.process_mode('login', username) + + kward = dict() + kward['timeout_args'] = username + kward['checker_args'] = username + + SetupTimer.start(Timer(timeout=System.Ahenk.get_policy_timeout(), + timeout_function=execute_manager.execute_default_policy, + checker_func=execute_manager.is_policy_executed, kwargs=kward)) + + self.logger.info( + '[AhenkDaemon] Requesting updated policies from Lider. If Ahenk could not reach updated ' + 'policies in {0} sec, booked policies will be executed'.format( + System.Ahenk.get_policy_timeout())) + messenger.send_direct_message(get_policy_message) + + elif str(json_data['event']) == 'logout': + username = json_data['username'] db_service.delete('session', 'username=\'{0}\''.format(username)) + execute_manager.remove_user_executed_policy_dict(username) + # TODO delete all user records while initializing + self.logger.info('[AhenkDaemon] logout event is handled for user: {0}'.format(username)) + logout_message = message_manager.logout_msg(username) + messenger.send_direct_message(logout_message) - self.logger.info('[AhenkDeamon] Display is {0}, desktop env is {1} for {2}'.format(display, desktop, username)) - - db_service.update('session', scope.getDbService().get_cols('session'), [username, display, desktop, Util.timestamp()]) - get_policy_message = message_manager.policy_request_msg(username) - + plugin_manager.process_mode('logout', username) plugin_manager.process_mode('safe', username) - plugin_manager.process_mode('login', username) - kward = {} - kward['timeout_args'] = username - kward['checker_args'] = username + elif str(json_data['event']) == 'send': + self.logger.info('[AhenkDaemon] Sending message over ahenkd command. Response Message: {0}'.format( + json.dumps(json_data['message']))) + message = json.dumps(json_data['message']) + messenger.send_direct_message(message) - SetupTimer.start(Timer(timeout=System.Ahenk.get_policy_timeout(), timeout_function=execute_manager.execute_default_policy, checker_func=execute_manager.is_policy_executed, kwargs=kward)) + elif str(json_data['event']) == 'load': + plugin_name = str(json_data['plugins']) - self.logger.info('[AhenkDeamon] Requesting updated policies from Lider. If Ahenk could not reach updated policies in {0} sec, booked policies will be executed'.format(System.Ahenk.get_policy_timeout())) - messenger.send_direct_message(get_policy_message) + if plugin_name == 'all': + self.logger.debug('[AhenkDaemon] All plugins are loading to ahenk') + plugin_manager.load_plugins() + else: + for p_name in plugin_name.split(','): + self.logger.debug('[AhenkDaemon] {0} plugin is loading to ahenk'.format(p_name)) + plugin_manager.load_single_plugin(p_name) - elif 'logout' == str(json_data['event']): - username = json_data['username'] - db_service.delete('session', 'username=\'{}\''.format(username)) - execute_manager.remove_user_executed_policy_dict(username) - # TODO delete all user records while initializing - self.logger.info('[AhenkDeamon] logout event is handled for user: {}'.format(username)) - logout_message = message_manager.logout_msg(username) - messenger.send_direct_message(logout_message) + elif str(json_data['event']) == 'reload': + plugin_name = str(json_data['plugins']) - plugin_manager.process_mode('logout', username) - plugin_manager.process_mode('safe', username) + if plugin_name == 'all': + self.logger.debug('[AhenkDaemon] All plugins are reloading to ahenk') + plugin_manager.reload_plugins() + else: + for p_name in plugin_name.split(','): + self.logger.debug('[AhenkDaemon] {0} plugin is reloading to ahenk'.format(p_name)) + plugin_manager.reload_single_plugin(p_name) - elif 'send' == str(json_data['event']): - self.logger.info('[AhenkDeamon] Sending message over ahenkd command. Response Message: {}'.format(json.dumps(json_data['message']))) - message = json.dumps(json_data['message']) - messenger.send_direct_message(message) + elif str(json_data['event']) == 'remove': + plugin_name = str(json_data['plugins']) - elif 'load' == str(json_data['event']): - plugin_name = str(json_data['plugins']) + if plugin_name == 'all': + self.logger.debug('[AhenkDaemon] All plugins are removing from ahenk') + plugin_manager.remove_plugins() + else: + for p_name in plugin_name.split(','): + self.logger.debug('[AhenkDaemon] {0} plugin is removing from ahenk'.format(p_name)) + plugin_manager.remove_single_plugin(p_name) - if plugin_name == 'all': - self.logger.debug('[AhenkDeamon] All plugins are loading to ahenk') - plugin_manager.load_plugins() + elif str(json_data['event']) == 'stop': + self.shutdown_mode() + self.logger.info('[AhenkDaemon] Shutdown mode activated.') + + # TODO timeout + while self.running_plugin() is False: + self.logger.debug('[AhenkDaemon] Waiting for progress of plugins...') + time.sleep(0.5) + + if Util.is_exist('/tmp/liderahenk.fifo'): + Util.delete_file('/tmp/liderahenk.fifo') + ahenk_daemon.stop() else: - for p_name in plugin_name.split(','): - self.logger.debug('[AhenkDeamon] {} plugin is loading to ahenk'.format(p_name)) - plugin_manager.load_single_plugin(p_name) - - elif 'reload' == str(json_data['event']): - plugin_name = str(json_data['plugins']) - - if plugin_name == 'all': - self.logger.debug('[AhenkDeamon] All plugins are reloading to ahenk') - plugin_manager.reload_plugins() - else: - for p_name in plugin_name.split(','): - self.logger.debug('[AhenkDeamon] {} plugin is reloading to ahenk'.format(p_name)) - plugin_manager.reload_single_plugin(p_name) - - elif 'remove' == str(json_data['event']): - plugin_name = str(json_data['plugins']) - - if plugin_name == 'all': - self.logger.debug('[AhenkDeamon] All plugins are removing from ahenk') - plugin_manager.remove_plugins() - else: - for p_name in plugin_name.split(','): - self.logger.debug('[AhenkDeamon] {} plugin is removing from ahenk'.format(p_name)) - plugin_manager.remove_single_plugin(p_name) - - elif 'stop' == str(json_data['event']): - self.shutdown_mode() - self.logger.info('[AhenkDeamon] Shutdown mode activated.') - - # TODO timeout - while self.running_plugin() is False: - self.logger.debug('[AhenkDeamon] Waiting for progress of plugins...') - time.sleep(0.5) - ahenkdaemon.stop() - else: - self.logger.error('[AhenkDeamon] Unknown command error. Command:' + json_data['event']) - self.logger.debug('[AhenkDeamon] Processing of handled event is completed') - return True - else: - return False + self.logger.error('[AhenkDaemon] Unknown command error. Command:' + json_data['event']) + self.logger.debug('[AhenkDaemon] Processing of handled event is completed') + # return True + # else: + # return False if __name__ == '__main__': - ahenkdaemon = AhenkDeamon(System.Ahenk.pid_path()) + ahenk_daemon = AhenkDaemon(System.Ahenk.pid_path()) try: if len(sys.argv) == 2 and (sys.argv[1] in ('start', 'stop', 'restart', 'status')): if sys.argv[1] == 'start': if System.Ahenk.is_running() is True: - print('There is already running Ahenk service. It will be killed.[{}]'.format(str(System.Ahenk.get_pid_number()))) + print('There is already running Ahenk service. It will be killed.[{0}]'.format( + str(System.Ahenk.get_pid_number()))) System.Process.kill_by_pid(int(System.Ahenk.get_pid_number())) else: print('Ahenk starting...') - ahenkdaemon.run() + ahenk_daemon.run() elif sys.argv[1] == 'stop': if System.Ahenk.is_running() is True: raise SystemExit @@ -428,10 +485,10 @@ if __name__ == '__main__': elif sys.argv[1] == 'restart': if System.Ahenk.is_running() is True: print('Ahenk restarting...') - ahenkdaemon.restart() + ahenk_daemon.restart() else: print('Ahenk starting...') - ahenkdaemon.run() + ahenk_daemon.run() elif sys.argv[1] == 'status': print(Commander().status()) else: @@ -453,4 +510,4 @@ if __name__ == '__main__': if System.Ahenk.is_running() is True: os.kill(int(System.Ahenk.get_pid_number()), signal.SIGALRM) else: - ahenkdaemon.stop() + ahenk_daemon.stop() diff --git a/opt/ahenk/base/config/ConfigManager.py b/opt/ahenk/base/config/ConfigManager.py index 9d26d12..0b0da25 100644 --- a/opt/ahenk/base/config/ConfigManager.py +++ b/opt/ahenk/base/config/ConfigManager.py @@ -16,26 +16,26 @@ class ConfigManager(object): is configuration files folder path """ - def __init__(self, configurationFilePath=None, configurationFolderPath=None): - self.configurationFilePath = configurationFilePath - self.configurationFolderPath = configurationFolderPath + def __init__(self, configuration_file_path=None, configuration_folder_path=None): + self.configurationFilePath = configuration_file_path + self.configurationFolderPath = configuration_folder_path def read(self): - configFiles = [] + config_files = [] # Check if given ahenk configuration file exists # If file exists add it to configFiles array. # TODO must write config file validater !! if self.configurationFilePath: if os.path.exists(self.configurationFilePath): - configFiles.append(self.configurationFilePath) + config_files.append(self.configurationFilePath) if self.configurationFolderPath and os.path.exists(self.configurationFolderPath): files = [f for f in listdir(self.configurationFolderPath) if isfile(join(self.configurationFolderPath, f))] for f in files: - configFiles.append(join(self.configurationFolderPath, f)) + config_files.append(join(self.configurationFolderPath, f)) parser = SafeConfigParser() - configValues = parser.read(configFiles) + configValues = parser.read(config_files) return parser diff --git a/opt/ahenk/base/execution/ExecutionManager.py b/opt/ahenk/base/execution/ExecutionManager.py index a488e8c..0d1fba1 100644 --- a/opt/ahenk/base/execution/ExecutionManager.py +++ b/opt/ahenk/base/execution/ExecutionManager.py @@ -9,8 +9,11 @@ from base.file.file_transfer_manager import FileTransferManager from base.model.PluginBean import PluginBean from base.model.PolicyBean import PolicyBean from base.model.ProfileBean import ProfileBean +from base.model.Response import Response from base.model.TaskBean import TaskBean +from base.model.enum.MessageCode import MessageCode from base.model.enum.MessageType import MessageType +from base.model.enum.ContentType import ContentType from base.system.system import System from base.util.util import Util @@ -81,6 +84,7 @@ class ExecutionManager(object): self.logger.error( '[ExecutionManager] Plugin package could not fetch. Error Message: {}.'.format(str(e))) self.logger.error('[ExecutionManager] Plugin Installation is cancelling') + self.plugin_installation_failure(plugin_name, plugin_version) return try: @@ -88,6 +92,7 @@ class ExecutionManager(object): self.logger.debug('[ExecutionManager] Plugin installed.') except Exception as e: self.logger.error('[ExecutionManager] Could not install plugin. Error Message: {}'.format(str(e))) + self.plugin_installation_failure(plugin_name, plugin_version) return try: @@ -103,6 +108,57 @@ class ExecutionManager(object): '[ExecutionManager] A problem occurred while installing new Ahenk plugin. Error Message:{}'.format( str(e))) + def plugin_installation_failure(self, plugin_name, plugin_version): + + self.logger.warning('[ExecutionManager] {0} plugin installation failure '.format(plugin_name)) + + if plugin_name in self.plugin_manager.delayed_profiles.keys(): + profile = self.plugin_manager.delayed_profiles[plugin_name] + self.logger.warning('[ExecutionManager] An error message sending with related profile properties...') + related_policy = self.db_service.select('policy', ['version', 'execution_id'], + 'id={0}'.format(profile.get_id())) + data = dict() + data['message'] = "Profil işletilirken eklenti bulunamadı " + "ve eksik olan eklenti kurulmaya çalışırken hata ile karşılaşıldı. " + "İlgili eklenti Ahenk'e yüklendiğinde, başarısız olan bu profil " + "(Başka bir politika tarafından ezilmedikçe) " + "çalıştırılacaktır" + " Sorunu çözmek için Lider yapılandırma dosyasındaki eklenti dağıtım " + "bilgilerinin doğruluğundan ve belirtilen dizinde geçerli eklenti paketinin " + "bulunduğundan emin olun." + response = Response(type=MessageType.POLICY_STATUS.value, id=profile.get_id(), + code=MessageCode.POLICY_ERROR.value, + message="Profil işletilirken eklenti bulunamadı " + "ve eksik olan eklenti kurulurken hata oluştu", + execution_id=related_policy[0][1], policy_version=related_policy[0][0], + data=json.dumps(data), content_type=ContentType.APPLICATION_JSON.value) + messenger = Scope.getInstance().getMessenger() + messenger.send_direct_message(self.message_manager.policy_status_msg(response)) + self.logger.warning( + '[ExecutionManager] Error message was sent about {0} plugin installation failure while trying to run a profile') + + if plugin_name in self.plugin_manager.delayed_tasks.keys(): + task = self.plugin_manager.delayed_tasks[plugin_name] + self.logger.warning('[ExecutionManager] An error message sending with related task properties...') + + data = dict() + data['message'] = "Görev işletilirken eklenti bulunamadı " + "ve eksik olan eklenti kurulmaya çalışırken hata ile karşılaşıldı. " + "İlgili eklenti Ahenk'e yüklendiğinde, başarısız olan bu görev " + "çalıştırılacaktır" + " Sorunu çözmek için Lider yapılandırma dosyasındaki eklenti dağıtım " + "bilgilerinin doğruluğundan ve belirtilen dizinde geçerli eklenti paketinin " + "bulunduğundan emin olun." + response = Response(type=MessageType.TASK_STATUS.value, id=task.get_id(), + code=MessageCode.TASK_ERROR.value, + message="Görev işletilirken eklenti bulunamadı " + "ve eksik olan eklenti kurulmaya çalışırken oluştu.", + data=json.dumps(data), content_type=ContentType.APPLICATION_JSON.value) + messenger = Scope.getInstance().getMessenger() + messenger.send_direct_message(self.message_manager.task_status_msg(response)) + self.logger.warning( + '[ExecutionManager] Error message was sent about {0} plugin installation failure while trying to run a task') + def is_policy_executed(self, username): if username in self.policy_executed: return self.policy_executed[username]