This commit is contained in:
emrekgn 2016-10-25 14:25:11 +03:00
commit 4a2fc67eac
8 changed files with 148 additions and 79 deletions

View file

@ -28,13 +28,29 @@ class AhenkDbService(object):
def initialize_table(self):
self.check_and_create_table('task', ['id INTEGER', 'create_date TEXT', 'modify_date TEXT', 'command_cls_id TEXT', 'parameter_map BLOB', 'deleted INTEGER', 'plugin TEXT', 'cron_expr TEXT','file_server TEXT'])
self.check_and_create_table('policy', ['id INTEGER PRIMARY KEY AUTOINCREMENT', 'type TEXT', 'version TEXT', 'name TEXT', 'execution_id TEXT'])
self.check_and_create_table('profile', ['id INTEGER', 'create_date TEXT', 'label TEXT', 'description TEXT', 'overridable INTEGER', 'active TEXT', 'deleted TEXT', 'profile_data TEXT', 'modify_date TEXT', 'plugin TEXT'])
self.check_and_create_table('plugin', ['id INTEGER PRIMARY KEY AUTOINCREMENT', 'active TEXT', 'create_date TEXT', 'deleted TEXT', 'description TEXT', 'machine_oriented TEXT', 'modify_date TEXT', 'name TEXT', 'policy_plugin TEXT', 'user_oriented TEXT', 'version TEXT', 'task_plugin TEXT', 'x_based TEXT'])
self.check_and_create_table('registration', ['jid TEXT', 'password TEXT', 'registered INTEGER', 'dn TEXT', 'params TEXT', 'timestamp TEXT'])
self.check_and_create_table('contract', ['id INTEGER PRIMARY KEY AUTOINCREMENT', 'content BLOB', 'title TEXT', 'timestamp TEXT'])
self.check_and_create_table('agreement', ['id INTEGER PRIMARY KEY AUTOINCREMENT', 'contract_id TEXT', 'username TEXT', 'timestamp TEXT', 'choice TEXT'])
self.check_and_create_table('task',
['id INTEGER', 'create_date TEXT', 'modify_date TEXT', 'command_cls_id TEXT',
'parameter_map BLOB', 'deleted INTEGER', 'plugin TEXT', 'cron_expr TEXT',
'file_server TEXT'])
self.check_and_create_table('policy',
['id INTEGER PRIMARY KEY AUTOINCREMENT', 'type TEXT', 'version TEXT', 'name TEXT',
'execution_id TEXT'])
self.check_and_create_table('profile', ['id INTEGER', 'create_date TEXT', 'label TEXT', 'description TEXT',
'overridable INTEGER', 'active TEXT', 'deleted TEXT',
'profile_data TEXT', 'modify_date TEXT', 'plugin TEXT'])
self.check_and_create_table('plugin',
['id INTEGER PRIMARY KEY AUTOINCREMENT', 'active TEXT', 'create_date TEXT',
'deleted TEXT', 'description TEXT', 'machine_oriented TEXT', 'modify_date TEXT',
'name TEXT', 'policy_plugin TEXT', 'user_oriented TEXT', 'version TEXT',
'task_plugin TEXT', 'x_based TEXT'])
self.check_and_create_table('registration',
['jid TEXT', 'password TEXT', 'registered INTEGER', 'dn TEXT', 'params TEXT',
'timestamp TEXT'])
self.check_and_create_table('contract', ['id INTEGER PRIMARY KEY AUTOINCREMENT', 'content BLOB', 'title TEXT',
'timestamp TEXT'])
self.check_and_create_table('agreement',
['id INTEGER PRIMARY KEY AUTOINCREMENT', 'contract_id TEXT', 'username TEXT',
'timestamp TEXT', 'choice TEXT'])
self.check_and_create_table('session', ['username TEXT', 'display TEXT', 'desktop TEXT', 'timestamp TEXT'])
def get_cols(self, table_name):
@ -44,6 +60,14 @@ class AhenkDbService(object):
return ['content', 'title', 'timestamp']
elif table_name == 'session':
return ['username', 'display', 'desktop', 'timestamp']
elif table_name == 'task':
return ['id', 'create_date', 'modify_date', 'command_cls_id', 'parameter_map', 'deleted', 'plugin',
'cron_expr', 'file_server']
elif table_name == 'plugin':
return ['id', 'active', 'create_date', 'deleted', 'description', 'machine_oriented', 'modify_date', 'name',
'policy_plugin', 'user_oriented', 'version', 'task_plugin', 'x_based']
else:
return None
def connect(self):
try:
@ -94,7 +118,8 @@ class AhenkDbService(object):
self.logger.warning('Could not update table cursor is None! Table Name : {0}'.format(str(table_name)))
return None
except Exception as e:
self.logger.error('Updating table error ! Table Name : {0} Error Mesage: {1}'.format(str(table_name),str(e)))
self.logger.error(
'Updating table error ! Table Name : {0} Error Mesage: {1}'.format(str(table_name), str(e)))
finally:
self.lock.release()

View file

@ -4,16 +4,17 @@
import json
from base.scope import Scope
from base.file.file_transfer_manager import FileTransferManager
from base.model.enum.content_type import ContentType
from base.model.enum.message_code import MessageCode
from base.model.enum.message_type import MessageType
from base.model.plugin_bean import PluginBean
from base.model.policy_bean import PolicyBean
from base.model.profile_bean import ProfileBean
from base.model.response import Response
from base.model.task_bean import TaskBean
from base.model.enum.message_code import MessageCode
from base.model.enum.message_type import MessageType
from base.model.enum.content_type import ContentType
from base.scheduler.custom.schedule_job import ScheduleTaskJob
from base.scope import Scope
from base.system.system import System
from base.util.util import Util
@ -41,6 +42,7 @@ class ExecutionManager(object):
self.event_manager.register_event(MessageType.EXECUTE_POLICY.value, self.execute_policy)
self.event_manager.register_event(MessageType.INSTALL_PLUGIN.value, self.install_plugin)
self.event_manager.register_event(MessageType.RESPONSE_AGREEMENT.value, self.agreement_update)
self.event_manager.register_event(MessageType.UPDATE_SCHEDULED_TASK.value, self.update_scheduled_task)
def agreement_update(self, arg):
@ -170,6 +172,44 @@ class ExecutionManager(object):
self.logger.debug('Executing active policies for {0} user...'.format(username))
self.task_manager.addPolicy(self.get_active_policies(username))
def update_scheduled_task(self, arg):
self.logger.debug('Working on scheduled task ...')
update_scheduled_json = json.loads(arg)
scheduler = Scope.get_instance().get_scheduler()
if str(update_scheduled_json['cronExpression']).lower() == 'none' or update_scheduled_json[
'cronExpression'] is None:
self.logger.debug('Scheduled task will be removed')
scheduler.remove_job(int(update_scheduled_json['taskId']))
self.logger.debug('Task removed from scheduled database')
self.db_service.update('task', ['deleted'], ['True'],
'id={0}'.format(update_scheduled_json['taskId']))
self.logger.debug('Task table updated.')
else:
self.logger.debug('Scheduled task cron expression will be updated.')
self.db_service.update('task', ['cron_expr'], [str(update_scheduled_json['cronExpression'])],
'id={0}'.format(update_scheduled_json['taskId']))
self.logger.debug('Task table updated.')
scheduler.remove_job(str(update_scheduled_json['taskId']))
self.logger.debug('Previous scheduled task removed.')
scheduler.add_job(ScheduleTaskJob(self.get_task_bean_by_id(update_scheduled_json['taskId'])))
self.logger.debug('New scheduled task added')
def get_task_bean_by_id(self, task_id):
task_row = self.db_service.select('task', self.db_service.get_cols('task'), 'id={0}'.format(task_id))[0]
task = TaskBean(task_row[0], task_row[1], task_row[2], task_row[3], task_row[4], task_row[5],
self.get_plugin_bean_by_id(task_row[6]),
task_row[7], task_row[8])
return task
def get_plugin_bean_by_id(self, plugin_id):
plugin_row = self.db_service.select('plugin', self.db_service.get_cols('plugin'), 'id={0}'.format(plugin_id))[0]
plugin = PluginBean(plugin_row[0], plugin_row[1], plugin_row[2], plugin_row[3], plugin_row[4], plugin_row[5],
plugin_row[6], plugin_row[7], plugin_row[8], plugin_row[11], plugin_row[9], plugin_row[10],
plugin_row[12])
return plugin
def execute_policy(self, arg):
self.logger.debug('Updating policies...')
policy = self.json_to_PolicyBean(json.loads(arg))

View file

@ -46,7 +46,10 @@ class Mail:
self.server.quit()
self.logger.debug('Disconnected')
def send_mail(self, subject, message, files=[]):
def send_mail(self, subject, message, files=None):
if files is None:
files = []
msg = MIMEMultipart()
msg['Date'] = formatdate(localtime=True)
@ -54,7 +57,7 @@ class Mail:
msg.attach(MIMEText(message))
#TODO files attachment max size
# TODO files attachment max size
if files is not None:
for f in files:
part = MIMEBase('application', "octet-stream")

View file

@ -22,3 +22,4 @@ class MessageType(Enum):
UNREGISTER = 'UNREGISTER'
TASK_STATUS = 'TASK_STATUS'
RESPONSE_AGREEMENT = 'RESPONSE_AGREEMENT'
UPDATE_SCHEDULED_TASK = 'UPDATE_SCHEDULED_TASK'

View file

@ -60,10 +60,10 @@ class Plugin(threading.Thread):
Plugin class responsible for processing TASK or USER PLUGIN PROFILE.
"""
def __init__(self, name, InQueue):
def __init__(self, name, in_ueue):
threading.Thread.__init__(self)
self.name = name
self.InQueue = InQueue
self.in_queue = in_ueue
scope = Scope.get_instance()
self.logger = scope.get_logger()
@ -79,7 +79,7 @@ class Plugin(threading.Thread):
while self.keep_run:
try:
try:
item_obj = self.InQueue.get(block=True)
item_obj = self.in_queue.get(block=True)
obj_name = item_obj.obj_name
except Exception as e:
self.logger.error(
@ -88,6 +88,7 @@ class Plugin(threading.Thread):
if obj_name == "TASK":
self.logger.debug('[Plugin] Executing task')
command = Scope.get_instance().get_plugin_manager().find_command(self.getName(),
item_obj.get_plugin().get_version(),
item_obj.get_command_cls_id().lower())
self.context.put('task_id', item_obj.get_id())
@ -158,7 +159,7 @@ class Plugin(threading.Thread):
self.logger.debug('[Plugin] Executing profile')
profile_data = item_obj.get_profile_data()
policy_module = Scope.get_instance().get_plugin_manager().find_policy_module(
item_obj.get_plugin().get_name())
item_obj.get_plugin().get_name(), item_obj.get_plugin().get_version())
self.context.put('username', item_obj.get_username())
execution_id = self.get_execution_id(item_obj.get_id())

View file

@ -25,13 +25,13 @@ class PluginManager(object):
def __init__(self):
super(PluginManager, self).__init__()
self.scope = Scope.get_instance()
self.configManager = self.scope.get_configuration_manager()
self.config_manager = self.scope.get_configuration_manager()
self.db_service = self.scope.get_db_service()
self.message_manager = self.scope.get_message_manager()
self.logger = self.scope.get_logger()
self.plugins = []
self.pluginQueueDict = dict()
self.plugin_queue_dict = dict()
# self.listener = \
self.install_listener()
@ -44,7 +44,7 @@ class PluginManager(object):
self.plugins = []
self.logger.debug('Lookup for possible plugins...')
try:
possible_plugins = os.listdir(self.configManager.get("PLUGIN", "pluginFolderPath"))
possible_plugins = os.listdir(self.config_manager.get("PLUGIN", "pluginFolderPath"))
self.logger.debug('Possible plugins: {0} '.format(str(possible_plugins)))
for plugin_name in possible_plugins:
try:
@ -59,8 +59,8 @@ class PluginManager(object):
def load_single_plugin(self, plugin_name):
# TODO check already loaded plugin
location = os.path.join(self.configManager.get("PLUGIN", "pluginFolderPath"), plugin_name)
if not os.path.isdir(location) or not self.configManager.get("PLUGIN", "mainModuleName") + ".py" in os.listdir(
location = os.path.join(self.config_manager.get("PLUGIN", "pluginFolderPath"), plugin_name)
if not os.path.isdir(location) or not self.config_manager.get("PLUGIN", "mainModuleName") + ".py" in os.listdir(
location):
self.logger.debug(
'It is not a plugin location ! There is no main module - {0}'.format(str(location)))
@ -70,8 +70,8 @@ class PluginManager(object):
'{0} plugin was already loaded. Reloading {0} plugin'.format(plugin_name))
# self.reload_single_plugin(plugin_name)
else:
self.pluginQueueDict[plugin_name] = PluginQueue()
plugin = Plugin(plugin_name, self.pluginQueueDict[plugin_name])
self.plugin_queue_dict[plugin_name] = PluginQueue()
plugin = Plugin(plugin_name, self.plugin_queue_dict[plugin_name])
plugin.setDaemon(True)
plugin.start()
self.plugins.append(plugin)
@ -79,22 +79,22 @@ class PluginManager(object):
# active init mode
mode = InitMode()
self.pluginQueueDict[plugin_name].put(mode, 1)
self.plugin_queue_dict[plugin_name].put(mode, 1)
if plugin_name in self.delayed_profiles:
self.pluginQueueDict[plugin_name].put(self.delayed_profiles[plugin_name], 1)
self.plugin_queue_dict[plugin_name].put(self.delayed_profiles[plugin_name], 1)
del self.delayed_profiles[plugin_name]
self.logger.debug('Delayed profile was found for this plugin. It will be run.')
if plugin_name in self.delayed_tasks:
self.pluginQueueDict[plugin_name].put(self.delayed_tasks[plugin_name], 1)
self.plugin_queue_dict[plugin_name].put(self.delayed_tasks[plugin_name], 1)
del self.delayed_tasks[plugin_name]
self.logger.debug('Delayed task was found for this plugin. It will be run.')
def reload_plugins(self):
try:
self.logger.info('Reloading plugins...')
for p_queue in self.pluginQueueDict:
self.pluginQueueDict[p_queue].put(ShutdownMode(), 1)
for p_queue in self.plugin_queue_dict:
self.plugin_queue_dict[p_queue].put(ShutdownMode(), 1)
self.plugins = []
self.load_plugins()
self.logger.info('Plugin reloaded successfully.')
@ -116,10 +116,10 @@ class PluginManager(object):
def remove_plugins(self):
try:
self.logger.debug('Removing all plugins...')
for p_queue in self.pluginQueueDict:
self.pluginQueueDict[p_queue].put(ShutdownMode(), 1)
for p_queue in self.plugin_queue_dict:
self.plugin_queue_dict[p_queue].put(ShutdownMode(), 1)
self.plugins = []
self.pluginQueueDict = dict()
self.plugin_queue_dict = dict()
self.logger.debug('All plugins were removed successfully.')
except Exception as e:
self.logger.error(
@ -130,8 +130,8 @@ class PluginManager(object):
self.logger.debug('Trying to remove {0} plugin...'.format(plugin_name))
if self.is_plugin_loaded(plugin_name):
self.logger.debug('{0} plugin is killing...'.format(plugin_name))
self.pluginQueueDict[plugin_name].put(ShutdownMode(), 1)
del self.pluginQueueDict[plugin_name]
self.plugin_queue_dict[plugin_name].put(ShutdownMode(), 1)
del self.plugin_queue_dict[plugin_name]
for plugin in self.plugins:
if plugin.name == plugin_name:
@ -144,13 +144,13 @@ class PluginManager(object):
'A problem occurred while removing {0} plugin. Error Message :{1}.'.format(plugin_name,
str(e)))
def find_command(self, pluginName, commandId):
location = os.path.join(self.configManager.get("PLUGIN", "pluginFolderPath"), pluginName)
if os.path.isdir(location) and commandId + ".py" in os.listdir(location):
info = imp.find_module(commandId, [location])
return imp.load_module(commandId, *info)
def find_command(self, plugin_name, version, command_id):
location = os.path.join(self.config_manager.get("PLUGIN", "pluginFolderPath"), plugin_name)
if os.path.isdir(location) and command_id + ".py" in os.listdir(location):
info = imp.find_module(command_id, [location])
return imp.load_module(command_id, *info)
else:
self.logger.warning('Command id -' + commandId + ' - not found')
self.logger.warning('Command id -' + command_id + ' - not found')
return None
def process_task(self, task):
@ -163,12 +163,13 @@ class PluginManager(object):
try:
plugin_name = task.get_plugin().get_name().lower()
plugin_ver = task.get_plugin().get_version()
if plugin_name in self.pluginQueueDict:
self.pluginQueueDict[plugin_name].put(task, 1)
if self.does_plugin_exist(plugin_name, plugin_ver) and plugin_name in self.plugin_queue_dict:
self.plugin_queue_dict[plugin_name].put(task, 1)
else:
self.logger.warning(
'{0} plugin not found. Task was delayed. Ahenk will request plugin from Lider if distribution available'.format(
plugin_name))
plugin_name, plugin_ver))
self.delayed_tasks[plugin_name] = task
msg = self.message_manager.missing_plugin_message(PluginBean(name=plugin_name, version=plugin_ver))
self.messenger.send_direct_message(msg)
@ -177,7 +178,7 @@ class PluginManager(object):
'Exception occurred while processing task. Error Message: {0}'.format(str(e)))
def find_policy_module(self, plugin_name):
location = os.path.join(self.configManager.get("PLUGIN", "pluginFolderPath"), plugin_name)
location = os.path.join(self.config_manager.get("PLUGIN", "pluginFolderPath"), plugin_name)
if os.path.isdir(location) and "policy.py" in os.listdir(location):
info = imp.find_module("policy", [location])
return imp.load_module("policy", *info)
@ -229,12 +230,12 @@ class PluginManager(object):
plugin = profile.get_plugin()
plugin_name = plugin.get_name()
plugin_ver = plugin.get_version()
if plugin_name in self.pluginQueueDict:
self.pluginQueueDict[plugin_name].put(profile, 1)
if self.does_plugin_exist(plugin_name, plugin_ver) and plugin_name in self.plugin_queue_dict:
self.plugin_queue_dict[plugin_name].put(profile, 1)
else:
self.logger.warning(
'{0} plugin not found. Profile was delayed. Ahenk will request plugin from Lider if distribution available'.format(
plugin_name))
'{0} plugin {1} version not found. Profile was delayed. Ahenk will request plugin from Lider if distribution available'.format(
plugin_name, plugin_ver))
self.delayed_profiles[plugin_name] = profile
msg = self.message_manager.missing_plugin_message(PluginBean(name=plugin_name, version=plugin_ver))
self.scope.get_messenger().send_direct_message(msg)
@ -242,17 +243,15 @@ class PluginManager(object):
self.logger.error(
'Exception occurred while processing profile. Error Message: {0}'.format(str(e)))
def check_plugin_exists(self, plugin_name, version=None):
criteria = ' name=\'' + plugin_name + '\''
if version is not None:
criteria += ' and version=\'' + str(version) + '\''
result = self.db_service.select('plugin', 'name', criteria)
if result is None:
return False
else:
return True
def does_plugin_exist(self, name, version):
location = os.path.join(self.config_manager.get("PLUGIN", "pluginFolderPath"), name)
main = self.config_manager.get("PLUGIN", "mainModuleName")
if os.path.isdir(location) and main + ".py" in os.listdir(location):
info = imp.find_module(main, [location])
main_py = imp.load_module(main, *info)
if main_py.info() is None or main_py.info()['version'] == version:
return True
return False
def process_mode(self, mode_type, username=None):
mode = None
@ -271,9 +270,9 @@ class PluginManager(object):
if mode is not None:
self.logger.info('{0} mode is running'.format(mode_type))
for plugin_name in self.pluginQueueDict:
for plugin_name in self.plugin_queue_dict:
try:
self.pluginQueueDict[plugin_name].put(mode, 1)
self.plugin_queue_dict[plugin_name].put(mode, 1)
except Exception as e:
self.logger.error(
'Exception occurred while switching safe mode. Error Message : {0}'.format(
@ -281,7 +280,7 @@ class PluginManager(object):
def find_module(self, mode, plugin_name):
mode = mode.lower().replace('_mode', '')
location = os.path.join(self.configManager.get("PLUGIN", "pluginFolderPath"), plugin_name)
location = os.path.join(self.config_manager.get("PLUGIN", "pluginFolderPath"), plugin_name)
if os.path.isdir(location) and (mode + ".py") in os.listdir(location):
info = imp.find_module(mode, [location])
@ -297,7 +296,7 @@ class PluginManager(object):
def is_plugin_loaded(self, plugin_name):
try:
if self.pluginQueueDict[plugin_name] is not None:
if self.plugin_queue_dict[plugin_name] is not None:
return True
else:
return False
@ -309,4 +308,4 @@ class PluginManager(object):
pass
def printQueueSize(self):
print("size " + str(len(self.pluginQueueDict)))
print("size " + str(len(self.plugin_queue_dict)))

View file

@ -23,26 +23,26 @@ class ScheduleTaskJob(object):
self.months = self.conv_to_set(cron_sj[3])
self.dow = self.conv_to_set(cron_sj[4])
self.action = self.process_task
self.logger.debug('[ScheduleTaskJob] Instance created.')
self.logger.debug('Instance created.')
except Exception as e:
self.logger.error(
'[ScheduleTaskJob] A problem occurred while creating instance of ScheduleTaskJob. Error Message : {0}'.format(
'A problem occurred while creating instance of ScheduleTaskJob. Error Message : {0}'.format(
str(e)))
def process_task(self):
try:
self.logger.debug('[ScheduleTaskJob] Running scheduled task now...')
self.logger.debug('Running scheduled task now...')
self.plugin_manager.process_task(self.task)
self.logger.debug('[ScheduleTaskJob] Scheduled Task was executed.')
self.logger.debug('Scheduled Task was executed.')
# There is no any single shot task
# if self.is_single_shot():
# Scope.getInstance().get_scheduler().remove_job(self.task.get_id())
except Exception as e:
self.logger.error(
'[ScheduleTaskJob] A problem occurred while running scheduled task. Error Message: {0}'.format(str(e)))
'A problem occurred while running scheduled task. Error Message: {0}'.format(str(e)))
def parse_cron_str(self, cron_str):
self.logger.debug('[ScheduleTaskJob] Parsing cron string...')
self.logger.debug('Parsing cron string...')
try:
cron_exp_arr = cron_str.split(" ")
cron_sj = []
@ -65,19 +65,19 @@ class ScheduleTaskJob(object):
elif count == 3:
cron_sj.append(range(0, 7, range_val))
else:
self.logger.warning('[ScheduleTaskJob] Unsupported expression.')
self.logger.warning('Unsupported expression.')
elif ',' in exp:
cron_sj.append("(" + str(exp) + ")")
else:
self.logger.warning('[ScheduleTaskJob] Unsupported expression.')
count = count + 1
self.logger.warning('Unsupported expression.')
count += 1
return cron_sj
except Exception as e:
self.logger.error(
'[ScheduleTaskJob] A problem occurred while parsing cron expression. Error Message: {0}'.format(str(e)))
'A problem occurred while parsing cron expression. Error Message: {0}'.format(str(e)))
def conv_to_set(self, obj):
self.logger.debug('[ScheduleTaskJob] Converting {0} to set'.format(str(obj)))
self.logger.debug('Converting {0} to set'.format(str(obj)))
if str(obj).isdigit():
return set([int(obj)])

View file

@ -10,15 +10,15 @@ from base.task.task_job import TaskJob
class TaskInQueue(threading.Thread):
"""docstring for TaskInQueue"""
def __init__(self, inQueue):
def __init__(self, in_queue):
super(TaskInQueue, self).__init__()
self.inQueue = inQueue
self.in_queue = in_queue
def run(self):
# Add task to db. Adding task to db important because task can be lost when processing.
# Call plugin manager and process message inside task job
try:
task = self.inQueue.get()
task = self.in_queue.get()
print(task)
# Can be validate task before processing
job = TaskJob(task)