Ahenk/opt/ahenk/base/plugin/Plugin.py

162 lines
7.9 KiB
Python
Raw Normal View History

2016-02-29 10:48:00 +02:00
#!/usr/bin/python3
2016-02-18 16:38:30 +02:00
# -*- coding: utf-8 -*-
# Author: İsmail BAŞARAN <ismail.basaran@tubitak.gov.tr> <basaran.ismaill@gmail.com>
2016-05-26 11:19:34 +03:00
2016-04-12 17:30:53 +03:00
import threading
import json
from base.Scope import Scope
2016-04-12 17:30:53 +03:00
from base.model.Response import Response
from base.file.file_transfer_manager import FileTransferManager
from base.model.enum.MessageType import MessageType
from base.model.enum.MessageCode import MessageCode
from base.model.enum.ContentType import ContentType
from base.system.system import System
2016-03-30 18:51:50 +03:00
2016-02-18 16:38:30 +02:00
class Context(object):
def __init__(self):
self.data = {}
2016-04-25 17:12:45 +03:00
self.scope = Scope().getInstance()
2016-03-30 18:51:50 +03:00
def put(self, var_name, data):
self.data[var_name] = data
2016-03-30 18:51:50 +03:00
def get(self, var_name):
return self.data[var_name]
def empty_data(self):
self.data = {}
def create_response(self, code, message=None, data=None, content_type=None):
2016-05-09 17:50:12 +03:00
self.data['responseCode'] = code
self.data['responseMessage'] = message
self.data['responseData'] = data
self.data['contentType'] = content_type
2016-03-30 18:51:50 +03:00
class Plugin(threading.Thread):
"""
This is a thread inherit class and have a queue.
Plugin class responsible for processing TASK or USER PLUGIN PROFILE.
"""
def __init__(self, name, InQueue):
2016-03-23 10:04:31 +02:00
threading.Thread.__init__(self)
2016-02-18 16:38:30 +02:00
self.name = name
self.InQueue = InQueue
2016-03-30 18:51:50 +03:00
2016-03-23 10:04:31 +02:00
scope = Scope.getInstance()
self.logger = scope.getLogger()
2016-03-30 18:51:50 +03:00
self.response_queue = scope.getResponseQueue()
self.messaging = scope.getMessageManager()
2016-04-12 17:30:53 +03:00
self.db_service = scope.getDbService()
# self.messager = None
2016-03-30 18:51:50 +03:00
self.keep_run = True
self.context = Context()
2016-03-23 10:04:31 +02:00
def run(self):
2016-05-09 14:13:41 +03:00
while self.keep_run:
2016-03-23 10:04:31 +02:00
try:
try:
item_obj = self.InQueue.get(block=True)
obj_name = item_obj.obj_name
except Exception as e:
2016-05-26 11:19:34 +03:00
self.logger.error('[Plugin] A problem occurred while executing process. Error Message: {}'.format(str(e)))
2016-03-29 11:52:18 +03:00
if obj_name == "TASK":
self.logger.debug('[Plugin] Executing task')
command = Scope.getInstance().getPluginManager().findCommand(self.getName(), item_obj.get_command_cls_id().lower())
self.context.put('task_id', item_obj.get_command_cls_id().lower())
task_data = item_obj.get_parameter_map()
2016-05-26 11:19:34 +03:00
self.logger.debug('[Plugin] Handling task')
command.handle_task(task_data, self.context)
2016-05-26 11:19:34 +03:00
self.logger.debug('[Plugin] Creating response')
if self.context.data is not None and self.context.get('responseCode') is not None:
response = Response(type=MessageType.TASK_STATUS.value, id=item_obj.get_id(), code=self.context.get('responseCode'), message=self.context.get('responseMessage'), data=self.context.get('responseData'), content_type=self.context.get('contentType'))
if response.get_data():
if response.get_content_type() not in (ContentType.TEXT_PLAIN.value, ContentType.APPLICATION_JSON.value):
file_manager = FileTransferManager(json.loads(item_obj.get_file_server())['protocol'], json.loads(item_obj.get_file_server())['parameterMap'])
file_manager.transporter.connect()
md5 = str(json.loads(response.get_data())['md5'])
success = file_manager.transporter.send_file(System.Ahenk.received_dir_path() + md5, md5)
file_manager.transporter.disconnect()
self.logger.debug('[Plugin] Sending response')
message = self.messaging.task_status_msg(response)
if success is False:
response = Response(type=MessageType.TASK_STATUS.value, id=item_obj.get_id(), code=MessageCode.TASK_ERROR.value, message='[Ahenk Core] Task processed successfully but file transfer not completed. Check defined server conf')
message = self.messaging.task_status_msg(response)
Scope.getInstance().getMessenger().send_direct_message(message)
else:
self.logger.debug('[Plugin] Sending response')
Scope.getInstance().getMessenger().send_direct_message(self.messaging.task_status_msg(response))
else:
self.logger.error('[Plugin] There is no Response. Plugin must create response after run a task!')
2016-03-29 11:52:18 +03:00
elif obj_name == "PROFILE":
self.logger.debug('[Plugin] Executing profile')
2016-04-06 14:28:29 +03:00
profile_data = item_obj.get_profile_data()
2016-05-09 17:50:12 +03:00
policy_module = Scope.getInstance().getPluginManager().findPolicyModule(item_obj.get_plugin().get_name())
self.context.put('username', item_obj.get_username())
2016-05-09 17:50:12 +03:00
execution_id = self.get_execution_id(item_obj.get_id())
policy_ver = self.get_policy_version(item_obj.get_id())
self.context.put('policy_version', policy_ver)
self.context.put('execution_id', execution_id)
self.logger.debug('[Plugin] Handling profile')
policy_module.handle_policy(profile_data, self.context)
if self.context.data is not None and self.context.get('responseCode') is not None:
self.logger.debug('[Plugin] Creating response')
response = Response(type=MessageType.POLICY_STATUS.value, id=item_obj.get_id(), code=self.context.get('responseCode'), message=self.context.get('responseMessage'), data=self.context.get('responseData'), content_type=self.context.get('contentType'), execution_id=execution_id, policy_version=policy_ver)
# self.response_queue.put(self.messaging.response_msg(response)) #TODO DEBUG
self.logger.debug('[Plugin] Sending response')
Scope.getInstance().getMessenger().send_direct_message(self.messaging.policy_status_msg(response)) # TODO REMOVE
else:
self.logger.error('[Plugin] There is no Response. Plugin must create response after run a policy!')
elif obj_name == "KILL_SIGNAL":
self.keep_run = False
2016-05-26 11:19:34 +03:00
self.logger.debug('[Plugin] Killing queue ! Plugin Name: {}'.format(str(self.name)))
2016-04-20 16:30:29 +03:00
elif obj_name == "SAFE_MODE":
2016-04-25 17:12:45 +03:00
username = item_obj.username
2016-04-20 16:30:29 +03:00
safe_mode_module = Scope.getInstance().getPluginManager().find_safe_mode_module(self.name)
2016-04-25 17:12:45 +03:00
safe_mode_module.handle_safe_mode(username, self.context)
2016-03-29 11:52:18 +03:00
else:
2016-05-26 11:19:34 +03:00
self.logger.warning("[Plugin] Not supported object type: {}".format(str(obj_name)))
2016-05-09 17:50:12 +03:00
# Empty context for next use
2016-05-09 17:50:12 +03:00
self.context.empty_data()
2016-03-23 10:04:31 +02:00
except Exception as e:
2016-04-25 17:12:45 +03:00
self.logger.error("[Plugin] Plugin running exception. Exception Message: {} ".format(str(e)))
2016-02-18 16:38:30 +02:00
2016-04-12 17:30:53 +03:00
def get_execution_id(self, profile_id):
try:
return self.db_service.select_one_result('policy', 'execution_id', ' id={}'.format(profile_id))
except Exception as e:
2016-04-25 17:12:45 +03:00
self.logger.error("[Plugin] A problem occurred while getting execution id. Exception Message: {} ".format(str(e)))
2016-04-12 17:30:53 +03:00
return None
def get_policy_version(self, profile_id):
try:
return self.db_service.select_one_result('policy', 'version', ' id={}'.format(profile_id))
except Exception as e:
2016-04-25 17:12:45 +03:00
self.logger.error("[Plugin] A problem occurred while getting policy version . Exception Message: {} ".format(str(e)))
2016-04-12 17:30:53 +03:00
return None
2016-02-18 16:38:30 +02:00
def getName(self):
return self.name