Ahenk/opt/ahenk/base/util/util.py

340 lines
9.8 KiB
Python
Raw Normal View History

2016-05-25 17:10:27 +03:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Author: Volkan Şahin <volkansah.in> <bm.volkansahin@gmail.com>
2016-09-21 11:46:46 +03:00
import datetime
2016-06-06 12:20:31 +03:00
import grp
2016-09-21 11:46:46 +03:00
import hashlib
2016-05-25 17:10:27 +03:00
import json
2016-06-06 12:20:31 +03:00
import os
import pwd
2016-05-25 17:10:27 +03:00
import shutil
import stat
import subprocess
import uuid
2017-08-27 09:45:05 +03:00
from base.scope import Scope
2016-05-25 17:10:27 +03:00
2016-06-30 15:42:42 +03:00
2016-05-25 17:10:27 +03:00
class Util:
def __init__(self):
super().__init__()
2016-08-11 18:26:52 +03:00
@staticmethod
def close_session(username):
Util.execute('pkill -9 -u {0}'.format(username))
2016-05-25 17:10:27 +03:00
@staticmethod
def create_file(full_path):
try:
if os.path.exists(full_path):
return None
else:
file = open(full_path, 'w')
file.close()
return True
except:
raise
@staticmethod
def delete_folder(full_path):
try:
shutil.rmtree(full_path)
except:
raise
@staticmethod
def delete_file(full_path):
try:
2016-09-21 11:46:46 +03:00
if Util.is_exist(full_path):
os.remove(full_path)
2016-05-25 17:10:27 +03:00
except:
raise
@staticmethod
def rename_file(old_full_path, new_full_path):
try:
os.rename(old_full_path, new_full_path)
except:
raise
@staticmethod
def copy_file(source_full_path, destination_full_path):
try:
shutil.copy2(source_full_path, destination_full_path)
except:
raise
@staticmethod
def move(source_full_path, destination_full_path):
try:
shutil.move(source_full_path, destination_full_path)
except:
raise
@staticmethod
def get_size(full_path):
# byte
try:
return os.path.getsize(full_path)
except:
raise
@staticmethod
def link_path(source_path, destination_path):
try:
os.symlink(source_path, destination_path)
except:
raise
@staticmethod
def read_file(full_path, mode='r'):
content = None
2016-05-25 17:10:27 +03:00
try:
2016-06-30 15:42:42 +03:00
with open(full_path, mode) as f:
content = f.read()
2016-05-25 17:10:27 +03:00
except:
raise
finally:
return content
2016-05-25 17:10:27 +03:00
@staticmethod
def read_file_by_line(full_path, mode='r'):
line_list = list()
with open(full_path, mode) as f:
lines = f.readlines()
for line in lines:
line_list.append(line)
return line_list
2016-05-25 17:10:27 +03:00
@staticmethod
2016-06-30 14:51:45 +03:00
def write_file(full_path, content, mode='w+'):
2016-05-25 17:10:27 +03:00
file = None
try:
file = open(full_path, mode)
file.write(content)
except:
raise
finally:
file.close()
@staticmethod
def make_executable(full_path):
try:
st = os.stat(full_path)
os.chmod(full_path, st.st_mode | stat.S_IEXEC)
except:
raise
@staticmethod
def change_owner(full_path, user_name=None, group_name=None):
try:
shutil.chown(full_path, user_name, group_name)
except:
raise
@staticmethod
def execute(command, stdin=None, env=None, cwd=None, shell=True, result=True, as_user=None, ip=None):
2016-05-26 17:24:29 +03:00
try:
if ip:
2017-08-23 22:43:42 +03:00
command = 'ssh root@{0} "{1}"'.format(ip, command)
2017-08-27 09:45:05 +03:00
Scope.get_instance().get_logger().debug('Executing command: ' +str(command))
elif as_user:
command = 'su - {0} -c "{1}"'.format(as_user, command)
2017-08-27 09:45:05 +03:00
Scope.get_instance().get_logger().debug('Executing command: ' +str(command))
process = subprocess.Popen(command, stdin=stdin, env=env, cwd=cwd, stderr=subprocess.PIPE,
stdout=subprocess.PIPE, shell=shell)
2016-05-26 17:24:29 +03:00
if result is True:
result_code = process.wait()
p_out = process.stdout.read().decode("unicode_escape")
p_err = process.stderr.read().decode("unicode_escape")
2016-05-26 17:24:29 +03:00
return result_code, p_out, p_err
else:
return None, None, None
2016-05-26 17:24:29 +03:00
except Exception as e:
return 1, 'Could not execute command: {0}. Error Message: {1}'.format(command, str(e)), ''
2016-05-25 17:10:27 +03:00
@staticmethod
def scopy_from_remote(source_path, destination_path, ip):
command = 'scp -r root@' + ip + ':' + source_path + ' ' + destination_path
process = subprocess.Popen(command, stderr=subprocess.PIPE,stdout=subprocess.PIPE, shell=True)
result_code = process.wait()
p_out = process.stdout.read().decode("unicode_escape")
p_err = process.stderr.read().decode("unicode_escape")
return result_code, p_out, p_err
2016-05-25 17:10:27 +03:00
@staticmethod
def execute_script(script_path, parameters=None):
command = []
if os.path.exists(script_path):
command.append(script_path)
else:
raise Exception('[Util] Script is required')
2017-08-27 10:06:02 +03:00
if parameters is not None:
2016-05-25 17:10:27 +03:00
for p in parameters:
command.append(p)
return subprocess.check_call(command)
@staticmethod
def is_exist(full_path):
try:
return os.path.exists(full_path)
except:
raise
@staticmethod
def create_directory(dir_path):
try:
return os.makedirs(dir_path)
2016-05-25 17:10:27 +03:00
except:
raise
@staticmethod
def string_to_json(string):
try:
return json.loads(string)
except:
raise
# TODO json abilities
@staticmethod
def file_owner(full_path):
try:
st = os.stat(full_path)
uid = st.st_uid
return pwd.getpwuid(uid)[0]
except:
raise
@staticmethod
def file_group(full_path):
try:
st = os.stat(full_path)
gid = st.st_uid
return grp.getgrgid(gid)[0]
except:
2016-05-26 17:24:29 +03:00
raise
@staticmethod
def install_with_gdebi(full_path):
try:
process = subprocess.Popen('gdebi -n ' + full_path, shell=True)
process.wait()
except:
raise
@staticmethod
2016-07-20 16:23:50 +03:00
def install_with_apt_get(package_name, package_version=None):
if package_version is not None:
command = 'apt-get install --yes --force-yes {0}={1}'.format(package_name, package_version)
else:
command = 'apt-get install --yes --force-yes {0}'.format(package_name)
return Util.execute(command)
2016-07-20 16:23:50 +03:00
@staticmethod
def uninstall_package(package_name, package_version=None):
if package_version is not None:
command = 'apt-get purge --yes --force-yes {0}={1}'.format(package_name, package_version)
else:
command = 'apt-get purge --yes --force-yes {0}'.format(package_name)
return Util.execute(command)
@staticmethod
def is_installed(package_name):
result_code, p_out, p_err = Util.execute('dpkg -s {0}'.format(package_name))
try:
2016-06-21 17:47:25 +03:00
lines = str(p_out).split('\n')
for line in lines:
2016-06-21 17:47:25 +03:00
if len(line) > 1:
if line.split(None, 1)[0].lower() == 'status:':
if 'installed' in line.split(None, 1)[1].lower():
return True
return False
except Exception as e:
return False
2016-06-21 17:47:25 +03:00
2016-06-16 16:39:47 +03:00
@staticmethod
2016-08-01 20:07:37 +03:00
def get_md5_file(file_path):
2016-06-16 16:39:47 +03:00
hash_md5 = hashlib.md5()
2016-08-01 20:07:37 +03:00
with open(file_path, 'rb') as f:
2016-06-16 16:39:47 +03:00
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return str(hash_md5.hexdigest())
@staticmethod
def get_md5_text(content):
hash_md5 = hashlib.md5()
hash_md5.update(content.encode())
return str(hash_md5.hexdigest())
@staticmethod
def timestamp():
return str(datetime.datetime.now().strftime("%d-%m-%Y %I:%M"))
@staticmethod
def generate_uuid():
2016-06-30 14:51:45 +03:00
return str(uuid.uuid4())
2016-06-30 15:42:42 +03:00
@staticmethod
def set_permission(path, permission_code):
2016-07-29 19:06:23 +03:00
Util.execute('chmod -R {0} {1}'.format(permission_code, path))
2016-07-01 16:57:12 +03:00
@staticmethod
def has_attr_json(arg, attr_name):
for j in json.loads(json.dumps(arg)):
if attr_name in j:
return True
return False
2016-07-20 16:16:19 +03:00
@staticmethod
def remove_package(package_name, package_version):
command = "sudo apt-get --yes --force-yes purge {0}={1}".format(package_name, package_version)
result_code, p_out, p_err = Util.execute(command)
return result_code, p_out, p_err
@staticmethod
def send_notify(title, body, display, user, icon=None, timeout=5000):
inner_command = 'notify-send "{0}" "{1}" -t {2}'.format(title, body, timeout)
if icon:
inner_command += ' -i {0}'.format(icon)
if user != 'root':
Util.execute('export DISPLAY={0}; su - {1} -c \'{2}\''.format(display, user, inner_command))
@staticmethod
def ask_permission(display, username, message, title):
ask_path = '/usr/share/ahenk/base/agreement/confirm.py'
try:
if username is not None:
command = 'export DISPLAY={0};su - {1} -c \'python3 {2} \"{3}\" \"{4}\"\''.format(display, username,
ask_path,
message,
title)
result_code, p_out, p_err = Util.execute(command)
if p_out.strip() == 'Y':
return True
elif p_out.strip() == 'N':
return False
else:
return None
else:
return None
except Exception:
return None