Use std library whenever possible

Do not blindly use external commands. Only use them as a last resort.

Signed-off-by: Cihangir Akturk <cihangir.akturk@tubitak.gov.tr>
This commit is contained in:
Cihangir Akturk 2018-08-30 20:18:27 +03:00 committed by Cihangir Akturk
parent f4419cfe50
commit a1d7aa3fd4
2 changed files with 86 additions and 15 deletions

View file

@ -14,6 +14,9 @@ from base.timer.setup_timer import SetupTimer
from base.timer.timer import Timer
from base.util.util import Util
from helper import system as sysx
import pwd
class Registration:
def __init__(self):
@ -185,18 +188,12 @@ class Registration:
System.Process.kill_by_pid(int(System.Ahenk.get_pid_number()))
def disable_local_users(self):
command_users = 'awk -F: \'{print $1 ":" $6 ":" $7}\' /etc/passwd | grep /bin/bash'
command_user_disable = 'passwd -l {}'
command_logout_user = 'pkill -u {}'
result_code, p_out, p_err = self.util.execute(command_users)
lines = p_out.split('\n')
lines.pop()
self.logger.debug("will be disabled: "+str(lines))
for line in lines:
detail = line.split(':')
if detail[0] != 'root':
self.util.execute(command_user_disable.format(detail[0]))
self.util.execute(command_logout_user.format(detail[0]))
self.logger.debug('{0} has been disabled and killed all processes for {0}'.format(detail[0]))
else:
self.logger.info("Ahenk has only root user")
passwd_cmd = 'passwd -l {}'
for p in pwd.getpwall():
if not sysx.shell_is_interactive(p.pw_shell):
continue
if p.pw_uid == 0:
continue
self.logger.debug("User: '{0}' will be disabled".format(p.pw_name))
self.util.execute(passwd_cmd.format(p.pw_name))
sysx.killuserprocs(p.pw_uid)

74
src/helper/system.py Normal file
View file

@ -0,0 +1,74 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os, signal
class ProcEntry(object):
def __init__(self, name, pid, cmdline, euid, egid):
super(ProcEntry, self).__init__()
self.name = name
self.pid = pid
self.cmdline = cmdline
self.euid = euid
self.egid = egid
def __str__(self):
return 'name: {0}, pid: {1}, uid: {2}, gid: {3}, cmd: {4}' \
.format(self.name, self.pid, self.euid, self.egid, self.cmdline)
class ProcParseError(Exception):
def __init__(self, msg):
super(Exception, self).__init__(msg)
def proclist():
def raise_if_less(l, n):
if len(l) < n:
raise ProcParseError('too few fields, expected at least ' + str(n))
for pid in [pid for pid in os.listdir('/proc') if pid.isdigit()]:
p = os.path.join('/proc', pid, 'cmdline')
cmdline = open(p).read()
p = os.path.join('/proc', pid, 'status')
euid = None
egid = None
name = None
for lin in open(p):
if lin.startswith('Name:'):
s = lin.split()
raise_if_less(s, 2)
name = s[1]
elif lin.startswith('Uid:'):
uid_line = lin.split()
raise_if_less(uid_line, 3)
euid = int(uid_line[2])
elif lin.startswith('Gid:'):
gid_line = lin.split()
raise_if_less(gid_line, 3)
egid = int(gid_line[2])
yield ProcEntry(name, int(pid), cmdline, euid, egid)
PATH_SHELLS='/etc/shells'
def login_shells():
valid = lambda s: s.rstrip(' \n') and not s.lstrip(' \t').startswith('#')
return [lin.rstrip('\n') for lin in open(PATH_SHELLS).readlines()
if valid(lin)]
def shell_is_interactive(sh):
shells = ['sh', 'bash', 'dash', 'zsh', 'fish', 'ksh', 'csh', 'tcsh']
return any(s == os.path.basename(sh) for s in shells)
def killuserprocs(uid):
for p in proclist():
if p.euid == uid:
try:
os.kill(p.pid, signal.SIGTERM)
except ProcessLookupError as e:
# The process might have died immediately, up till now, even
# before we had a chance to send a signal to it.
pass