mirror of
https://github.com/Pardus-LiderAhenk/ahenk
synced 2024-11-22 15:32:19 +03:00
file sending available on core control for task responses which has data attr
This commit is contained in:
parent
ecc05da83b
commit
fc51797468
3 changed files with 39 additions and 17 deletions
|
@ -212,13 +212,11 @@ class ExecutionManager(object):
|
||||||
|
|
||||||
def execute_task(self, arg):
|
def execute_task(self, arg):
|
||||||
|
|
||||||
str_task = json.loads(arg)['task']
|
json_task = json.loads(arg)['task']
|
||||||
json_task = json.loads(str_task)
|
json_task = json.loads(json_task)
|
||||||
|
json_server_conf = json.dumps(json.loads(arg)['fileServerConf'])
|
||||||
file_server_conf = None
|
|
||||||
|
|
||||||
task = self.json_to_task_bean(json_task, file_server_conf)
|
|
||||||
|
|
||||||
|
task = self.json_to_task_bean(json_task, json_server_conf)
|
||||||
self.logger.debug('[ExecutionManager] Adding new task...Task is:{}'.format(task.get_command_cls_id()))
|
self.logger.debug('[ExecutionManager] Adding new task...Task is:{}'.format(task.get_command_cls_id()))
|
||||||
|
|
||||||
self.task_manager.addTask(task)
|
self.task_manager.addTask(task)
|
||||||
|
@ -227,7 +225,7 @@ class ExecutionManager(object):
|
||||||
def json_to_task_bean(self, json_data, file_server_conf=None):
|
def json_to_task_bean(self, json_data, file_server_conf=None):
|
||||||
plu = json_data['plugin']
|
plu = json_data['plugin']
|
||||||
plugin = PluginBean(p_id=plu['id'], active=plu['active'], create_date=plu['createDate'], deleted=plu['deleted'], description=plu['description'], machine_oriented=plu['machineOriented'], modify_date=plu['modifyDate'], name=plu['name'], policy_plugin=plu['policyPlugin'], user_oriented=plu['userOriented'], version=plu['version'], task_plugin=plu['taskPlugin'], x_based=plu['xBased'])
|
plugin = PluginBean(p_id=plu['id'], active=plu['active'], create_date=plu['createDate'], deleted=plu['deleted'], description=plu['description'], machine_oriented=plu['machineOriented'], modify_date=plu['modifyDate'], name=plu['name'], policy_plugin=plu['policyPlugin'], user_oriented=plu['userOriented'], version=plu['version'], task_plugin=plu['taskPlugin'], x_based=plu['xBased'])
|
||||||
return TaskBean(_id=json_data['id'], create_date=json_data['createDate'], modify_date=json_data['modifyDate'], command_cls_id=json_data['commandClsId'], parameter_map=json_data['parameterMap'], deleted=json_data['deleted'], plugin=plugin, cron_str=json_data['cronExpression'], file_server=json.dumps(file_server_conf))
|
return TaskBean(_id=json_data['id'], create_date=json_data['createDate'], modify_date=json_data['modifyDate'], command_cls_id=json_data['commandClsId'], parameter_map=json_data['parameterMap'], deleted=json_data['deleted'], plugin=plugin, cron_str=json_data['cronExpression'], file_server=str(file_server_conf))
|
||||||
|
|
||||||
def move_file(self, arg):
|
def move_file(self, arg):
|
||||||
default_file_path = System.Ahenk.received_dir_path()
|
default_file_path = System.Ahenk.received_dir_path()
|
||||||
|
|
|
@ -3,10 +3,14 @@
|
||||||
# Author: İsmail BAŞARAN <ismail.basaran@tubitak.gov.tr> <basaran.ismaill@gmail.com>
|
# Author: İsmail BAŞARAN <ismail.basaran@tubitak.gov.tr> <basaran.ismaill@gmail.com>
|
||||||
|
|
||||||
import threading
|
import threading
|
||||||
|
import json
|
||||||
from base.Scope import Scope
|
from base.Scope import Scope
|
||||||
from base.model.Response import Response
|
from base.model.Response import Response
|
||||||
|
from base.file.file_transfer_manager import FileTransferManager
|
||||||
from base.model.enum.MessageType import MessageType
|
from base.model.enum.MessageType import MessageType
|
||||||
|
from base.model.enum.MessageCode import MessageCode
|
||||||
|
from base.model.enum.ContentType import ContentType
|
||||||
|
from base.system.system import System
|
||||||
|
|
||||||
|
|
||||||
class Context(object):
|
class Context(object):
|
||||||
|
@ -72,11 +76,30 @@ class Plugin(threading.Thread):
|
||||||
|
|
||||||
self.logger.debug('[Plugin] Creating response')
|
self.logger.debug('[Plugin] Creating response')
|
||||||
|
|
||||||
if self.context.get('responseCode') is not None:
|
if self.context.data is not None and self.context.get('responseCode') is not None:
|
||||||
response = Response(type=MessageType.TASK_STATUS.value, id=item_obj.get_id(), code=self.context.get('responseCode'), message=self.context.get('responseMessage'), data=self.context.get('responseData'), content_type=self.context.get('contentType'))
|
response = Response(type=MessageType.TASK_STATUS.value, id=item_obj.get_id(), code=self.context.get('responseCode'), message=self.context.get('responseMessage'), data=self.context.get('responseData'), content_type=self.context.get('contentType'))
|
||||||
# self.response_queue.put(self.messaging.response_msg(response)) #TODO DEBUG
|
|
||||||
self.logger.debug('[Plugin] Sending response')
|
if response.get_data():
|
||||||
Scope.getInstance().getMessenger().send_direct_message(self.messaging.task_status_msg(response)) # TODO REMOVE
|
if response.get_content_type() not in (ContentType.TEXT_PLAIN.value, ContentType.APPLICATION_JSON.value):
|
||||||
|
|
||||||
|
file_manager = FileTransferManager(json.loads(item_obj.get_file_server())['protocol'], json.loads(item_obj.get_file_server())['parameterMap'])
|
||||||
|
file_manager.transporter.connect()
|
||||||
|
md5 = str(json.loads(response.get_data())['md5'])
|
||||||
|
success = file_manager.transporter.send_file(System.Ahenk.received_dir_path() + md5, md5)
|
||||||
|
file_manager.transporter.disconnect()
|
||||||
|
|
||||||
|
self.logger.debug('[Plugin] Sending response')
|
||||||
|
|
||||||
|
message = self.messaging.task_status_msg(response)
|
||||||
|
if success is False:
|
||||||
|
response = Response(type=MessageType.TASK_STATUS.value, id=item_obj.get_id(), code=MessageCode.TASK_ERROR.value, message='[Ahenk Core] Task processed successfully but file transfer not completed. Check defined server conf')
|
||||||
|
message = self.messaging.task_status_msg(response)
|
||||||
|
Scope.getInstance().getMessenger().send_direct_message(message)
|
||||||
|
|
||||||
|
else:
|
||||||
|
self.logger.debug('[Plugin] Sending response')
|
||||||
|
Scope.getInstance().getMessenger().send_direct_message(self.messaging.task_status_msg(response))
|
||||||
|
|
||||||
else:
|
else:
|
||||||
self.logger.error('[Plugin] There is no Response. Plugin must create response after run a task!')
|
self.logger.error('[Plugin] There is no Response. Plugin must create response after run a task!')
|
||||||
|
|
||||||
|
@ -95,8 +118,9 @@ class Plugin(threading.Thread):
|
||||||
self.logger.debug('[Plugin] Handling profile')
|
self.logger.debug('[Plugin] Handling profile')
|
||||||
policy_module.handle_policy(profile_data, self.context)
|
policy_module.handle_policy(profile_data, self.context)
|
||||||
|
|
||||||
if self.context.get('responseCode') is not None:
|
if self.context.data is not None and self.context.get('responseCode') is not None:
|
||||||
self.logger.debug('[Plugin] Creating response')
|
self.logger.debug('[Plugin] Creating response')
|
||||||
|
|
||||||
response = Response(type=MessageType.POLICY_STATUS.value, id=item_obj.get_id(), code=self.context.get('responseCode'), message=self.context.get('responseMessage'), data=self.context.get('responseData'), content_type=self.context.get('contentType'), execution_id=execution_id, policy_version=policy_ver)
|
response = Response(type=MessageType.POLICY_STATUS.value, id=item_obj.get_id(), code=self.context.get('responseCode'), message=self.context.get('responseMessage'), data=self.context.get('responseData'), content_type=self.context.get('contentType'), execution_id=execution_id, policy_version=policy_ver)
|
||||||
# self.response_queue.put(self.messaging.response_msg(response)) #TODO DEBUG
|
# self.response_queue.put(self.messaging.response_msg(response)) #TODO DEBUG
|
||||||
self.logger.debug('[Plugin] Sending response')
|
self.logger.debug('[Plugin] Sending response')
|
||||||
|
|
|
@ -21,7 +21,7 @@ class TaskManager(object):
|
||||||
def addTask(self, task):
|
def addTask(self, task):
|
||||||
try:
|
try:
|
||||||
self.saveTask(task)
|
self.saveTask(task)
|
||||||
if task.get_cron_str() == None or task.get_cron_str() == '':
|
if task.get_cron_str() is None or task.get_cron_str() == '':
|
||||||
self.logger.debug('[TaskManager] Adding task ... ')
|
self.logger.debug('[TaskManager] Adding task ... ')
|
||||||
self.pluginManager.processTask(task)
|
self.pluginManager.processTask(task)
|
||||||
else:
|
else:
|
||||||
|
@ -38,11 +38,11 @@ class TaskManager(object):
|
||||||
|
|
||||||
def saveTask(self, task):
|
def saveTask(self, task):
|
||||||
try:
|
try:
|
||||||
task_cols = ['id', 'create_date', 'modify_date', 'command_cls_id', 'parameter_map', 'deleted', 'plugin','cron_expr']
|
task_cols = ['id', 'create_date', 'modify_date', 'command_cls_id', 'parameter_map', 'deleted', 'plugin', 'cron_expr', 'file_server']
|
||||||
plu_cols = ['active', 'create_date', 'deleted', 'description', 'machine_oriented', 'modify_date', 'name', 'policy_plugin', 'user_oriented', 'version','task_plugin','x_based']
|
plu_cols = ['active', 'create_date', 'deleted', 'description', 'machine_oriented', 'modify_date', 'name', 'policy_plugin', 'user_oriented', 'version', 'task_plugin', 'x_based']
|
||||||
plugin_args = [str(task.get_plugin().get_active()), str(task.get_plugin().get_create_date()), str(task.get_plugin().get_deleted()), str(task.get_plugin().get_description()), str(task.get_plugin().get_machine_oriented()), str(task.get_plugin().get_modify_date()), str(task.get_plugin().get_name()), str(task.get_plugin().get_policy_plugin()), str(task.get_plugin().get_user_oriented()), str(task.get_plugin().get_version()), str(task.get_plugin().get_task_plugin()), str(task.get_plugin().get_x_based())]
|
plugin_args = [str(task.get_plugin().get_active()), str(task.get_plugin().get_create_date()), str(task.get_plugin().get_deleted()), str(task.get_plugin().get_description()), str(task.get_plugin().get_machine_oriented()), str(task.get_plugin().get_modify_date()), str(task.get_plugin().get_name()), str(task.get_plugin().get_policy_plugin()), str(task.get_plugin().get_user_oriented()), str(task.get_plugin().get_version()), str(task.get_plugin().get_task_plugin()), str(task.get_plugin().get_x_based())]
|
||||||
plugin_id = self.db_service.update('plugin', plu_cols, plugin_args)
|
plugin_id = self.db_service.update('plugin', plu_cols, plugin_args)
|
||||||
values = [str(task.get_id()), str(task.get_create_date()), str(task.get_modify_date()), str(task.get_command_cls_id()), str(task.get_parameter_map()), str(task.get_deleted()), str(plugin_id),str(task.get_cron_str())]
|
values = [str(task.get_id()), str(task.get_create_date()), str(task.get_modify_date()), str(task.get_command_cls_id()), str(task.get_parameter_map()), str(task.get_deleted()), str(plugin_id), str(task.get_cron_str()), str(task.get_file_server())]
|
||||||
self.db_service.update('task', task_cols, values)
|
self.db_service.update('task', task_cols, values)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.logger.error("[TaskManager] Exception occurred while saving task. Error Message: {}".format(str(e)))
|
self.logger.error("[TaskManager] Exception occurred while saving task. Error Message: {}".format(str(e)))
|
||||||
|
|
Loading…
Reference in a new issue