太极0.5
为确保其在Windows系统上顺利运行，我们将进行以下改进：
完善GUI:确保GUI在启动时正常显示,并将运行情况显示在GUI中。
完善沙箱和蜜罐功能:确保沙箱和蜜罐的创建、监控和引爆功能更加完善。
远程地址监控:增加对远程地址的连续监控,并将选中的远程地址放入次级蜜罐进行监控。
项目目录结构:提供项目目录结构和相关文档。
项目目录结构
code
SecuritySystem/
├── src/
│ ├── main.py
│ ├── utils.py
│ ├── gui.py
│ ├── models/
│ │ └── taiji_model.pkl
│ ├── logs/
│ │ └── security_system.log
│ ├── temp/
│ │ └── sandbox_*.txt
│ ├── blacklist.conf
│ └── suspicious_activities_report.txt
└── README.md
README.md
markdown
安全监控系统
目录结构
SecuritySystem/
├── src/
│ ├── main.py
│ ├── utils.py
│ ├── gui.py
│ ├── models/
│ │ └── taiji_model.pkl
│ ├── logs/
│ │ └── security_system.log
│ ├── temp/
│ │ └── sandbox_*.txt
│ ├── blacklist.conf
│ └── suspicious_activities_report.txt
└── README.md
code
安装依赖
确保你已经安装了所有必要的库。可以在命令提示符或PowerShell中运行以下命令:
pip install scapy whois numpy scikit-learn tensorflow keras geopy Evtx psutil
运行项目
打开命令提示符或PowerShell。
导航到项目根目录 SecuritySystem。
运行主程序:
sh
python src/main.py
功能说明
实时监控:显示可疑活动日志。
日志查看:显示生成的报告。
沙箱管理:显示已创建的沙箱。
反击操作:手动对指定IP地址进行反击操作。
网络连接监控:显示当前网络连接信息,并允许用户选择连接进行进一步操作。
自适应防护:使用机器学习模型进行自适应防护。
代码结构
main.py:主程序入口,负责初始化和运行GUI。
utils.py:包含各种辅助函数,如日志分析、流量抓取、地理位置获取等。
gui.py:包含GUI相关的代码,负责界面的创建和事件处理。
models/taiji_model.pkl:存储训练好的机器学习模型。
logs/security_system.log:存储程序运行的日志。
temp/sandbox_*.txt:存储沙箱的相关信息。
blacklist.conf:存储黑名单IP地址。
suspicious_activities_report.txt:存储生成的可疑活动报告。
code
main.py
import datetime
import ipaddress
import json
import logging
import os
import pickle
import random
import re
import socket
import subprocess
import sys
import threading
import time
import tkinter as tk
from collections import defaultdict
from tkinter import messagebox, simpledialog, ttk

import Evtx.Evtx as evtx
import numpy as np
import psutil
import scapy.all as scapy
import tensorflow as tf
import whois
from geopy.geocoders import Nominatim
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from sklearn.svm import OneClassSVM
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.models import Sequential

# Logging configuration
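# Log to C:\temp\security_system.log.
# os.path.join('C:', 'temp') yields the drive-relative path 'C:temp' (relative to
# the current directory on drive C:), so the root is spelled out explicitly, and
# the directory is created first because basicConfig raises FileNotFoundError
# when the log directory does not exist.
# NOTE(review): a shared directory such as C:\temp is commonly writable by other
# local accounts, so the log can be truncated or tampered with by any of them;
# an ACL-restricted or per-service directory is preferable for audit logs.
_LOG_DIR = os.path.join('C:\\', 'temp')
os.makedirs(_LOG_DIR, exist_ok=True)
logging.basicConfig(
    filename=os.path.join(_LOG_DIR, 'security_system.log'),
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
# Make TensorFlow use the GPU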
# Make TensorFlow use the GPU when one is present: enabling memory growth on the
# first device stops TF from reserving all GPU memory up front.  A RuntimeError
# (raised when the device was already initialised) is reported and logged, not
# propagated.
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    first_gpu = gpus[0]
    try:
        tf.config.experimental.set_memory_growth(first_gpu, True)
    except RuntimeError as e:
        print(e)
        logging.error(f"TensorFlow GPU configuration error: {e}")
# Global state
# Module-level state shared by the collectors and the GUI.
suspicious_activities: list = []   # (timestamp, ip, log_entry) tuples collected so far
packets: list = []                 # packets returned by the last capture
geolocator_cache: dict = {}        # ip -> geolocation text (memo for get_geolocation)
whois_cache: dict = {}             # ip -> whois description (memo for verify_ip)
taiji_shield = None                # TaiJiShield instance once initialised
# Configure firewall rules
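def configure_firewall(known_malicious_ips=None):
    """Turn on the Windows firewall for the current profile and block known-bad addresses.

    Args:
        known_malicious_ips: iterable of IP address strings to block in both
            directions.  Defaults to the built-in example list.

    Every ``netsh`` call is best-effort: a non-zero exit status (typically because the
    process is not elevated) or a missing ``netsh`` binary is logged rather than raised,
    matching the original code, which ignored the exit status entirely.  Entries that are
    not valid IP addresses are logged and skipped so that a malformed string never reaches
    the firewall command line.  Note that each run adds another rule named
    ``BlockMaliciousIP``; netsh does not de-duplicate, so rules accumulate across runs.
    """
    if known_malicious_ips is None:
        known_malicious_ips = ["192.168.1.100", "10.0.0.1"]

    print("配置防火墙规则...")
    logging.info("配置防火墙规则...")

    def run_netsh(*args):
        """Run one ``netsh advfirewall`` command; log (do not raise) any failure."""
        command = ["netsh", "advfirewall", *args]
        try:
            result = subprocess.run(command, capture_output=True, text=True)
        except OSError as e:
            logging.error(f"netsh unavailable ({' '.join(args)}): {e}")
            return False
        if result.returncode != 0:
            logging.warning(
                f"netsh {' '.join(args)} exited with {result.returncode}: {result.stderr.strip()}"
            )
            return False
        return True

    run_netsh("set", "currentprofile", "state", "on")

    for ip in known_malicious_ips:
        try:
            # Normalise and validate before the value is interpolated into a rule.
            ip_text = str(ipaddress.ip_address(str(ip).strip()))
        except ValueError:
            logging.warning(f"Skipping invalid IP address in block list: {ip!r}")
            continue
        for direction in ("in", "out"):
            run_netsh(
                "firewall", "add", "rule", "name=BlockMaliciousIP",
                f"dir={direction}", "action=block", f"remoteip={ip_text}",
            )
# Read and parse system logs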
def analyze_logs(log_file):
    """Scan a Windows ``.evtx`` file for firewall-related records carrying an IPv4 address.

    Returns a list of ``(timestamp, ip_address, xml)`` tuples: ``timestamp`` is the first
    ``YYYY-MM-DDTHH:MM:SS`` token in the record, ``ip_address`` the first dotted-quad
    token, and ``xml`` the stripped record XML.  Records without an "IPTables-Input" /
    "IPTables-Output" marker, an IPv4-looking token, or a timestamp are skipped.
    NOTE(review): those markers look like Linux iptables log prefixes — confirm they
    match what the Windows event source actually emits.

    Any failure while opening or iterating the file is printed and logged, and whatever
    was collected up to that point is returned.
    """
    print(f"分析日志文件 {log_file}...")
    logging.info(f"分析日志文件 {log_file}...")
    ipv4_pattern = re.compile(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})')
    timestamp_pattern = re.compile(r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}')
    findings = []
    try:
        with evtx.Evtx(log_file) as log:
            for record in log.records():
                text = record.xml()
                if "IPTables-Input" not in text and "IPTables-Output" not in text:
                    continue
                ip_match = ipv4_pattern.search(text)
                if ip_match is None:
                    continue
                ts_match = timestamp_pattern.search(text)
                if ts_match is None:
                    continue
                findings.append((ts_match.group(0), ip_match.group(1), text.strip()))
    except Exception as e:
        print(f"分析日志文件时发生错误: {e}")
        logging.error(f"分析日志文件时发生错误: {e}")
    return findings
# Capture traffic on a specific port with Scapy
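def capture_traffic(interface, port, count=100, timeout=None):
    """Sniff packets on ``interface`` whose TCP/UDP port matches ``port``.

    Args:
        interface: interface name handed to ``scapy.sniff``.
        port: port number (an int, or a string of digits).  It is converted and
            range-checked before being interpolated into the BPF filter, so an
            arbitrary string cannot change the filter expression.
        count: number of packets to capture (default 100, the original fixed value).
        timeout: optional seconds after which ``sniff`` returns even if fewer than
            ``count`` packets arrived; ``None`` (the original behaviour) blocks until
            ``count`` packets are seen.

    Returns:
        The packet list returned by ``scapy.sniff``.

    Raises:
        ValueError: if ``port`` is not an integer in 0..65535.
    """
    port_number = int(str(port).strip(), 10)
    if not 0 <= port_number <= 65535:
        raise ValueError(f"port out of range: {port!r}")
    print(f"抓取 {interface} 上的 {port} 端口流量...")
    logging.info(f"抓取 {interface} 上的 {port} 端口流量...")
    return scapy.sniff(iface=interface, filter=f"port {port_number}", count=count, timeout=timeout)
# Look up the intruder's geographic location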
def get_geolocation(ip_address, geolocator_cache):
    """Return a location string for ``ip_address``, memoised in ``geolocator_cache``.

    The cache stores whatever this function last returned for the address — the
    "city, country" text, the "未知位置" marker, or the error text of a failed lookup —
    so a failure is never retried for the same address.

    NOTE(review): Nominatim resolves place names and street addresses, not IP
    addresses, so an IP string is expected to come back with no result ("未知位置");
    and geopy's Location object exposes ``address``/``latitude``/``longitude`` rather
    than ``city``/``country``, so a non-empty result would likely raise inside the
    ``try`` and cache the error text instead.  Confirm which geolocation backend is
    intended before relying on this output.
    """
    if ip_address in geolocator_cache:
        return geolocator_cache[ip_address]
    try:
        place = Nominatim(user_agent="security_system").geocode(ip_address)
        result = f"{place.city}, {place.country}" if place else "未知位置"
    except Exception as e:
        logging.error(f"获取地理位置失败: {e}")
        result = f"获取地理位置失败: {str(e)}"
    geolocator_cache[ip_address] = result
    return result
# Verify an IP address via WHOIS
def verify_ip(ip_address, whois_cache):
    """Return a WHOIS description for ``ip_address``, memoised in ``whois_cache``.

    The cached value is whatever was last returned for the address: the description
    of the first ``nets`` entry, the "未知描述" marker when the record has no ``nets``,
    or the error text of a failed lookup (failures are therefore not retried).

    NOTE(review): ``nets`` is the field name used by the ipwhois package, whereas
    ``whois.whois`` (python-whois) performs domain WHOIS and returns a dict-like record
    without that attribute — confirm which library this is meant to call.
    """
    if ip_address in whois_cache:
        return whois_cache[ip_address]
    try:
        record = whois.whois(ip_address)
        if record and record.get('nets'):
            result = record.nets[0].get('description', "未知描述")
        else:
            result = "未知描述"
    except Exception as e:
        logging.error(f"验证 IP 失败: {e}")
        result = f"验证 IP 失败: {str(e)}"
    whois_cache[ip_address] = result
    return result
# Generate the report
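def generate_report(suspicious_activities, report_file, geolocator_cache, whois_cache):
    """Write a tab-separated report of ``suspicious_activities`` to ``report_file``.

    Args:
        suspicious_activities: iterable of ``(timestamp, ip_address, log_entry)``
            tuples as produced by ``analyze_logs``.
        report_file: output path.  Its parent directory is created if missing; a
            bare file name (no directory part) is written to the current directory —
            the original called ``os.makedirs('')`` in that case, which raises.
        geolocator_cache: dict passed through to ``get_geolocation``.
        whois_cache: dict passed through to ``verify_ip``.

    The file is written as UTF-8 explicitly: the header contains Chinese text and
    the platform default encoding on Windows is a locale code page that may not be
    able to represent it.  ``log_entry`` is written verbatim; raw event XML usually
    contains newlines and may contain tabs, in which case that row does not parse as
    a single TSV line — consumers should not treat the file as strict TSV.
    """
    print(f"生成报告到 {report_file}...")
    logging.info(f"生成报告到 {report_file}...")
    report_dir = os.path.dirname(report_file)
    if report_dir:
        os.makedirs(report_dir, exist_ok=True)
    with open(report_file, 'w', encoding='utf-8') as file:
        file.write("可疑活动报告\n")
        file.write("=" * 30 + "\n")
        file.write(f"生成时间: {datetime.datetime.now()}\n")
        file.write("\n")
        file.write("时间戳\tIP 地址\t地理位置\t描述\t日志条目\n")
        file.write("-" * 80 + "\n")
        for activity in suspicious_activities:
            timestamp, ip_address, log_entry = activity[0], activity[1], activity[2]
            geolocation = get_geolocation(ip_address, geolocator_cache)
            description = verify_ip(ip_address, whois_cache)
            file.write(f"{timestamp}\t{ip_address}\t{geolocation}\t{description}\t{log_entry}\n")
# Create a sandbox environment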
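def create_sandbox(ip_address, base_dir=None):
    """Create a per-address "sandbox" directory and drop a README.txt into it.

    Args:
        ip_address: IPv4/IPv6 address the directory is named after.  It is validated
            with ``ipaddress.ip_address`` before being used as a path component, so an
            arbitrary string (one containing ``..`` or path separators) cannot steer
            the directory outside ``base_dir``.  Colons in IPv6 text are replaced by
            ``-`` because ``:`` is not a legal character in Windows file names.
        base_dir: parent directory; defaults to ``C:\\temp`` spelled with an explicit
            root (``os.path.join('C:', ...)`` produces a drive-relative path).

    Returns:
        The sandbox directory path.

    Raises:
        ValueError: if ``ip_address`` is not a valid IP address.

    No isolation is actually applied here: the directory is only a marker, and the
    two worker threads reproduce the original "simulated computation" (a busy loop)
    which has no effect beyond consuming some CPU.
    """
    address = ipaddress.ip_address(str(ip_address).strip())
    safe_name = str(address).replace(':', '-')
    if base_dir is None:
        base_dir = os.path.join('C:\\', 'temp')

    print(f"为 IP 地址 {ip_address} 创建沙箱...")
    logging.info(f"为 IP 地址 {ip_address} 创建沙箱...")
    sandbox_dir = os.path.join(base_dir, f'sandbox_{safe_name}')
    os.makedirs(sandbox_dir, exist_ok=True)

    def simulate_computation():
        """Placeholder workload kept from the original; does nothing useful."""
        for _ in range(1000000):
            pass

    workers = [threading.Thread(target=simulate_computation) for _ in range(2)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

    # UTF-8 explicitly: the README text is Chinese and the Windows default
    # encoding is a locale code page.
    with open(os.path.join(sandbox_dir, "README.txt"), 'w', encoding='utf-8') as file:
        file.write(f"沙箱环境用于 IP 地址: {ip_address}\n")
        file.write("此目录被隔离以防止潜在威胁。\n")
    return sandbox_dir
# Adaptive protection mechanism (TaiJi shield)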
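class TaiJiShield:
    """Adaptive anomaly detector ("TaiJi shield"): IsolationForest + one-class SVM.

    Raw feature vectors are accumulated in ``self.data``; every ``train`` call refits
    the scaler and both detectors on the whole accumulated set and persists them via
    ``save_model``.  ``predict`` returns the pair of labels ``(isolation_forest,
    one_class_svm)`` for one sample, or ``(1, 1)`` while nothing has been trained.
    """

    def __init__(self, model_path=None):
        """Create untrained detectors.

        Args:
            model_path: file used by ``save_model``/``load_model``.  Defaults to
                ``C:\\temp\\taiji_model.pkl`` with an explicit root — the original
                ``os.path.join('C:', ...)`` produced a drive-relative path.
        """
        self.isolation_forest = IsolationForest(contamination=0.01)
        self.one_class_svm = OneClassSVM(nu=0.01, kernel='rbf', gamma='scale')
        self.scaler = StandardScaler()
        self.data = []  # raw, unscaled samples accumulated across train() calls
        self.model_trained = False
        if model_path is None:
            model_path = os.path.join('C:\\', 'temp', 'taiji_model.pkl')
        self.model_path = model_path

    def train(self, new_data):
        """Append ``new_data`` (iterable of feature vectors) and refit on everything seen.

        The original replaced ``self.data`` with the *scaled* ndarray after fitting, so a
        second call failed (``ndarray`` has no ``extend``) and would have re-scaled
        already-scaled data.  Raw samples are kept in the list and a fresh array is
        built for each fit instead.

        Raises:
            ValueError: if there are no samples to fit on.
        """
        self.data.extend(new_data)
        if not self.data:
            raise ValueError("train() called with no samples")
        features = self.scaler.fit_transform(np.array(self.data))
        self.isolation_forest.fit(features)
        self.one_class_svm.fit(features)
        self.model_trained = True
        self.save_model()

    def predict(self, data):
        """Return ``(isolation_forest_label, one_class_svm_label)`` for one feature vector.

        Returns ``(1, 1)`` — the "normal" labels — when no model has been trained yet.
        """
        if not self.model_trained:
            return 1, 1
        sample = self.scaler.transform(np.array(data).reshape(1, -1))
        return self.isolation_forest.predict(sample)[0], self.one_class_svm.predict(sample)[0]

    def save_model(self):
        """Pickle the scaler and both detectors to ``self.model_path``, creating its directory."""
        model_dir = os.path.dirname(self.model_path)
        if model_dir:
            os.makedirs(model_dir, exist_ok=True)
        model_data = {
            'isolation_forest': self.isolation_forest,
            'one_class_svm': self.one_class_svm,
            'scaler': self.scaler,
        }
        with open(self.model_path, 'wb') as file:
            pickle.dump(model_data, file)

    def load_model(self):
        """Restore a saved model from ``self.model_path`` if the file exists; otherwise do nothing.

        SECURITY: ``pickle.load`` executes whatever the file instructs it to, so this is
        code execution from the model file.  The default location is under ``C:\\temp``,
        a directory that other local accounts can commonly write to, which would let any
        of them run code as this process at startup.  Store the model somewhere only this
        service can write, or verify a signature/MAC over the file before unpickling.
        """
        if not os.path.exists(self.model_path):
            return
        with open(self.model_path, 'rb') as file:
            model_data = pickle.load(file)  # trusted-file assumption, see docstring
        self.isolation_forest = model_data['isolation_forest']
        self.one_class_svm = model_data['one_class_svm']
        self.scaler = model_data['scaler']
        self.model_trained = True
# Dynamic blacklist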
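def update_blacklist(ip_address, blacklist_path=None):
    """Append ``ip_address`` to the blacklist file and add inbound/outbound block rules.

    Args:
        ip_address: address to block.  It is validated with ``ipaddress.ip_address``
            so that only a well-formed address reaches the blacklist file or the
            ``netsh`` command line (the original interpolated the raw string).
        blacklist_path: file to append to; defaults to ``C:\\temp\\blacklist.conf``
            with an explicit root (``os.path.join('C:', ...)`` is drive-relative).
            Its directory is created if missing.

    Raises:
        ValueError: if ``ip_address`` is not a valid IP address.

    The ``netsh`` calls are best-effort, as in the original: a non-zero exit status
    (the command needs elevation) or a missing ``netsh`` binary is logged, not raised.
    Each call adds another rule named ``BlockMaliciousIP``; rules accumulate.
    """
    address = str(ipaddress.ip_address(str(ip_address).strip()))
    if blacklist_path is None:
        blacklist_path = os.path.join('C:\\', 'temp', 'blacklist.conf')

    blacklist_dir = os.path.dirname(blacklist_path)
    if blacklist_dir:
        os.makedirs(blacklist_dir, exist_ok=True)
    with open(blacklist_path, 'a', encoding='utf-8') as file:
        file.write(f"{address}\n")

    for direction in ("in", "out"):
        command = [
            "netsh", "advfirewall", "firewall", "add", "rule", "name=BlockMaliciousIP",
            f"dir={direction}", "action=block", f"remoteip={address}",
        ]
        try:
            result = subprocess.run(command, capture_output=True, text=True)
        except OSError as e:
            logging.error(f"netsh unavailable, firewall rule for {address} not added: {e}")
            break
        if result.returncode != 0:
            logging.warning(
                f"netsh add rule dir={direction} for {address} exited with "
                f"{result.returncode}: {result.stderr.strip()}"
            )
# Deploy the honeypot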
def deploy_honeypot(port):def handle_client(client_socket):client_socket.send(b"Welcome to the honeypot!")client_socket.close()server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)server.bind(('0.0.0.0', port))server.listen(5)print(f"Honeypot listening on port {port}")logging.info(f"Honeypot listening on port {port}")while True:client_socket, addr = server.accept(