From 6fb7f89e873178d1714489df5e9691b1c47ff437 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Volkan=20=C5=9Eahin?= Date: Mon, 6 Jun 2016 12:27:46 +0300 Subject: [PATCH] pid file isolated from event and events rw processes is running with thread safe. stop restart commands fixed. status and send commands added. and minor fixings... --- opt/ahenk/ahenkd.py | 225 +++++++++++----------------- opt/ahenk/base/command/commander.py | 189 +++++++++++++++++++++++ opt/ahenk/base/command/fifo.py | 35 +++++ 3 files changed, 314 insertions(+), 135 deletions(-) create mode 100644 opt/ahenk/base/command/commander.py create mode 100644 opt/ahenk/base/command/fifo.py diff --git a/opt/ahenk/ahenkd.py b/opt/ahenk/ahenkd.py index 2aeb197..b01d319 100755 --- a/opt/ahenk/ahenkd.py +++ b/opt/ahenk/ahenkd.py @@ -3,17 +3,16 @@ # Author: İsmail BAŞARAN # Author: Volkan Şahin -import configparser import os import queue import signal -import subprocess import sys import threading import time +import json -from base.system.system import System from base.Scope import Scope +from base.command.commander import Commander from base.config.ConfigManager import ConfigManager from base.database.AhenkDbService import AhenkDbService from base.deamon.BaseDeamon import BaseDaemon @@ -26,10 +25,10 @@ from base.messaging.Messaging import Messaging from base.plugin.plugin_manager_factory import PluginManagerFactory from base.registration.Registration import Registration from base.scheduler.scheduler_factory import SchedulerFactory +from base.system.system import System from base.task.TaskManager import TaskManager -pidfilePath = '/var/run/ahenk.pid' -configFilePath = '/etc/ahenk/ahenk.conf' +ahenkdaemon = None class AhenkDeamon(BaseDaemon): @@ -120,14 +119,15 @@ class AhenkDeamon(BaseDaemon): def check_registration(self): # TODO get number of attemption - max_attemp_number = 50 + max_attemp_number = int(System.Hardware.Network.interface_size()) * 3 logger = Scope.getInstance().getLogger() try: while Scope.getInstance().getRegistration().is_registered() is False: max_attemp_number -= 1 logger.debug('[AhenkDeamon] Ahenk is not registered. Attempting for registration') - Scope.getInstance().getRegistration().registration_request() + # TODO 'Could not reach Registration response from Lider. Be sure Lider is awake and it is connected to XMPP server!' + Scope.getInstance().getRegistration().registration_request() if max_attemp_number < 0: logger.warning('[AhenkDeamon] Number of Attempting for registration is over') self.registration_failed() @@ -161,43 +161,44 @@ class AhenkDeamon(BaseDaemon): def run(self): print('Ahenk running...') + self.signal_number = 0 globalscope = Scope() globalscope.setInstance(globalscope) configfileFolderPath = '/etc/ahenk/config.d/' # configuration manager must be first load - self.init_config_manager(configFilePath, configfileFolderPath) + self.init_config_manager(System.Ahenk.config_path(), configfileFolderPath) # Logger must be second - logger = self.init_logger() + self.logger = self.init_logger() self.init_event_manager() - logger.info('[AhenkDeamon] Event Manager was set') + self.logger.info('[AhenkDeamon] Event Manager was set') self.init_ahenk_db() - logger.info('[AhenkDeamon] DataBase Service was set') + self.logger.info('[AhenkDeamon] DataBase Service was set') self.init_messaging() - logger.info('[AhenkDeamon] Message Manager was set') + self.logger.info('[AhenkDeamon] Message Manager was set') self.init_plugin_manager() - logger.info('[AhenkDeamon] Plugin Manager was set') + self.logger.info('[AhenkDeamon] Plugin Manager was set') self.init_task_manager() - logger.info('[AhenkDeamon] Task Manager was set') + self.logger.info('[AhenkDeamon] Task Manager was set') self.init_registration() - logger.info('[AhenkDeamon] Registration was set') + self.logger.info('[AhenkDeamon] Registration was set') self.init_execution_manager() - logger.info('[AhenkDeamon] Execution Manager was set') + self.logger.info('[AhenkDeamon] Execution Manager was set') self.check_registration() - logger.info('[AhenkDeamon] Ahenk is registered') + self.logger.info('[AhenkDeamon] Ahenk is registered') messager = self.init_messager() - logger.info('[AhenkDeamon] Messager was set') + self.logger.info('[AhenkDeamon] Messager was set') self.init_message_response_queue() @@ -205,153 +206,107 @@ class AhenkDeamon(BaseDaemon): # logger.debug('[AhenkDeamon] Attempting to registering ldap') # registration.ldap_registration_request() #TODO work on message - logger.info('[AhenkDeamon] LDAP registration of Ahenk is completed') + self.logger.info('[AhenkDeamon] LDAP registration of Ahenk is completed') - # TODO############### - cnfg = configparser.ConfigParser() - cnfg.add_section('PID') - cnfg.set('PID', 'pid_number', str(os.getpid())) - - with open(pidfilePath, 'w') as config_file: - cnfg.write(config_file) - # TODO############## + with open(System.Ahenk.pid_path(), 'w+') as config_file: + config_file.write(str(os.getpid())) try: - signal.signal(signal.SIGALRM, self.signal_handler) - logger.info('[AhenkDeamon] Signal handler is set up') + signal.signal(signal.SIGALRM, self.run_command_from_fifo) + self.logger.info('[AhenkDeamon] Signal handler is set up') except Exception as e: - logger.error('[AhenkDeamon] Signal handler could not set up. Error Message: {} '.format(str(e))) + self.logger.error('[AhenkDeamon] Signal handler could not set up. Error Message: {} '.format(str(e))) messager.send_direct_message('test') while True: if messager.is_connected() is False: - logger.debug('reconnecting') + self.logger.debug('reconnecting') Scope.getInstance().getLogger().warning('[AhenkDeamon] Connection is lost. Ahenk is trying for reconnection') messager = self.init_messager() time.sleep(1) - def signal_handler(self, num, stack): + def run_command_from_fifo(self, num, stack): - # TODO###### - config = configparser.ConfigParser() - config._interpolation = configparser.ExtendedInterpolation() - config.read(pidfilePath) - event = config.get('PID', 'event') - # TODO###### + json_data = json.loads(Commander().get_event()) - params = event.split() - scope = Scope().getInstance() - logger = scope.getLogger() - plugin_manager = scope.getPluginManager() + if json_data is not None: + scope = Scope().getInstance() + plugin_manager = scope.getPluginManager() - message_manager = scope.getMessageManager() - messenger = scope.getMessager() + message_manager = scope.getMessageManager() + messenger = scope.getMessager() - logger.debug('[AhenkDeamon] Signal handled') + self.logger.debug('[AhenkDeamon] Signal handled') + self.logger.debug('[AhenkDeamon] Signal is :{}'.format(str(json_data['event']))) + print('event:{}'.format(str(json_data['event']))) - if 'login' == str(params[0]): - logger.debug('[AhenkDeamon] Signal is :{}'.format(str(params[0]))) - login_message = message_manager.login_msg(params[1]) - messenger.send_direct_message(login_message) - get_policy_message = message_manager.policy_request_msg(params[1]) - messenger.send_direct_message(get_policy_message) - logger.debug('[AhenkDeamon] login event is handled for user:' + params[1]) - elif 'logout' == str(params[0]): - logger.debug('[AhenkDeamon] Signal is {}'.format(str(params[0]))) - message = message_manager.logout_msg(params[1]) - messenger.send_direct_message(message) - plugin_manager.process_safe_mode(str(params[1])) - logger.debug('[AhenkDeamon] logout event is handled for user:' + params[1]) - elif 'exit' == str(params[0]): - logger.debug('[AhenkDeamon] Signal is {}'.format(str(params[0]))) - messenger.disconnect() - # TODO kill thread - subprocess.Popen('kill -9 ' + get_pid_number(), shell=True) - print('stopping ahenk') + if 'login' == str(json_data['event']): + self.logger.info('[AhenkDeamon] login event is handled for user: {}'.format(json_data['username'])) + login_message = message_manager.login_msg(json_data['username']) + messenger.send_direct_message(login_message) + get_policy_message = message_manager.policy_request_msg(json_data['username']) + messenger.send_direct_message(get_policy_message) + elif 'logout' == str(json_data['event']): + self.logger.info('[AhenkDeamon] logout event is handled for user: {}'.format(str(json_data['username']))) + logout_message = message_manager.logout_msg(json_data['username']) + messenger.send_direct_message(logout_message) + plugin_manager.process_safe_mode(str(json_data['username'])) + elif 'send' == str(json_data['event']): + self.logger.info('[AhenkDeamon] Sending message over ahenkd command. Response Message: {}'.format(str(json_data['message']))) + message = str(json.dumps(json_data['message'])) + messenger.send_direct_message(message) + else: + self.logger.error('[AhenkDeamon] Unknown command error. Command:' + json_data['event']) + + self.logger.debug('[AhenkDeamon] Processing of handled event is completed') + return True else: - logger.error('[AhenkDeamon] Unknown command error. Command:' + params[0]) - - logger.debug('[AhenkDeamon] Processing of handled event is completed') - - -def get_pid_number(): - config = configparser.ConfigParser() - config._interpolation = configparser.ExtendedInterpolation() - config.read(pidfilePath) - return config.get('PID', 'pid_number') - - -def set_event(event_param): - config = configparser.ConfigParser() - config._interpolation = configparser.ExtendedInterpolation() - config.read(pidfilePath) - config.set('PID', 'event', event_param) - - with open(pidfilePath, 'w') as conf_file: - config.write(conf_file) - - -def clean(): - try: - config = configparser.ConfigParser() - config._interpolation = configparser.ExtendedInterpolation() - config.read(configFilePath) - db_path = config.get('BASE', 'dbPath') - - os.remove(db_path) - - config.set('BASE', 'uid', '') - config.set('BASE', 'password', '') - - with open(configFilePath, 'w') as file: - config.write(file) - - file.close() - - except Exception as e: - print('Error while running clean command. Error Message {}'.format(str(e))) + return False if __name__ == '__main__': - ahenkdaemon = AhenkDeamon(pidfilePath) + ahenkdaemon = AhenkDeamon(System.Ahenk.pid_path()) try: - if len(sys.argv) == 2: + if len(sys.argv) == 2 and (sys.argv[1] == 'start' or sys.argv[1] == 'stop' or sys.argv[1] == 'restart' or sys.argv[1] == 'status'): if sys.argv[1] == 'start': - print('starting') + if System.Ahenk.is_running() is True: + print('There is running Ahenk service. It will be killed.') + print(str(System.Ahenk.get_pid_number())) + System.Process.kill_by_pid(int(System.Ahenk.get_pid_number())) + else: + print('Ahenk starting...') ahenkdaemon.run() elif sys.argv[1] == 'stop': - print('stopping') - # TODO - ahenkdaemon.stop() + if System.Ahenk.is_running() is True: + print('Ahenk stopping...') + ahenkdaemon.stop() + else: + print('Ahenk not working!') elif sys.argv[1] == 'restart': - # TODO - print('restarting') - ahenkdaemon.restart() + if System.Ahenk.is_running() is True: + print('Ahenk restarting...') + ahenkdaemon.restart() + else: + print('Ahenk starting...') + ahenkdaemon.run() elif sys.argv[1] == 'status': - # TODO - print('status') - elif sys.argv[1] == 'clean': - print('cleaning') - clean() + print(Commander().status()) else: - print('Unknown command. Usage : %s start|stop|restart|status' % sys.argv[0]) + print('Unknown command. Usage : %s start|stop|restart|status|clean' % sys.argv[0]) sys.exit(2) - - elif len(sys.argv) == 3: - if sys.argv[1] == 'login' or sys.argv[1] == 'logout': - print('event:' + str(sys.argv[1])) - set_event(str(sys.argv[1]) + ' ' + sys.argv[2]) - os.kill(int(get_pid_number()), signal.SIGALRM) - else: - print('Unknown command. Usage : %s start|stop|restart|status' % sys.argv[0]) - sys.exit(2) - sys.exit(0) else: - print('Usage : %s start|stop|restart|status' % sys.argv[0]) - sys.exit(2) + result = Commander().set_event(sys.argv) + if result is None: + print('Usage : {0} start|stop|restart|status|clean'.format(sys.argv[0])) + sys.exit(2) + elif result is True: + if System.Ahenk.is_running() is True: + os.kill(int(System.Ahenk.get_pid_number()), signal.SIGALRM) + + except(KeyboardInterrupt, SystemExit): - if str(os.getpid()) == get_pid_number(): - set_event('exit true') - os.kill(int(get_pid_number()), signal.SIGALRM) + if System.Ahenk.is_running() is True: + print('Ahenk will be closed.') + ahenkdaemon.stop() diff --git a/opt/ahenk/base/command/commander.py b/opt/ahenk/base/command/commander.py new file mode 100644 index 0000000..3904f4b --- /dev/null +++ b/opt/ahenk/base/command/commander.py @@ -0,0 +1,189 @@ +import configparser +import datetime +import json +import os +import queue as Queue +import threading + +from base.command.fifo import Fifo +from base.model.enum.ContentType import ContentType +from base.model.enum.MessageCode import MessageCode +from base.model.enum.MessageType import MessageType +from base.system.system import System +from base.util.util import Util + + +class Commander(object): + def __init__(self): + pass + + def set_event(self, *args): + params = args[0] + data = {} + + if System.Ahenk.is_running() is True: + + if params[1] == 'clean': + print('Ahenk stopping') + System.Process.kill_by_pid(int(System.Ahenk.get_pid_number())) + self.clean() + return False + + elif (params[1] == 'login' or params[1] == 'logout') and len(params) == 3: + print('{1} {0}ing'.format(str(params[1]), str(params[2]))) + data['event'] = params[1] + data['username'] = params[2] + + elif params[1] == 'send' and len(params) > 5: + data['event'] = params[1] + response = {} + response['timestamp'] = str(datetime.datetime.now().strftime("%d-%m-%Y %I:%M")) + response['responseMessage'] = 'This content was sent via ahenk terminal command' + + if params[2] == '-t': + response['responseCode'] = MessageCode.TASK_PROCESSED.value + response['type'] = MessageType.TASK_STATUS.value + response['taskId'] = params[3] + if params[4] == '-m': + response['contentType'] = ContentType.TEXT_PLAIN.value + response['responseData'] = params[5] + elif params[4] == '-f': + if os.path.exists(str(params[5])): + response['contentType'] = self.get_relevant_type(str(params[5])) + response['responseData'] = Util.read_file(str(params[5]), 'rb') + else: + print('Wrong or missing parameter. Usage: send -t -m|-f |') + return None + + if len(params) > 6: + if params[6] == '-e': + response['responseCode'] = MessageCode.TASK_ERROR.value + elif params[6] == '-w': + response['responseCode'] = MessageCode.TASK_WARNING.value + elif params[6] == '-s': + response['responseCode'] = MessageCode.TASK_PROCESSED.value + else: + print('Wrong or missing parameter.(-e|-s|-w parameters are optional) Usage: send -t -m|-f | -e|-s|-w') + return None + + elif params[2] == '-p' and len(params) > 7: + response['responseCode'] = MessageCode.POLICY_PROCESSED.value + response['type'] = MessageType.POLICY_STATUS.value + response['policyVersion'] = params[3] + + if params[4] == '-c': + response['commandExecutionId'] = params[5] + + if params[6] == '-m': + response['contentType'] = ContentType.TEXT_PLAIN.value + response['responseData'] = params[7] + elif params[6] == '-f': + if os.path.exists(str(params[7])): + response['contentType'] = self.get_relevant_type(str(params[7])) + response['responseData'] = Util.read_file(str(params[7]), 'rb') + else: + print('Wrong or missing parameter. Usage: send -p -c -m|-f |') + return None + + if len(params) > 8: + if params[8] == '-e': + response['responseCode'] = MessageCode.POLICY_ERROR.value + elif params[8] == '-w': + response['responseCode'] = MessageCode.POLICY_WARNING.value + elif params[8] == '-s': + response['responseCode'] = MessageCode.POLICY_PROCESSED.value + else: + print('Wrong or missing parameter.(-e|-s|-w parameters are optional) Usage: send -p -c -m|-f | -e|-s|-w') + return None + + else: + print('Wrong or missing parameter. Usage: send -p -c -m|-f | -e|-s|-w') + return None + print('RESPONSE=' + str(response).replace("'", '"')) + data['message'] = json.loads(str(response).replace("'", '"')) + + else: + print('Wrong or missing parameter. Usage : %s start|stop|restart|status|clean|send') + return None + + else: + + if params[1] == 'clean': + self.clean() + + else: + print('Ahenk not running!') + return None + + if len(data) > 0: + fifo = Fifo() + thread = threading.Thread(target=fifo.push(str(json.dumps(data)) + '\n')) + thread.start() + + return True + + def get_relevant_type(self, extension): + + extension = extension.lower() + if extension == 'json': + return ContentType.APPLICATION_JSON + elif extension == 'txt': + return ContentType.TEXT_PLAIN + elif extension == 'dec': + return ContentType.APPLICATION_MS_WORD + elif extension == 'pdf': + return ContentType.APPLICATION_PDF + elif extension == 'xls': + return ContentType.APPLICATION_VND_MS_EXCEL + elif extension == 'jpeg' or extension == 'jpg': + return ContentType.IMAGE_JPEG + elif extension == 'png': + return ContentType.IMAGE_PNG + elif extension == 'html' or extension == 'htm': + return ContentType.TEXT_HTML + else: + return ContentType.TEXT_PLAIN + + def get_event(self): + fifo = Fifo() + queue = Queue.Queue() + thread = threading.Thread(target=fifo.pull(queue)) + thread.start() + thread.join() + result = queue.get() + if result is not None: + return result + else: + return None + + def clean(self): + print('Ahenk cleaning..') + try: + config = configparser.ConfigParser() + config._interpolation = configparser.ExtendedInterpolation() + config.read(System.Ahenk.config_path()) + db_path = config.get('BASE', 'dbPath') + + if os.path.exists(db_path): + os.remove(db_path) + + config.set('CONNECTION', 'uid', '') + config.set('CONNECTION', 'password', '') + + with open(System.Ahenk.config_path(), 'w') as file: + config.write(file) + file.close() + print('Ahenk cleaned.') + except Exception as e: + print('Error while running clean command. Error Message {}'.format(str(e))) + + def status(self): + ahenk_state = False + + if System.Ahenk.get_pid_number(): + ahenk_state = True + return "Ahenk Active:{0}\nInstalled Plugins:{1}".format(ahenk_state, str(System.Ahenk.installed_plugins())) + + def force_clean(self): + # TODO + pass diff --git a/opt/ahenk/base/command/fifo.py b/opt/ahenk/base/command/fifo.py new file mode 100644 index 0000000..f1ba82c --- /dev/null +++ b/opt/ahenk/base/command/fifo.py @@ -0,0 +1,35 @@ +import threading + + +class Fifo(object): + def __init__(self): + self.lock = threading.Lock() + self.path = '/tmp/liderahenk.fifo' + + def push(self, content): + file = None + self.lock.acquire() + try: + file = open(self.path, 'a+') + file.write(content) + except Exception as e: + print('Error:{}'.format(str(e))) + finally: + file.close() + self.lock.release() + + def pull(self, queue): + result = None + self.lock.acquire() + try: + lines = open(self.path, 'rb').readlines() + if lines is not None and len(lines) > 0: + result = lines[0].decode("unicode_escape") + w_file = open(self.path, 'wb') + w_file.writelines(lines[1:]) + w_file.close() + except Exception as e: + print('Error:{}'.format(str(e))) + finally: + self.lock.release() + queue.put(result)