Ahenk/usr/share/ahenk/base/util/util.py

507 lines
16 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Author: Volkan Şahin <volkansah.in> <bm.volkansahin@gmail.com>
import datetime
import grp
import hashlib
import json
import os
import pwd
import shutil
import stat
import subprocess
import uuid
import locale
from base.scope import Scope
from os.path import expanduser
class Util:
def __init__(self):
super().__init__()
scope = Scope().get_instance()
@staticmethod
def get_ask_path_file():
return '/usr/share/ahenk/base/agreement/'
@staticmethod
def close_session(username):
Util.execute('pkill -9 -u {0}'.format(username))
@staticmethod
def shutdown():
Util.execute('reboot')
@staticmethod
def create_file(full_path):
try:
if os.path.exists(full_path):
return None
else:
file = open(full_path, 'w')
file.close()
return True
except:
raise
@staticmethod
def delete_folder(full_path):
try:
shutil.rmtree(full_path)
except:
raise
@staticmethod
def delete_file(full_path):
try:
if Util.is_exist(full_path):
os.remove(full_path)
except:
raise
@staticmethod
def rename_file(old_full_path, new_full_path):
try:
os.rename(old_full_path, new_full_path)
except:
raise
@staticmethod
def copy_file(source_full_path, destination_full_path):
try:
shutil.copy2(source_full_path, destination_full_path)
except:
raise
@staticmethod
def move(source_full_path, destination_full_path):
try:
shutil.move(source_full_path, destination_full_path)
except:
raise
@staticmethod
def get_size(full_path):
# byte
try:
return os.path.getsize(full_path)
except:
raise
@staticmethod
def link_path(source_path, destination_path):
try:
os.symlink(source_path, destination_path)
except:
raise
@staticmethod
def read_file(full_path, mode='r'):
content = None
try:
with open(full_path, mode) as f:
content = f.read()
except:
raise
finally:
return content
@staticmethod
def read_file_by_line(full_path, mode='r'):
line_list = list()
with open(full_path, mode) as f:
lines = f.readlines()
for line in lines:
line_list.append(line)
return line_list
@staticmethod
def write_file(full_path, content, mode='w+'):
file = None
try:
file = open(full_path, mode)
file.write(content)
except:
raise
finally:
file.close()
@staticmethod
def make_executable(full_path):
try:
st = os.stat(full_path)
os.chmod(full_path, st.st_mode | stat.S_IEXEC)
except:
raise
@staticmethod
def change_owner(full_path, user_name=None, group_name=None):
try:
shutil.chown(full_path, user_name, group_name)
except:
raise
@staticmethod
def get_executable_path(app_name):
path = None
try:
path = shutil.which(app_name)
except:
raise
finally:
return path
@staticmethod
def execute(command, stdin=None, env=None, cwd=None, shell=True, result=True, as_user=None, ip=None):
try:
if ip:
command = 'ssh root@{0} "{1}"'.format(ip, command)
Scope.get_instance().get_logger().debug('Executing command: ' +str(command))
elif as_user:
command = 'su - {0} -c "{1}"'.format(as_user, command)
Scope.get_instance().get_logger().debug('Executing command: ' +str(command))
process = subprocess.Popen(command, stdin=stdin, env=env, cwd=cwd, stderr=subprocess.PIPE,
stdout=subprocess.PIPE, shell=shell)
Scope.get_instance().get_logger().debug('Executing command: ' + str(command))
if result is True:
result_code = process.wait()
p_out = process.stdout.read().decode("unicode_escape")
p_err = process.stderr.read().decode("unicode_escape")
return result_code, p_out, p_err
else:
return None, None, None
except Exception as e:
return 1, 'Could not execute command: {0}. Error Message: {1}'.format(command, str(e)), ''
@staticmethod
def scopy_from_remote(source_path, destination_path, ip):
command = 'scp -r root@' + ip + ':' + source_path + ' ' + destination_path
process = subprocess.Popen(command, stderr=subprocess.PIPE,stdout=subprocess.PIPE, shell=True)
result_code = process.wait()
p_out = process.stdout.read().decode("unicode_escape")
p_err = process.stderr.read().decode("unicode_escape")
return result_code, p_out, p_err
@staticmethod
def execute_script(script_path, parameters=None):
command = []
if os.path.exists(script_path):
command.append(script_path)
else:
raise Exception('[Util] Script is required')
if parameters is not None:
for p in parameters:
command.append(p)
return subprocess.check_call(command)
@staticmethod
def is_exist(full_path):
try:
return os.path.exists(full_path)
except:
raise
@staticmethod
def create_directory(dir_path):
try:
return os.makedirs(dir_path)
except:
raise
@staticmethod
def string_to_json(string):
try:
return json.loads(string)
except:
raise
# TODO json abilities
@staticmethod
def file_owner(full_path):
try:
st = os.stat(full_path)
uid = st.st_uid
return pwd.getpwuid(uid)[0]
except:
raise
@staticmethod
def file_group(full_path):
try:
st = os.stat(full_path)
gid = st.st_gid
# return grp.getgrgid(gid)[0]
return gid
except:
raise
@staticmethod
def install_with_dpkg(full_path):
command_dpkg = 'dpkg -i {0}'
command_dep = 'apt -f install -y'
commands = [command_dpkg.format(full_path),command_dep]
for cmd in commands:
try:
process = subprocess.Popen(cmd, shell=True)
process.wait()
except:
raise
@staticmethod
def install_with_apt_get(package_name, package_version=None):
if package_version is not None:
command = 'apt-get install --yes --force-yes {0}={1}'.format(package_name, package_version)
else:
command = 'apt-get install --yes --force-yes {0}'.format(package_name)
return Util.execute(command)
@staticmethod
def uninstall_package(package_name, package_version=None):
if package_version is not None:
command = 'apt-get purge --yes --force-yes {0}={1}'.format(package_name, package_version)
else:
command = 'apt-get purge --yes --force-yes {0}'.format(package_name)
return Util.execute(command)
@staticmethod
def is_installed(package_name):
result_code, p_out, p_err = Util.execute('dpkg -s {0}'.format(package_name))
try:
lines = str(p_out).split('\n')
for line in lines:
if len(line) > 1:
if line.split(None, 1)[0].lower() == 'status:':
if 'installed' in line.split(None, 1)[1].lower():
return True
return False
except Exception as e:
return False
@staticmethod
def get_md5_file(file_path):
hash_md5 = hashlib.md5()
with open(file_path, 'rb') as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return str(hash_md5.hexdigest())
@staticmethod
def get_md5_text(content):
hash_md5 = hashlib.md5()
hash_md5.update(content.encode())
return str(hash_md5.hexdigest())
@staticmethod
def timestamp():
return str(datetime.datetime.now().strftime("%d-%m-%Y %I:%M"))
@staticmethod
def generate_uuid():
return str(uuid.uuid4())
@staticmethod
def get_language():
locale_info = locale.getdefaultlocale()
return locale_info[0]
@staticmethod
def set_permission(path, permission_code):
Util.execute('chmod -R {0} {1}'.format(permission_code, path))
@staticmethod
def has_attr_json(arg, attr_name):
for j in json.loads(json.dumps(arg)):
if attr_name in j:
return True
return False
@staticmethod
def remove_package(package_name, package_version):
command = "sudo apt-get --yes --force-yes purge {0}={1}".format(package_name, package_version)
result_code, p_out, p_err = Util.execute(command)
return result_code, p_out, p_err
@staticmethod
def send_notify(title, body, display, user, icon=None, timeout=5000):
inner_command = 'notify-send "{0}" "{1}" -t {2}'.format(title, body, timeout)
if icon:
inner_command += ' -i {0}'.format(icon)
if user != 'root':
Util.execute('export DISPLAY={0}; su - {1} -c \'{2}\''.format(display, user, inner_command))
@staticmethod
def show_message(username, display, message='', title=''):
ask_path = Util.get_ask_path_file()+ 'confirm.py'
Scope.get_instance().get_logger().debug('DISPLAYYYY --------->>>>>>>>: ' + str(display))
if display is None:
display_number = Util.get_username_display()
else:
display_number = display
try:
if Util.get_desktop_env() == "gnome":
display_number = Util.get_username_display_gnome(username)
if username is not None:
command = 'su - {0} -c \'python3 {1} \"{2}\" \"{3}\" \"{4}\"\''.format(username, ask_path, message,
title, display_number)
result_code, p_out, p_err = Util.execute(command)
if p_out.strip() == 'Y':
return True
elif p_out.strip() == 'N':
return False
else:
return None
else:
return None
except Exception as e :
print("Error when showing message " + str(e))
return None
@staticmethod
def show_registration_message(login_user_name, message, title, host=None):
ask_path = Util.get_ask_path_file() + 'ahenkmessage.py'
# display_number = ":0"
display_number = Util.get_username_display()
if Util.get_desktop_env() == "gnome":
display_number = Util.get_username_display_gnome(login_user_name)
if host is None:
command = 'su - {0} -c \"python3 {1} \'{2}\' \'{3}\' \'{4}\' \"'.format(login_user_name,
ask_path, message, title, display_number)
else:
command = 'su - {0} -c \"python3 {1} \'{2}\' \'{3}\' \'{4}\' \'{5}\' \"'.format(login_user_name,
ask_path,
message, title,
host, display_number)
result_code, p_out, p_err = Util.execute(command)
pout = str(p_out).replace('\n', '')
return pout
@staticmethod
def show_unregistration_message(login_user_name,display_number,message,title):
ask_path = Util.get_ask_path_file()+ 'unregistrationmessage.py'
if Util.get_desktop_env() == "gnome":
display_number = Util.get_username_display_gnome(login_user_name)
command = 'su - {0} -c \"python3 {1} \'{2}\' \'{3}\' \'{4}\' \"'.format(login_user_name, ask_path, message, title, display_number)
result_code, p_out, p_err = Util.execute(command)
pout = str(p_out).replace('\n', '')
return pout
@staticmethod
def get_username_display():
result_code, p_out, p_err = Util.execute("who | awk '{print $1, $5}' | sed 's/(://' | sed 's/)//'", result=True)
result = []
lines = str(p_out).split('\n')
for line in lines:
arr = line.split(' ')
if len(arr) > 1 and str(arr[1]).isnumeric() is True:
result.append(line)
params = str(result[0]).split(' ')
display_number = params[1]
display_number = ":"+str(display_number)
return display_number
@staticmethod
def get_username_display_gnome(user):
result_code, p_out, p_err = Util.execute("who | awk '{print $1, $5}' | sed 's/(://' | sed 's/)//'", result=True)
display_number = None
result = []
lines = str(p_out).split('\n')
for line in lines:
arr = line.split(' ')
if len(arr) > 1 and str(arr[1]).isnumeric() is True:
result.append(line)
for res in result:
arr = res.split(" ")
username = arr[0]
if username == user:
display_number = ":" + arr[1]
# print("--->>> dis no: " + str(display_number))
return display_number
@staticmethod
def get_desktop_env():
xfce4_session = "/usr/bin/xfce4-session"
gnome_session = "/usr/bin/gnome-session"
desktop_env = None
result_code, p_out, p_err = Util.execute("ls {}".format(gnome_session))
if result_code == 0:
desktop_env = "gnome"
result_code, p_out, p_err = Util.execute("ls {}".format(xfce4_session))
if result_code == 0:
desktop_env = "xfce"
return desktop_env
# return home directory for user. "/home/username"
@staticmethod
def get_homedir(user):
username = user
try:
return expanduser("~{0}".format(username))
except:
raise
# return username from ahenk.db. if domain is not null return username is DOMAIN\\username
@staticmethod
def get_username():
user_name = Scope.get_instance().get_db_service().select_one_result('session', 'username', " 1=1 order by id desc ")
domain = Scope.get_instance().get_db_service().select_one_result('session', 'domain', " 1=1 order by id desc ")
if domain:
user_name = "{0}\\{1}".format(domain, user_name)
return user_name
# as_user is the user that run command. Return as_user for execute method. if domain is not null return as_user is DOMAIN\\\\username
@staticmethod
def get_as_user():
as_user = Scope.get_instance().get_db_service().select_one_result('session', 'username', " 1=1 order by id desc ")
domain = Scope.get_instance().get_db_service().select_one_result('session', 'domain', " 1=1 order by id desc ")
if domain:
as_user = "{0}\\\\{1}".format(domain, as_user)
return as_user
# return gid_number of username
@staticmethod
def get_gid_number(username):
try:
return pwd.getpwnam(username).pw_gid
except:
raise
@staticmethod
def get_agent_version():
result_code, result, p_err = Util.execute('dpkg -s {} | grep Version'.format("ahenk"))
data = result.split(': ')
if data[0] == 'Version':
version = data[1].strip('\n')
return version
else:
return None
# return active user in sessions list
@staticmethod
def get_active_user():
result_code, p_out, p_err = Util.execute("for sessionid in $(loginctl list-sessions --no-legend | awk '{ print $1 }'); do loginctl show-session -p Id -p Name -p User -p State -p Type -p Remote $sessionid | sort; done | awk -F= '/Name/ { name = $2 } /User/ { user = $2 } /State/ { state = $2 } /Type/ { type = $2 } /Remote/ { remote = $2 } /User/ && remote == \"no\" && state == \"active\" && (type == \"x11\" || type == \"wayland\") { print name }\'")
p_out = str(p_out).rstrip()
return p_out