mirror of
https://github.com/Pardus-LiderAhenk/ahenk
synced 2024-12-23 16:52:25 +03:00
654 lines
20 KiB
Python
654 lines
20 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
# Author: Volkan Şahin <volkansah.in> <bm.volkansahin@gmail.com>
|
|
|
|
import configparser
|
|
import fcntl
|
|
import glob
|
|
import os
|
|
import platform
|
|
import re
|
|
import socket
|
|
import struct
|
|
import netifaces
|
|
from uuid import getnode as get_mac
|
|
|
|
import cpuinfo
|
|
import psutil
|
|
|
|
from base.scope import Scope
|
|
from base.util.util import Util
|
|
|
|
|
|
class System:
|
|
def __init__(self):
|
|
scope = Scope().get_instance()
|
|
self.db_service = scope.get_db_service()
|
|
self.logger = scope.get_logger()
|
|
|
|
class BIOS(object):
|
|
@staticmethod
|
|
def vendor():
|
|
try:
|
|
result_code, p_out, p_err = Util.execute('dmidecode --string bios-vendor')
|
|
return int(result_code), str(p_out), str(p_err)
|
|
except:
|
|
raise
|
|
|
|
@staticmethod
|
|
def release_date():
|
|
try:
|
|
result_code, p_out, p_err = Util.execute('dmidecode --string bios-release-date')
|
|
return int(result_code), str(p_out), str(p_err)
|
|
except:
|
|
raise
|
|
|
|
@staticmethod
|
|
def version():
|
|
try:
|
|
result_code, p_out, p_err = Util.execute('dmidecode --string bios-version')
|
|
return int(result_code), str(p_out), str(p_err)
|
|
except:
|
|
raise
|
|
|
|
class Ahenk(object):
|
|
|
|
@staticmethod
|
|
def installed_plugins():
|
|
plugin_names = []
|
|
possible_plugins = os.listdir(System.Ahenk.plugins_path())
|
|
for plugin_name in possible_plugins:
|
|
location = os.path.join(System.Ahenk.plugins_path(), plugin_name)
|
|
if os.path.isdir(location) and System.Ahenk.module_name() + ".py" in os.listdir(location):
|
|
plugin_names.append(plugin_name)
|
|
return plugin_names
|
|
|
|
@staticmethod
|
|
def db_path():
|
|
config = configparser.ConfigParser()
|
|
config._interpolation = configparser.ExtendedInterpolation()
|
|
config.read(System.Ahenk.config_path())
|
|
return config.get('BASE', 'dbPath')
|
|
|
|
@staticmethod
|
|
def agreement_timeout():
|
|
config = configparser.ConfigParser()
|
|
config._interpolation = configparser.ExtendedInterpolation()
|
|
config.read(System.Ahenk.config_path())
|
|
return config.get('SESSION', 'agreement_timeout')
|
|
|
|
@staticmethod
|
|
def registration_timeout():
|
|
config = configparser.ConfigParser()
|
|
config._interpolation = configparser.ExtendedInterpolation()
|
|
config.read(System.Ahenk.config_path())
|
|
return config.get('SESSION', 'registration_timeout')
|
|
|
|
@staticmethod
|
|
def get_policy_timeout():
|
|
config = configparser.ConfigParser()
|
|
config._interpolation = configparser.ExtendedInterpolation()
|
|
config.read(System.Ahenk.config_path())
|
|
return config.get('SESSION', 'get_policy_timeout')
|
|
|
|
@staticmethod
|
|
def uid():
|
|
config = configparser.ConfigParser()
|
|
config._interpolation = configparser.ExtendedInterpolation()
|
|
config.read(System.Ahenk.config_path())
|
|
return config.get('CONNECTION', 'uid')
|
|
|
|
@staticmethod
|
|
def plugins_path():
|
|
config = configparser.ConfigParser()
|
|
config._interpolation = configparser.ExtendedInterpolation()
|
|
config.read(System.Ahenk.config_path())
|
|
return config.get('PLUGIN', 'pluginfolderpath')
|
|
|
|
@staticmethod
|
|
def module_name():
|
|
config = configparser.ConfigParser()
|
|
config._interpolation = configparser.ExtendedInterpolation()
|
|
config.read(System.Ahenk.config_path())
|
|
return config.get('PLUGIN', 'mainModuleName')
|
|
|
|
@staticmethod
|
|
def agreement():
|
|
config = configparser.ConfigParser()
|
|
config._interpolation = configparser.ExtendedInterpolation()
|
|
config.read(System.Ahenk.config_path())
|
|
return config.get('MACHINE', 'agreement')
|
|
|
|
@staticmethod
|
|
def dn():
|
|
system = System()
|
|
try:
|
|
dn = system.db_service.select_one_result('registration', 'dn', " registered='1'")
|
|
return dn
|
|
except:
|
|
return None
|
|
|
|
@staticmethod
|
|
def ip():
|
|
system = System()
|
|
try:
|
|
ip = system.db_service.select_one_result('session', 'ip')
|
|
return ip
|
|
except:
|
|
return None
|
|
|
|
@staticmethod
|
|
def get_pid_number():
|
|
pid_number = None
|
|
try:
|
|
if os.path.exists(System.Ahenk.pid_path()):
|
|
file = open(System.Ahenk.pid_path(), 'r')
|
|
pid_number = file.read()
|
|
file.close()
|
|
return pid_number
|
|
except Exception as e:
|
|
return None
|
|
|
|
@staticmethod
|
|
def is_running():
|
|
try:
|
|
if System.Ahenk.get_pid_number() is not None:
|
|
return psutil.pid_exists(int(System.Ahenk.get_pid_number()))
|
|
else:
|
|
return False
|
|
except Exception as e:
|
|
return False
|
|
|
|
@staticmethod
|
|
def config_path():
|
|
return '/etc/ahenk/ahenk.conf'
|
|
|
|
@staticmethod
|
|
def pid_path():
|
|
return '/var/run/ahenk.pid'
|
|
|
|
@staticmethod
|
|
def fifo_file():
|
|
return '/tmp/liderahenk.fifo'
|
|
|
|
@staticmethod
|
|
def received_dir_path():
|
|
path = '/tmp/' # move this to properties
|
|
if Util.is_exist(path) is False:
|
|
Util.create_directory(path)
|
|
Util.set_permission(path, '777')
|
|
return path
|
|
|
|
class Process(object):
|
|
|
|
@staticmethod
|
|
def process_by_pid(pid):
|
|
return psutil.Process(pid)
|
|
|
|
@staticmethod
|
|
def pids():
|
|
return psutil.pids()
|
|
|
|
@staticmethod
|
|
def find_pids_by_name(p_name):
|
|
arr = []
|
|
for pid in psutil.pids():
|
|
if psutil.Process(id).name() == p_name:
|
|
arr.append(pid)
|
|
return arr
|
|
|
|
@staticmethod
|
|
def is_running(pid):
|
|
return psutil.pid_exists(pid)
|
|
|
|
@staticmethod
|
|
def kill_by_pid(pid):
|
|
return psutil.Process(pid).kill()
|
|
|
|
@staticmethod
|
|
def kill_by_pids(pids):
|
|
for pid in pids:
|
|
psutil.Process(pid).kill()
|
|
|
|
@staticmethod
|
|
def find_name_by_pid(pid):
|
|
return psutil.Process(pid).name()
|
|
|
|
@staticmethod
|
|
def path(pid):
|
|
return psutil.Process(pid).exe()
|
|
|
|
@staticmethod
|
|
def working_directory(pid):
|
|
return psutil.Process(pid).cwd()
|
|
|
|
@staticmethod
|
|
def command_line(pid):
|
|
return psutil.Process(pid).cmdline()
|
|
|
|
@staticmethod
|
|
def status(pid):
|
|
return psutil.Process(pid).status()
|
|
|
|
@staticmethod
|
|
def username(pid):
|
|
return psutil.Process(pid).username()
|
|
|
|
@staticmethod
|
|
def create_time(pid):
|
|
return psutil.Process(pid).create_time()
|
|
|
|
@staticmethod
|
|
def cpu_times(pid):
|
|
return psutil.Process(pid).cpu_times()
|
|
|
|
@staticmethod
|
|
def cpu_percent(pid):
|
|
return psutil.Process(pid).cpu_percent(interval=1.0)
|
|
|
|
@staticmethod
|
|
def memory_percent(pid):
|
|
return psutil.Process(pid).memory_percent()
|
|
|
|
@staticmethod
|
|
def open_files(pid):
|
|
return psutil.Process(pid).open_files()
|
|
|
|
@staticmethod
|
|
def connections(pid):
|
|
return psutil.Process(pid).connections()
|
|
|
|
@staticmethod
|
|
def threads(pid):
|
|
return psutil.Process(pid).threads()
|
|
|
|
@staticmethod
|
|
def nice(pid):
|
|
return psutil.Process(pid).nice()
|
|
|
|
@staticmethod
|
|
def environment(pid):
|
|
return psutil.Process(pid).environ()
|
|
|
|
@staticmethod
|
|
def details():
|
|
return psutil.test()
|
|
|
|
class Sessions(object):
|
|
|
|
@staticmethod
|
|
def user_name():
|
|
arr = []
|
|
for user in psutil.users():
|
|
if str(user[0]) is not 'None' and user[0] not in arr:
|
|
arr.append(user[0])
|
|
return arr
|
|
|
|
@staticmethod
|
|
def user_details():
|
|
return psutil.users()
|
|
|
|
@staticmethod
|
|
def display(username):
|
|
system = System()
|
|
display = system.db_service.select_one_result('session', 'display', " username='{0}'".format(username))
|
|
return display
|
|
|
|
@staticmethod
|
|
def desktop(username):
|
|
system = System()
|
|
desktop = system.db_service.select_one_result('session', 'desktop', " username='{0}'".format(username))
|
|
return desktop
|
|
|
|
staticmethod
|
|
def userip(username):
|
|
system = System()
|
|
userip = system.db_service.select_one_result('session', 'ip', " username='{0}'".format(username))
|
|
return userip
|
|
|
|
@staticmethod
|
|
def user_home_path(username):
|
|
# TODO temp
|
|
return '/home/{0}/'.format(str(username))
|
|
|
|
class Os(object):
|
|
|
|
@staticmethod
|
|
def architecture():
|
|
return platform.architecture()[0]
|
|
|
|
@staticmethod
|
|
def boot_time():
|
|
return psutil.boot_time()
|
|
|
|
@staticmethod
|
|
def file_format():
|
|
return platform.architecture()[1]
|
|
|
|
@staticmethod
|
|
def name():
|
|
return platform.system()
|
|
|
|
@staticmethod
|
|
def distribution_name():
|
|
return platform.linux_distribution()[0]
|
|
|
|
@staticmethod
|
|
def distribution_version():
|
|
return platform.linux_distribution()[1]
|
|
|
|
@staticmethod
|
|
def distribution_id():
|
|
return platform.linux_distribution()[2]
|
|
|
|
@staticmethod
|
|
def version():
|
|
return platform.version()
|
|
|
|
@staticmethod
|
|
def kernel_release():
|
|
return platform.release()
|
|
|
|
@staticmethod
|
|
def hostname():
|
|
return platform.node()
|
|
|
|
class Hardware(object):
|
|
|
|
class BaseBoard(object):
|
|
|
|
@staticmethod
|
|
def manufacturer():
|
|
try:
|
|
result_code, p_out, p_err = Util.execute('dmidecode --string baseboard-manufacturer')
|
|
return int(result_code), str(p_out), str(p_err)
|
|
except:
|
|
raise
|
|
|
|
@staticmethod
|
|
def product_name():
|
|
try:
|
|
result_code, p_out, p_err = Util.execute('dmidecode --string baseboard-product-name')
|
|
return int(result_code), str(p_out), str(p_err)
|
|
except:
|
|
raise
|
|
|
|
@staticmethod
|
|
def version():
|
|
try:
|
|
result_code, p_out, p_err = Util.execute('dmidecode --string baseboard-version')
|
|
return int(result_code), str(p_out), str(p_err)
|
|
except:
|
|
raise
|
|
|
|
@staticmethod
|
|
def serial_number():
|
|
try:
|
|
result_code, p_out, p_err = Util.execute('dmidecode --string baseboard-serial-number')
|
|
return int(result_code), str(p_out), str(p_err)
|
|
except:
|
|
raise
|
|
|
|
@staticmethod
|
|
def asset_tag():
|
|
try:
|
|
result_code, p_out, p_err = Util.execute('dmidecode --string baseboard-asset-tag')
|
|
return int(result_code), str(p_out), str(p_err)
|
|
except:
|
|
raise
|
|
|
|
class Memory(object):
|
|
|
|
@staticmethod
|
|
def total():
|
|
return int(int(psutil.virtual_memory()[0]) / (1024 * 1024))
|
|
|
|
@staticmethod
|
|
def available():
|
|
return int(int(psutil.virtual_memory()[1]) / (1024 * 1024))
|
|
|
|
@staticmethod
|
|
def percent():
|
|
return psutil.virtual_memory()[2]
|
|
|
|
@staticmethod
|
|
def used():
|
|
return int(int(psutil.virtual_memory()[3]) / (1024 * 1024))
|
|
|
|
@staticmethod
|
|
def free():
|
|
return int(int(psutil.virtual_memory()[4]) / (1024 * 1024))
|
|
|
|
class Disk(object):
|
|
|
|
@staticmethod
|
|
def total():
|
|
return int(int(psutil.disk_usage('/')[0]) / (1024 * 1024))
|
|
|
|
@staticmethod
|
|
def used():
|
|
return int(int(psutil.disk_usage('/')[1]) / (1024 * 1024))
|
|
|
|
@staticmethod
|
|
def free():
|
|
return int(int(psutil.disk_usage('/')[2]) / (1024 * 1024))
|
|
|
|
@staticmethod
|
|
def percent():
|
|
return psutil.disk_usage('/')[3]
|
|
|
|
@staticmethod
|
|
def partitions():
|
|
return psutil.disk_partitions()
|
|
|
|
class Network(object):
|
|
|
|
@staticmethod
|
|
def interface_size():
|
|
return len(psutil.net_io_counters(pernic=True))
|
|
|
|
@staticmethod
|
|
def io_counter_detail():
|
|
return psutil.net_io_counters(pernic=True)
|
|
|
|
@staticmethod
|
|
def interfaces():
|
|
arr = []
|
|
for iface in psutil.net_io_counters(pernic=True):
|
|
arr.append(str(iface))
|
|
return arr
|
|
|
|
@staticmethod
|
|
def ip_addresses():
|
|
arr = []
|
|
for iface in netifaces.interfaces():
|
|
link = None
|
|
try:
|
|
link = netifaces.ifaddresses(iface)[netifaces.AF_INET][0]
|
|
except:
|
|
link = None
|
|
if link is not None:
|
|
ip = link['addr']
|
|
if re.match(r'^((\d{1,2}|1\d{2}|2[0-4]\d|25[0-5])\.){3}(\d{1,2}|1\d{2}|2[0-4]\d|25[0-5])$',
|
|
ip) and str(ip) != 'localhost' and str(ip) != '127.0.0.1':
|
|
arr.append(ip)
|
|
return arr
|
|
|
|
@staticmethod
|
|
def getHwAddr(ifname):
|
|
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
info = fcntl.ioctl(s.fileno(), 0x8927, struct.pack('256s', ifname[:15]))
|
|
return ''.join(['%02x:' % ord(char) for char in info[18:24]])[:-1]
|
|
|
|
@staticmethod
|
|
def getHwAddr(ifname):
|
|
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
info = fcntl.ioctl(s.fileno(), 0x8927, struct.pack('256s', ifname[:15]))
|
|
return ''.join(['%02x:' % ord(char) for char in info[18:24]])[:-1]
|
|
|
|
@staticmethod
|
|
def mac_addresses():
|
|
mac = get_mac()
|
|
':'.join(("%012X" % mac)[i:i + 2] for i in range(0, 12, 2))
|
|
arr = []
|
|
for iface in psutil.net_io_counters(pernic=True):
|
|
try:
|
|
addr_list = psutil.net_if_addrs()
|
|
mac = addr_list[str(iface)][2][1]
|
|
if re.match("[0-9a-f]{2}([-:])[0-9a-f]{2}(\\1[0-9a-f]{2}){4}$", mac.lower()) and str(
|
|
mac) != '00:00:00:00:00:00':
|
|
arr.append(mac.lower())
|
|
except Exception as e:
|
|
pass
|
|
|
|
return arr
|
|
|
|
@staticmethod
|
|
def screen_info_json_obj(info):
|
|
label_list = [
|
|
# 'Identifier',
|
|
'ModelName', 'VendorName', 'Monitor Manufactured', 'DisplaySize',
|
|
# 'Gamma',
|
|
# 'Horizsync',
|
|
# 'VertRefresh'
|
|
]
|
|
data = dict()
|
|
|
|
for line in info.splitlines():
|
|
line = line.strip().replace('"', '')
|
|
intersection = list(set(line.split(' ')).intersection(label_list))
|
|
if len(intersection) > 0:
|
|
data[str(intersection[0])] = line.split(intersection[0])[1].strip()
|
|
|
|
return data
|
|
|
|
@staticmethod
|
|
def monitors():
|
|
edid_list = glob.glob('/sys/class/drm/*/edid')
|
|
|
|
monitor_list = list()
|
|
for edid in edid_list:
|
|
result_code, p_out, p_err = Util.execute('parse-edid < {0}'.format(edid))
|
|
|
|
if result_code == 0:
|
|
monitor_list.append(System.Hardware.screen_info_json_obj(p_out))
|
|
|
|
return monitor_list
|
|
|
|
@staticmethod
|
|
def screens():
|
|
result_code, p_out, p_err = Util.execute('xrandr')
|
|
arr = []
|
|
if result_code == 0:
|
|
for line in p_out.splitlines():
|
|
if len(list(set(line.split(' ')).intersection(['connected']))) > 0:
|
|
arr.append(line)
|
|
return arr
|
|
|
|
@staticmethod
|
|
def usb_devices():
|
|
result_code, p_out, p_err = Util.execute('lsusb')
|
|
arr = []
|
|
if result_code == 0:
|
|
for line in p_out.splitlines():
|
|
if ':' in line and 'Device 001' not in line.split(':')[0]:
|
|
arr.append(line)
|
|
return arr
|
|
|
|
@staticmethod
|
|
def printers():
|
|
result_code, p_out, p_err = Util.execute('lpstat -a')
|
|
arr = None
|
|
if result_code == 0:
|
|
arr = p_out.splitlines()
|
|
return arr
|
|
|
|
@staticmethod
|
|
def system_definitions():
|
|
result_code, p_out, p_err = Util.execute('dmidecode -t system')
|
|
arr = []
|
|
if result_code == 0:
|
|
for line in p_out.splitlines():
|
|
line = line.strip()
|
|
if len(list(set(line.split(' ')).intersection(['Manufacturer:', 'Product']))) > 0:
|
|
arr.append(line)
|
|
return arr
|
|
|
|
@staticmethod
|
|
def machine_model():
|
|
try:
|
|
result_code, p_out, p_err = Util.execute('sudo dmidecode --string system-version')
|
|
return str(p_out)
|
|
except:
|
|
raise
|
|
|
|
@staticmethod
|
|
def machine_type():
|
|
config = configparser.ConfigParser()
|
|
config._interpolation = configparser.ExtendedInterpolation()
|
|
config.read(System.Ahenk.config_path())
|
|
return config.get('MACHINE', 'type')
|
|
|
|
@staticmethod
|
|
def interfaces_details():
|
|
return psutil.net_if_addrs()
|
|
|
|
@staticmethod
|
|
def ip_addresses():
|
|
arr = []
|
|
for iface in psutil.net_io_counters(pernic=True):
|
|
ip = psutil.net_if_addrs()[str(iface)][0][1]
|
|
if re.match(r'^((\d{1,2}|1\d{2}|2[0-4]\d|25[0-5])\.){3}(\d{1,2}|1\d{2}|2[0-4]\d|25[0-5])$', ip) and str(
|
|
ip) != 'localhost' and str(ip) != '127.0.0.1':
|
|
arr.append(ip)
|
|
return arr
|
|
|
|
class Cpu(object):
|
|
|
|
@staticmethod
|
|
def times():
|
|
return psutil.cpu_times()
|
|
|
|
@staticmethod
|
|
def architecture():
|
|
return platform.processor()
|
|
|
|
@staticmethod
|
|
def physical_core_count():
|
|
return psutil.cpu_count(logical=False)
|
|
|
|
@staticmethod
|
|
def logical_core_count():
|
|
return psutil.cpu_count(logical=True)
|
|
|
|
@staticmethod
|
|
def stats():
|
|
return psutil.cpu_stats()
|
|
|
|
@staticmethod
|
|
def vendor():
|
|
return cpuinfo.get_cpu_info()['vendor_id']
|
|
|
|
@staticmethod
|
|
def brand():
|
|
return cpuinfo.get_cpu_info()['brand']
|
|
|
|
@staticmethod
|
|
def hz_advertised():
|
|
return cpuinfo.get_cpu_info()['hz_advertised']
|
|
|
|
@staticmethod
|
|
def hz_actual():
|
|
return cpuinfo.get_cpu_info()['hz_actual']
|
|
|
|
@staticmethod
|
|
def bit():
|
|
return cpuinfo.get_cpu_info()['bits']
|
|
|
|
@staticmethod
|
|
def family():
|
|
return cpuinfo.get_cpu_info()['family']
|
|
|
|
@staticmethod
|
|
def model():
|
|
return cpuinfo.get_cpu_info()['model']
|