Ahenk/usr/share/ahenk/plugins/manage-root/set_root_password.py

115 lines
5.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Author: Tuncay Çolak <tuncay.colak@tubitak.gov.tr> <tncyclk05@gmail.com>
# Author: Hasan Kara <h.kara27@gmail.com>
import datetime
import json
import shlex
import subprocess

from base.model.enum.content_type import ContentType
from base.plugin.abstract_plugin import AbstractPlugin
class RootPassword(AbstractPlugin):
    """Task handler that either locks the root account or sets a new root password.

    The task mapping is read for the keys ``lockRootUser``, ``RootPassword``,
    ``rootEntity`` and ``mailContent``. The outcome is reported through
    ``context.create_response``.
    """

    def __init__(self, task, context):
        """Store the task and context and prepare the command templates."""
        super(RootPassword, self).__init__()
        self.task = task
        self.context = context
        self.message_code = self.get_message_code()
        self.logger = self.get_logger()
        # Shell command templates; every substituted argument is shell-quoted
        # by the caller before formatting.
        self.create_shadow_password = 'mkpasswd {}'
        self.change_password = 'usermod -p {0} {1}'
        self.username = 'root'

    def save_mail(self, status):
        """Update the 'mail' table with the given status for this command."""
        cols = ['command', 'mailstatus', 'timestamp']
        values = ['set_root_password', status, self.timestamp()]
        self.db_service.update('mail', cols, values)

    def set_mail(self, mail_content):
        """Expand the ``{date}`` and ``{ahenk}`` placeholders and store the mail body.

        ``{date}`` becomes today's date and ``{ahenk}`` becomes the value of
        ``self.Ahenk.dn()``; the result is handed to the context.
        """
        if '{date}' in mail_content:
            mail_content = str(mail_content).replace('{date}', str(datetime.date.today()))
        if '{ahenk}' in mail_content:
            mail_content = str(mail_content).replace('{ahenk}', str(self.Ahenk.dn()))
        self.context.set_mail_content(mail_content)

    def _task_value(self, key, default=None):
        """Return ``self.task[key]`` when the key is present, otherwise ``default``."""
        return self.task[key] if key in self.task else default

    def _create_mail_response(self, code, message, root_entity):
        """Send a JSON response carrying ``message`` plus the context's mail fields."""
        payload = {
            'Result': message,
            'mail_content': str(self.context.get_mail_content()),
            'mail_subject': str(self.context.get_mail_subject()),
            'mail_send': self.context.is_mail_send(),
            'rootEntity': root_entity,
        }
        self.context.create_response(code=code,
                                     message=message,
                                     data=json.dumps(payload),
                                     content_type=ContentType.APPLICATION_JSON.value)

    def handle_task(self):
        """Lock the root user or change its password, then report the result.

        When ``lockRootUser`` is truthy the account is locked with
        ``passwd -l root``. Otherwise the clear-text ``RootPassword`` is hashed
        with ``mkpasswd`` and applied with ``usermod -p``. Any failure (missing
        or empty password, non-zero exit status, empty hash) ends in a
        TASK_ERROR response instead of a silent no-op.
        """
        # Missing keys fall back to defaults so that a malformed task still
        # reaches the error response below instead of raising before the try.
        lock_root_user = self._task_value('lockRootUser', False)
        password = self._task_value('RootPassword', '')
        root_entity = self._task_value('rootEntity')
        mail_content = self._task_value('mailContent', '')
        # NOTE(review): 'mailSend' and 'mailSubject' used to be read here but
        # were never used; the response takes both values from the context.

        # The real password must never be written to the log.
        self.logger.debug('[Root Pass] password: ' + str("**********"))

        try:
            if lock_root_user:
                self.logger.info("Locking root user")
                result_code, p_out, p_err = self.execute_command("passwd -l root")
                if result_code != 0:
                    raise RuntimeError('passwd -l failed ({0}): {1}'.format(result_code, p_err))
                self.context.create_response(code=self.message_code.TASK_PROCESSED.value,
                                             message='Root kullanıcısı başarıyla kilitlendi.',
                                             data=json.dumps({'Result': p_out}),
                                             content_type=self.get_content_type().APPLICATION_JSON.value)
            else:
                # A JSON null would otherwise be stringified to the literal
                # password "None".
                if password is None or str(password).strip() == '':
                    raise ValueError('root password is empty')

                # Quote the password so spaces and shell metacharacters reach
                # mkpasswd as a single literal argument.
                # NOTE(review): the clear-text password is still visible on the
                # command line of the mkpasswd process; consider feeding it on
                # stdin if the installed mkpasswd supports that.
                hash_command = self.create_shadow_password.format(shlex.quote(str(password)))
                result_code, p_out, p_err = self.execute_command(hash_command)
                shadow_password = p_out.strip() if p_out else ''
                # Never continue with an empty or bogus hash: "usermod -p ''"
                # would leave root with an empty password.
                if result_code != 0 or not shadow_password:
                    raise RuntimeError('mkpasswd failed ({0}): {1}'.format(result_code, p_err))

                # The hash contains '$', so it must stay quoted for the shell.
                apply_command = self.change_password.format(shlex.quote(shadow_password),
                                                            shlex.quote(self.username))
                result_code, p_out, p_err = self.execute_command(apply_command)
                if result_code != 0:
                    raise RuntimeError('usermod failed ({0}): {1}'.format(result_code, p_err))

                self.set_mail(mail_content)
                self._create_mail_response(self.message_code.TASK_PROCESSED.value,
                                           'Parola Başarı ile değiştirildi.',
                                           root_entity)
                self.logger.debug('Changed password.')
        except Exception as e:
            self.logger.error('Error: {0}'.format(str(e)))
            # This text used to be assigned and then dropped; forward it so the
            # response below reports the failure mail body.
            mail_content = 'Root Parolası değiştirlirken hata oluştu.'
            self.context.set_mail_content(mail_content)
            self._create_mail_response(self.message_code.TASK_ERROR.value,
                                       'Parola değiştirilirken hata oluştu.',
                                       root_entity)

    # This method is only for the manage-root password plugin: unlike a
    # general-purpose runner it never logs the command line, which may contain
    # the password.
    def execute_command(self, command, stdin=None, env=None, cwd=None, shell=True, result=True):
        """Run ``command`` in a subprocess and return ``(result_code, stdout, stderr)``.

        When ``result`` is not ``True`` the process is started but not waited
        for and ``(None, None, None)`` is returned. If the command cannot be
        started or its output cannot be decoded, the return value is
        ``(1, 'Could not execute command', <error text>)`` so callers can
        always unpack three values.
        """
        try:
            process = subprocess.Popen(command, stdin=stdin, env=env, cwd=cwd,
                                       stderr=subprocess.PIPE, stdout=subprocess.PIPE,
                                       shell=shell)
            self.logger.debug('Executing command for manage-root')
            if result is not True:
                return None, None, None
            # communicate() drains both pipes while waiting; wait() followed by
            # read() can deadlock once a pipe buffer fills up.
            raw_out, raw_err = process.communicate()
            p_out = raw_out.decode("unicode_escape")
            p_err = raw_err.decode("unicode_escape")
            return process.returncode, p_out, p_err
        except Exception as e:
            self.logger.error('Could not execute command for manage-root: {0}'.format(str(e)))
            return 1, 'Could not execute command', str(e)
def handle_task(task, context):
    """Build a RootPassword handler for the given task and context and run it."""
    RootPassword(task, context).handle_task()