#!/usr/bin/env python3 """ Fork bomb detector and deactvator :(){ :|:& };: while true; do bash -c ':(){ :|:& };: &'; done perl -MPOSIX -e '$pid=fork; exit if $pid; setid; fork while(1)' while true; do (sleep 9999 &); done perl -e "fork while fork" & echo '. ./_&. ./_'>_;. ./_ import os while(1): os.fork() """ import os from operator import itemgetter import argparse from time import sleep, time from signal import SIGKILL, SIGSTOP, SIGCONT self_pid = str(os.getpid()) with open('/proc/self/status') as file: status_list = file.readlines() status_names = [] for s in status_list: status_names.append(s.split(':')[0]) ppid_index = status_names.index('PPid') #################################################################################################### # FUNCS def rline1(path): """read 1st line from path.""" try: with open(path) as f: for line in f: return line[:-1] except FileNotFoundError: return '' except ProcessLookupError: return '' # имя через пид def pid_to_name(pid): try: with open('/proc/' + pid + '/status') as f: for line in f: return line[:-1].split('\t')[1] except FileNotFoundError: return '' except ProcessLookupError: return '' def pid_to_ppid(pid): try: with open('/proc/' + pid + '/status') as f: for n, line in enumerate(f): if n is ppid_index: return line.split('\t')[1].strip() except FileNotFoundError: return '' except ProcessLookupError: return '' def pid_to_cmdline(pid): try: with open('/proc/' + pid + '/cmdline') as file: try: return file.readlines()[0].replace('\x00', ' ').strip() except IndexError: return '' except FileNotFoundError: return '' except ProcessLookupError: return '' def pid_to_state(pid): try: return rline1('/proc/' + pid + '/stat').rpartition(')')[2][1] except FileNotFoundError: return '' except ProcessLookupError: return '' except IndexError: return '' def find_bomb_pattern(pid_set): print('find bomb pattern') t0 = time() name_dict = dict() for pid in pid_set: name = pid_to_name(pid) if name == '': continue if name not in name_dict: name_dict[name] = 1 else: name_dict[name] += 1 x = name_dict.items() y = sorted(x, key=itemgetter(1)) print(x) print(y) print(time() - t0) return y[-1][0] def stop_the_world1(bomb_name): # мониторим ресурсы быстро без слипа. нахуй сеты. с конца и до первого . О вариант: делаем слепок пидов. Словарь или хз. stop_counter = 0 for pid in os.listdir('/proc'): if pid[0].isdecimal() is False: continue name = pid_to_name(pid) if name == bomb_name: try: os.kill(int(pid), SIGSTOP) # со слипом это не поможет. Нужно детектить и убивать родителя. Родителя убивать безусловно. А бомбы именно морозить. except FileNotFoundError: pass except ProcessLookupError: pass # print('stop PID', pid) stop_counter += 1 print('total stopped:', stop_counter) return stop_counter def stop_the_world(bomb_name): print('Start stop the world', bomb_name) time0 = time() x0 = -1 while True: x = stop_the_world1(bomb_name) if x == x0: stop_the_world1(bomb_name) print('total stop time:', round(time() - time0, 2)) return None x0 = x def kill_stopped(bomb_name): print('Kill stopped procs') t0 = time() kill_counter = 0 for pid in os.listdir('/proc'): if pid[0].isdecimal() is False: continue name = pid_to_name(pid) try: state = pid_to_state(pid) except FileNotFoundError: state = '' except ProcessLookupError: state = '' ppid = pid_to_ppid(pid) if name == bomb_name and state == 'T' and pid != self_pid and pid != self_ppid and pid != self2_ppid: try: os.kill(int(pid), SIGKILL) except FileNotFoundError: pass except ProcessLookupError: pass kill_counter += 1 print('Number of killed procs:', kill_counter, '\nKill time:', round(time() - t0, 2)) def cont_stopped(bomb_name): print('Cont stopped procs') t0 = time() cont_counter = 0 for pid in os.listdir('/proc'): if pid[0].isdecimal() is False: continue state = pid_to_state(pid) name = pid_to_name(pid) if state == 'T' and name == bomb_name: try: os.kill(int(pid), SIGCONT) print(pid, name, ": CONT_THIS") except FileNotFoundError: pass except ProcessLookupError: pass cont_counter += 1 print('Cont procs:', cont_counter) #################################################################################################### print(self_pid, 'SELF PID', pid_to_name(self_pid)) self_ppid = pid_to_ppid(self_pid) print(self_ppid, 'SELF PPID', pid_to_name(self_ppid)) self2_ppid = pid_to_ppid(self_ppid) print(self2_ppid, 'SELF PPPID', pid_to_name(self2_ppid)) print('Fork bomb detecting started!') pid_set = set(os.listdir('/proc')) while True: sleep(1) #print(pid_set) new_set = set(os.listdir('/proc')) #print(new_set) delta = new_set - pid_set #print(delta) pid_set = new_set if len(delta) > 500: bomb_name = find_bomb_pattern(delta) stop_the_world(bomb_name) kill_stopped(bomb_name) cont_stopped(bomb_name)