mirror of
https://github.com/Pardus-LiderAhenk/ahenk
synced 2024-11-22 15:32:19 +03:00
pid file isolated from event and events rw processes is running with thread safe. stop restart commands fixed. status and send commands added. and minor fixings...
This commit is contained in:
parent
ecfeadd424
commit
6fb7f89e87
3 changed files with 314 additions and 135 deletions
|
@ -3,17 +3,16 @@
|
|||
# Author: İsmail BAŞARAN <ismail.basaran@tubitak.gov.tr> <basaran.ismaill@gmail.com>
|
||||
# Author: Volkan Şahin <volkansah.in> <bm.volkansahin@gmail.com>
|
||||
|
||||
import configparser
|
||||
import os
|
||||
import queue
|
||||
import signal
|
||||
import subprocess
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
import json
|
||||
|
||||
from base.system.system import System
|
||||
from base.Scope import Scope
|
||||
from base.command.commander import Commander
|
||||
from base.config.ConfigManager import ConfigManager
|
||||
from base.database.AhenkDbService import AhenkDbService
|
||||
from base.deamon.BaseDeamon import BaseDaemon
|
||||
|
@ -26,10 +25,10 @@ from base.messaging.Messaging import Messaging
|
|||
from base.plugin.plugin_manager_factory import PluginManagerFactory
|
||||
from base.registration.Registration import Registration
|
||||
from base.scheduler.scheduler_factory import SchedulerFactory
|
||||
from base.system.system import System
|
||||
from base.task.TaskManager import TaskManager
|
||||
|
||||
pidfilePath = '/var/run/ahenk.pid'
|
||||
configFilePath = '/etc/ahenk/ahenk.conf'
|
||||
ahenkdaemon = None
|
||||
|
||||
|
||||
class AhenkDeamon(BaseDaemon):
|
||||
|
@ -120,14 +119,15 @@ class AhenkDeamon(BaseDaemon):
|
|||
|
||||
def check_registration(self):
|
||||
# TODO get number of attemption
|
||||
max_attemp_number = 50
|
||||
max_attemp_number = int(System.Hardware.Network.interface_size()) * 3
|
||||
logger = Scope.getInstance().getLogger()
|
||||
try:
|
||||
while Scope.getInstance().getRegistration().is_registered() is False:
|
||||
max_attemp_number -= 1
|
||||
logger.debug('[AhenkDeamon] Ahenk is not registered. Attempting for registration')
|
||||
Scope.getInstance().getRegistration().registration_request()
|
||||
# TODO 'Could not reach Registration response from Lider. Be sure Lider is awake and it is connected to XMPP server!'
|
||||
|
||||
Scope.getInstance().getRegistration().registration_request()
|
||||
if max_attemp_number < 0:
|
||||
logger.warning('[AhenkDeamon] Number of Attempting for registration is over')
|
||||
self.registration_failed()
|
||||
|
@ -161,43 +161,44 @@ class AhenkDeamon(BaseDaemon):
|
|||
def run(self):
|
||||
print('Ahenk running...')
|
||||
|
||||
self.signal_number = 0
|
||||
globalscope = Scope()
|
||||
globalscope.setInstance(globalscope)
|
||||
|
||||
configfileFolderPath = '/etc/ahenk/config.d/'
|
||||
|
||||
# configuration manager must be first load
|
||||
self.init_config_manager(configFilePath, configfileFolderPath)
|
||||
self.init_config_manager(System.Ahenk.config_path(), configfileFolderPath)
|
||||
|
||||
# Logger must be second
|
||||
logger = self.init_logger()
|
||||
self.logger = self.init_logger()
|
||||
|
||||
self.init_event_manager()
|
||||
logger.info('[AhenkDeamon] Event Manager was set')
|
||||
self.logger.info('[AhenkDeamon] Event Manager was set')
|
||||
|
||||
self.init_ahenk_db()
|
||||
logger.info('[AhenkDeamon] DataBase Service was set')
|
||||
self.logger.info('[AhenkDeamon] DataBase Service was set')
|
||||
|
||||
self.init_messaging()
|
||||
logger.info('[AhenkDeamon] Message Manager was set')
|
||||
self.logger.info('[AhenkDeamon] Message Manager was set')
|
||||
|
||||
self.init_plugin_manager()
|
||||
logger.info('[AhenkDeamon] Plugin Manager was set')
|
||||
self.logger.info('[AhenkDeamon] Plugin Manager was set')
|
||||
|
||||
self.init_task_manager()
|
||||
logger.info('[AhenkDeamon] Task Manager was set')
|
||||
self.logger.info('[AhenkDeamon] Task Manager was set')
|
||||
|
||||
self.init_registration()
|
||||
logger.info('[AhenkDeamon] Registration was set')
|
||||
self.logger.info('[AhenkDeamon] Registration was set')
|
||||
|
||||
self.init_execution_manager()
|
||||
logger.info('[AhenkDeamon] Execution Manager was set')
|
||||
self.logger.info('[AhenkDeamon] Execution Manager was set')
|
||||
|
||||
self.check_registration()
|
||||
logger.info('[AhenkDeamon] Ahenk is registered')
|
||||
self.logger.info('[AhenkDeamon] Ahenk is registered')
|
||||
|
||||
messager = self.init_messager()
|
||||
logger.info('[AhenkDeamon] Messager was set')
|
||||
self.logger.info('[AhenkDeamon] Messager was set')
|
||||
|
||||
self.init_message_response_queue()
|
||||
|
||||
|
@ -205,153 +206,107 @@ class AhenkDeamon(BaseDaemon):
|
|||
# logger.debug('[AhenkDeamon] Attempting to registering ldap')
|
||||
# registration.ldap_registration_request() #TODO work on message
|
||||
|
||||
logger.info('[AhenkDeamon] LDAP registration of Ahenk is completed')
|
||||
self.logger.info('[AhenkDeamon] LDAP registration of Ahenk is completed')
|
||||
|
||||
# TODO###############
|
||||
cnfg = configparser.ConfigParser()
|
||||
cnfg.add_section('PID')
|
||||
cnfg.set('PID', 'pid_number', str(os.getpid()))
|
||||
|
||||
with open(pidfilePath, 'w') as config_file:
|
||||
cnfg.write(config_file)
|
||||
# TODO##############
|
||||
with open(System.Ahenk.pid_path(), 'w+') as config_file:
|
||||
config_file.write(str(os.getpid()))
|
||||
|
||||
try:
|
||||
signal.signal(signal.SIGALRM, self.signal_handler)
|
||||
logger.info('[AhenkDeamon] Signal handler is set up')
|
||||
signal.signal(signal.SIGALRM, self.run_command_from_fifo)
|
||||
self.logger.info('[AhenkDeamon] Signal handler is set up')
|
||||
except Exception as e:
|
||||
logger.error('[AhenkDeamon] Signal handler could not set up. Error Message: {} '.format(str(e)))
|
||||
self.logger.error('[AhenkDeamon] Signal handler could not set up. Error Message: {} '.format(str(e)))
|
||||
|
||||
messager.send_direct_message('test')
|
||||
|
||||
while True:
|
||||
if messager.is_connected() is False:
|
||||
logger.debug('reconnecting')
|
||||
self.logger.debug('reconnecting')
|
||||
Scope.getInstance().getLogger().warning('[AhenkDeamon] Connection is lost. Ahenk is trying for reconnection')
|
||||
messager = self.init_messager()
|
||||
time.sleep(1)
|
||||
|
||||
def signal_handler(self, num, stack):
|
||||
def run_command_from_fifo(self, num, stack):
|
||||
|
||||
# TODO######
|
||||
config = configparser.ConfigParser()
|
||||
config._interpolation = configparser.ExtendedInterpolation()
|
||||
config.read(pidfilePath)
|
||||
event = config.get('PID', 'event')
|
||||
# TODO######
|
||||
json_data = json.loads(Commander().get_event())
|
||||
|
||||
params = event.split()
|
||||
scope = Scope().getInstance()
|
||||
logger = scope.getLogger()
|
||||
plugin_manager = scope.getPluginManager()
|
||||
if json_data is not None:
|
||||
scope = Scope().getInstance()
|
||||
plugin_manager = scope.getPluginManager()
|
||||
|
||||
message_manager = scope.getMessageManager()
|
||||
messenger = scope.getMessager()
|
||||
message_manager = scope.getMessageManager()
|
||||
messenger = scope.getMessager()
|
||||
|
||||
logger.debug('[AhenkDeamon] Signal handled')
|
||||
self.logger.debug('[AhenkDeamon] Signal handled')
|
||||
self.logger.debug('[AhenkDeamon] Signal is :{}'.format(str(json_data['event'])))
|
||||
print('event:{}'.format(str(json_data['event'])))
|
||||
|
||||
if 'login' == str(params[0]):
|
||||
logger.debug('[AhenkDeamon] Signal is :{}'.format(str(params[0])))
|
||||
login_message = message_manager.login_msg(params[1])
|
||||
messenger.send_direct_message(login_message)
|
||||
get_policy_message = message_manager.policy_request_msg(params[1])
|
||||
messenger.send_direct_message(get_policy_message)
|
||||
logger.debug('[AhenkDeamon] login event is handled for user:' + params[1])
|
||||
elif 'logout' == str(params[0]):
|
||||
logger.debug('[AhenkDeamon] Signal is {}'.format(str(params[0])))
|
||||
message = message_manager.logout_msg(params[1])
|
||||
messenger.send_direct_message(message)
|
||||
plugin_manager.process_safe_mode(str(params[1]))
|
||||
logger.debug('[AhenkDeamon] logout event is handled for user:' + params[1])
|
||||
elif 'exit' == str(params[0]):
|
||||
logger.debug('[AhenkDeamon] Signal is {}'.format(str(params[0])))
|
||||
messenger.disconnect()
|
||||
# TODO kill thread
|
||||
subprocess.Popen('kill -9 ' + get_pid_number(), shell=True)
|
||||
print('stopping ahenk')
|
||||
if 'login' == str(json_data['event']):
|
||||
self.logger.info('[AhenkDeamon] login event is handled for user: {}'.format(json_data['username']))
|
||||
login_message = message_manager.login_msg(json_data['username'])
|
||||
messenger.send_direct_message(login_message)
|
||||
get_policy_message = message_manager.policy_request_msg(json_data['username'])
|
||||
messenger.send_direct_message(get_policy_message)
|
||||
elif 'logout' == str(json_data['event']):
|
||||
self.logger.info('[AhenkDeamon] logout event is handled for user: {}'.format(str(json_data['username'])))
|
||||
logout_message = message_manager.logout_msg(json_data['username'])
|
||||
messenger.send_direct_message(logout_message)
|
||||
plugin_manager.process_safe_mode(str(json_data['username']))
|
||||
elif 'send' == str(json_data['event']):
|
||||
self.logger.info('[AhenkDeamon] Sending message over ahenkd command. Response Message: {}'.format(str(json_data['message'])))
|
||||
message = str(json.dumps(json_data['message']))
|
||||
messenger.send_direct_message(message)
|
||||
else:
|
||||
self.logger.error('[AhenkDeamon] Unknown command error. Command:' + json_data['event'])
|
||||
|
||||
self.logger.debug('[AhenkDeamon] Processing of handled event is completed')
|
||||
return True
|
||||
else:
|
||||
logger.error('[AhenkDeamon] Unknown command error. Command:' + params[0])
|
||||
|
||||
logger.debug('[AhenkDeamon] Processing of handled event is completed')
|
||||
|
||||
|
||||
def get_pid_number():
|
||||
config = configparser.ConfigParser()
|
||||
config._interpolation = configparser.ExtendedInterpolation()
|
||||
config.read(pidfilePath)
|
||||
return config.get('PID', 'pid_number')
|
||||
|
||||
|
||||
def set_event(event_param):
|
||||
config = configparser.ConfigParser()
|
||||
config._interpolation = configparser.ExtendedInterpolation()
|
||||
config.read(pidfilePath)
|
||||
config.set('PID', 'event', event_param)
|
||||
|
||||
with open(pidfilePath, 'w') as conf_file:
|
||||
config.write(conf_file)
|
||||
|
||||
|
||||
def clean():
|
||||
try:
|
||||
config = configparser.ConfigParser()
|
||||
config._interpolation = configparser.ExtendedInterpolation()
|
||||
config.read(configFilePath)
|
||||
db_path = config.get('BASE', 'dbPath')
|
||||
|
||||
os.remove(db_path)
|
||||
|
||||
config.set('BASE', 'uid', '')
|
||||
config.set('BASE', 'password', '')
|
||||
|
||||
with open(configFilePath, 'w') as file:
|
||||
config.write(file)
|
||||
|
||||
file.close()
|
||||
|
||||
except Exception as e:
|
||||
print('Error while running clean command. Error Message {}'.format(str(e)))
|
||||
return False
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
ahenkdaemon = AhenkDeamon(pidfilePath)
|
||||
ahenkdaemon = AhenkDeamon(System.Ahenk.pid_path())
|
||||
try:
|
||||
if len(sys.argv) == 2:
|
||||
if len(sys.argv) == 2 and (sys.argv[1] == 'start' or sys.argv[1] == 'stop' or sys.argv[1] == 'restart' or sys.argv[1] == 'status'):
|
||||
if sys.argv[1] == 'start':
|
||||
print('starting')
|
||||
if System.Ahenk.is_running() is True:
|
||||
print('There is running Ahenk service. It will be killed.')
|
||||
print(str(System.Ahenk.get_pid_number()))
|
||||
System.Process.kill_by_pid(int(System.Ahenk.get_pid_number()))
|
||||
else:
|
||||
print('Ahenk starting...')
|
||||
ahenkdaemon.run()
|
||||
elif sys.argv[1] == 'stop':
|
||||
print('stopping')
|
||||
# TODO
|
||||
ahenkdaemon.stop()
|
||||
if System.Ahenk.is_running() is True:
|
||||
print('Ahenk stopping...')
|
||||
ahenkdaemon.stop()
|
||||
else:
|
||||
print('Ahenk not working!')
|
||||
elif sys.argv[1] == 'restart':
|
||||
# TODO
|
||||
print('restarting')
|
||||
ahenkdaemon.restart()
|
||||
if System.Ahenk.is_running() is True:
|
||||
print('Ahenk restarting...')
|
||||
ahenkdaemon.restart()
|
||||
else:
|
||||
print('Ahenk starting...')
|
||||
ahenkdaemon.run()
|
||||
elif sys.argv[1] == 'status':
|
||||
# TODO
|
||||
print('status')
|
||||
elif sys.argv[1] == 'clean':
|
||||
print('cleaning')
|
||||
clean()
|
||||
print(Commander().status())
|
||||
else:
|
||||
print('Unknown command. Usage : %s start|stop|restart|status' % sys.argv[0])
|
||||
print('Unknown command. Usage : %s start|stop|restart|status|clean' % sys.argv[0])
|
||||
sys.exit(2)
|
||||
|
||||
elif len(sys.argv) == 3:
|
||||
if sys.argv[1] == 'login' or sys.argv[1] == 'logout':
|
||||
print('event:' + str(sys.argv[1]))
|
||||
set_event(str(sys.argv[1]) + ' ' + sys.argv[2])
|
||||
os.kill(int(get_pid_number()), signal.SIGALRM)
|
||||
else:
|
||||
print('Unknown command. Usage : %s start|stop|restart|status' % sys.argv[0])
|
||||
sys.exit(2)
|
||||
sys.exit(0)
|
||||
else:
|
||||
print('Usage : %s start|stop|restart|status' % sys.argv[0])
|
||||
sys.exit(2)
|
||||
result = Commander().set_event(sys.argv)
|
||||
if result is None:
|
||||
print('Usage : {0} start|stop|restart|status|clean'.format(sys.argv[0]))
|
||||
sys.exit(2)
|
||||
elif result is True:
|
||||
if System.Ahenk.is_running() is True:
|
||||
os.kill(int(System.Ahenk.get_pid_number()), signal.SIGALRM)
|
||||
|
||||
|
||||
except(KeyboardInterrupt, SystemExit):
|
||||
if str(os.getpid()) == get_pid_number():
|
||||
set_event('exit true')
|
||||
os.kill(int(get_pid_number()), signal.SIGALRM)
|
||||
if System.Ahenk.is_running() is True:
|
||||
print('Ahenk will be closed.')
|
||||
ahenkdaemon.stop()
|
||||
|
|
189
opt/ahenk/base/command/commander.py
Normal file
189
opt/ahenk/base/command/commander.py
Normal file
|
@ -0,0 +1,189 @@
|
|||
import configparser
|
||||
import datetime
|
||||
import json
|
||||
import os
|
||||
import queue as Queue
|
||||
import threading
|
||||
|
||||
from base.command.fifo import Fifo
|
||||
from base.model.enum.ContentType import ContentType
|
||||
from base.model.enum.MessageCode import MessageCode
|
||||
from base.model.enum.MessageType import MessageType
|
||||
from base.system.system import System
|
||||
from base.util.util import Util
|
||||
|
||||
|
||||
class Commander(object):
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def set_event(self, *args):
|
||||
params = args[0]
|
||||
data = {}
|
||||
|
||||
if System.Ahenk.is_running() is True:
|
||||
|
||||
if params[1] == 'clean':
|
||||
print('Ahenk stopping')
|
||||
System.Process.kill_by_pid(int(System.Ahenk.get_pid_number()))
|
||||
self.clean()
|
||||
return False
|
||||
|
||||
elif (params[1] == 'login' or params[1] == 'logout') and len(params) == 3:
|
||||
print('{1} {0}ing'.format(str(params[1]), str(params[2])))
|
||||
data['event'] = params[1]
|
||||
data['username'] = params[2]
|
||||
|
||||
elif params[1] == 'send' and len(params) > 5:
|
||||
data['event'] = params[1]
|
||||
response = {}
|
||||
response['timestamp'] = str(datetime.datetime.now().strftime("%d-%m-%Y %I:%M"))
|
||||
response['responseMessage'] = 'This content was sent via ahenk terminal command'
|
||||
|
||||
if params[2] == '-t':
|
||||
response['responseCode'] = MessageCode.TASK_PROCESSED.value
|
||||
response['type'] = MessageType.TASK_STATUS.value
|
||||
response['taskId'] = params[3]
|
||||
if params[4] == '-m':
|
||||
response['contentType'] = ContentType.TEXT_PLAIN.value
|
||||
response['responseData'] = params[5]
|
||||
elif params[4] == '-f':
|
||||
if os.path.exists(str(params[5])):
|
||||
response['contentType'] = self.get_relevant_type(str(params[5]))
|
||||
response['responseData'] = Util.read_file(str(params[5]), 'rb')
|
||||
else:
|
||||
print('Wrong or missing parameter. Usage: send -t <task_id> -m|-f <message_content>|<file_path>')
|
||||
return None
|
||||
|
||||
if len(params) > 6:
|
||||
if params[6] == '-e':
|
||||
response['responseCode'] = MessageCode.TASK_ERROR.value
|
||||
elif params[6] == '-w':
|
||||
response['responseCode'] = MessageCode.TASK_WARNING.value
|
||||
elif params[6] == '-s':
|
||||
response['responseCode'] = MessageCode.TASK_PROCESSED.value
|
||||
else:
|
||||
print('Wrong or missing parameter.(-e|-s|-w parameters are optional) Usage: send -t <task_id> -m|-f <message_content>|<file_path> -e|-s|-w')
|
||||
return None
|
||||
|
||||
elif params[2] == '-p' and len(params) > 7:
|
||||
response['responseCode'] = MessageCode.POLICY_PROCESSED.value
|
||||
response['type'] = MessageType.POLICY_STATUS.value
|
||||
response['policyVersion'] = params[3]
|
||||
|
||||
if params[4] == '-c':
|
||||
response['commandExecutionId'] = params[5]
|
||||
|
||||
if params[6] == '-m':
|
||||
response['contentType'] = ContentType.TEXT_PLAIN.value
|
||||
response['responseData'] = params[7]
|
||||
elif params[6] == '-f':
|
||||
if os.path.exists(str(params[7])):
|
||||
response['contentType'] = self.get_relevant_type(str(params[7]))
|
||||
response['responseData'] = Util.read_file(str(params[7]), 'rb')
|
||||
else:
|
||||
print('Wrong or missing parameter. Usage: send -p <policy_version> -c <command_execution_id> -m|-f <message_content>|<file_path>')
|
||||
return None
|
||||
|
||||
if len(params) > 8:
|
||||
if params[8] == '-e':
|
||||
response['responseCode'] = MessageCode.POLICY_ERROR.value
|
||||
elif params[8] == '-w':
|
||||
response['responseCode'] = MessageCode.POLICY_WARNING.value
|
||||
elif params[8] == '-s':
|
||||
response['responseCode'] = MessageCode.POLICY_PROCESSED.value
|
||||
else:
|
||||
print('Wrong or missing parameter.(-e|-s|-w parameters are optional) Usage: send -p <policy_version> -c <command_execution_id> -m|-f <message_content>|<file_path> -e|-s|-w')
|
||||
return None
|
||||
|
||||
else:
|
||||
print('Wrong or missing parameter. Usage: send -p <policy_version> -c <command_execution_id> -m|-f <message_content>|<file_path> -e|-s|-w')
|
||||
return None
|
||||
print('RESPONSE=' + str(response).replace("'", '"'))
|
||||
data['message'] = json.loads(str(response).replace("'", '"'))
|
||||
|
||||
else:
|
||||
print('Wrong or missing parameter. Usage : %s start|stop|restart|status|clean|send')
|
||||
return None
|
||||
|
||||
else:
|
||||
|
||||
if params[1] == 'clean':
|
||||
self.clean()
|
||||
|
||||
else:
|
||||
print('Ahenk not running!')
|
||||
return None
|
||||
|
||||
if len(data) > 0:
|
||||
fifo = Fifo()
|
||||
thread = threading.Thread(target=fifo.push(str(json.dumps(data)) + '\n'))
|
||||
thread.start()
|
||||
|
||||
return True
|
||||
|
||||
def get_relevant_type(self, extension):
|
||||
|
||||
extension = extension.lower()
|
||||
if extension == 'json':
|
||||
return ContentType.APPLICATION_JSON
|
||||
elif extension == 'txt':
|
||||
return ContentType.TEXT_PLAIN
|
||||
elif extension == 'dec':
|
||||
return ContentType.APPLICATION_MS_WORD
|
||||
elif extension == 'pdf':
|
||||
return ContentType.APPLICATION_PDF
|
||||
elif extension == 'xls':
|
||||
return ContentType.APPLICATION_VND_MS_EXCEL
|
||||
elif extension == 'jpeg' or extension == 'jpg':
|
||||
return ContentType.IMAGE_JPEG
|
||||
elif extension == 'png':
|
||||
return ContentType.IMAGE_PNG
|
||||
elif extension == 'html' or extension == 'htm':
|
||||
return ContentType.TEXT_HTML
|
||||
else:
|
||||
return ContentType.TEXT_PLAIN
|
||||
|
||||
def get_event(self):
|
||||
fifo = Fifo()
|
||||
queue = Queue.Queue()
|
||||
thread = threading.Thread(target=fifo.pull(queue))
|
||||
thread.start()
|
||||
thread.join()
|
||||
result = queue.get()
|
||||
if result is not None:
|
||||
return result
|
||||
else:
|
||||
return None
|
||||
|
||||
def clean(self):
|
||||
print('Ahenk cleaning..')
|
||||
try:
|
||||
config = configparser.ConfigParser()
|
||||
config._interpolation = configparser.ExtendedInterpolation()
|
||||
config.read(System.Ahenk.config_path())
|
||||
db_path = config.get('BASE', 'dbPath')
|
||||
|
||||
if os.path.exists(db_path):
|
||||
os.remove(db_path)
|
||||
|
||||
config.set('CONNECTION', 'uid', '')
|
||||
config.set('CONNECTION', 'password', '')
|
||||
|
||||
with open(System.Ahenk.config_path(), 'w') as file:
|
||||
config.write(file)
|
||||
file.close()
|
||||
print('Ahenk cleaned.')
|
||||
except Exception as e:
|
||||
print('Error while running clean command. Error Message {}'.format(str(e)))
|
||||
|
||||
def status(self):
|
||||
ahenk_state = False
|
||||
|
||||
if System.Ahenk.get_pid_number():
|
||||
ahenk_state = True
|
||||
return "Ahenk Active:{0}\nInstalled Plugins:{1}".format(ahenk_state, str(System.Ahenk.installed_plugins()))
|
||||
|
||||
def force_clean(self):
|
||||
# TODO
|
||||
pass
|
35
opt/ahenk/base/command/fifo.py
Normal file
35
opt/ahenk/base/command/fifo.py
Normal file
|
@ -0,0 +1,35 @@
|
|||
import threading
|
||||
|
||||
|
||||
class Fifo(object):
|
||||
def __init__(self):
|
||||
self.lock = threading.Lock()
|
||||
self.path = '/tmp/liderahenk.fifo'
|
||||
|
||||
def push(self, content):
|
||||
file = None
|
||||
self.lock.acquire()
|
||||
try:
|
||||
file = open(self.path, 'a+')
|
||||
file.write(content)
|
||||
except Exception as e:
|
||||
print('Error:{}'.format(str(e)))
|
||||
finally:
|
||||
file.close()
|
||||
self.lock.release()
|
||||
|
||||
def pull(self, queue):
|
||||
result = None
|
||||
self.lock.acquire()
|
||||
try:
|
||||
lines = open(self.path, 'rb').readlines()
|
||||
if lines is not None and len(lines) > 0:
|
||||
result = lines[0].decode("unicode_escape")
|
||||
w_file = open(self.path, 'wb')
|
||||
w_file.writelines(lines[1:])
|
||||
w_file.close()
|
||||
except Exception as e:
|
||||
print('Error:{}'.format(str(e)))
|
||||
finally:
|
||||
self.lock.release()
|
||||
queue.put(result)
|
Loading…
Reference in a new issue