当前位置: 首页 > news >正文

太极0.5

确保其在Windows系统上顺利运行,我们将进行以下改进:

完善GUI:确保GUI在启动时正常显示,并将运行情况显示在GUI中。
完善沙箱和蜜罐功能:确保沙箱和蜜罐的创建、监控和引爆功能更加完善。
远程地址监控:增加对远程地址的连续监控,并将选中的远程地址放入次级蜜罐进行监控。
项目目录结构:提供项目目录结构和相关文档。
项目目录结构
code
SecuritySystem/
├── src/
│ ├── main.py
│ ├── utils.py
│ ├── gui.py
│ ├── models/
│ │ └── taiji_model.pkl
│ ├── logs/
│ │ └── security_system.log
│ ├── temp/
│ │ └── sandbox_*.txt
│ ├── blacklist.conf
│ └── suspicious_activities_report.txt
└── README.md
README.md
markdown

安全监控系统

目录结构

SecuritySystem/
├── src/
│ ├── main.py
│ ├── utils.py
│ ├── gui.py
│ ├── models/
│ │ └── taiji_model.pkl
│ ├── logs/
│ │ └── security_system.log
│ ├── temp/
│ │ └── sandbox_*.txt
│ ├── blacklist.conf
│ └── suspicious_activities_report.txt
└── README.md

code

安装依赖

确保你已经安装了所有必要的库。可以在命令提示符或PowerShell中运行以下命令:

pip install scapy whois numpy scikit-learn tensorflow keras geopy Evtx psutil
运行项目
打开命令提示符或PowerShell。
导航到项目根目录 SecuritySystem。
运行主程序:
sh
python src/main.py
功能说明
实时监控:显示可疑活动日志。
日志查看:显示生成的报告。
沙箱管理:显示已创建的沙箱。
反击操作:手动对指定IP地址进行反击操作。
网络连接监控:显示当前网络连接信息,并允许用户选择连接进行进一步操作。
自适应防护:使用机器学习模型进行自适应防护。
代码结构
main.py:主程序入口,负责初始化和运行GUI。
utils.py:包含各种辅助函数,如日志分析、流量抓取、地理位置获取等。
gui.py:包含GUI相关的代码,负责界面的创建和事件处理。
models/taiji_model.pkl:存储训练好的机器学习模型。
logs/security_system.log:存储程序运行的日志。
temp/sandbox_*.txt:存储沙箱的相关信息。
blacklist.conf:存储黑名单IP地址。
suspicious_activities_report.txt:存储生成的可疑活动报告。
code

main.py

import os
import sys
import subprocess
import re
import datetime
import threading
import tkinter as tk
from tkinter import messagebox, simpledialog, ttk
import scapy.all as scapy
import whois
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
import json
import random
import socket
import pickle
from geopy.geocoders import Nominatim
from collections import defaultdict
import Evtx.Evtx as evtx
import tensorflow as tf
import psutil
import logging
import time# 配置日志
logging.basicConfig(filename=os.path.join('C:', 'temp', 'security_system.log'), level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')# 确保 TensorFlow 使用 GPU
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:try:tf.config.experimental.set_memory_growth(gpus[0], True)except RuntimeError as e:print(e)logging.error(f"TensorFlow GPU configuration error: {e}")# 全局变量
suspicious_activities = []
packets = []
geolocator_cache = {}
whois_cache = {}
taiji_shield = None# 配置防火墙规则
def configure_firewall():print("配置防火墙规则...")logging.info("配置防火墙规则...")# Windows 防火墙规则配置示例subprocess.run(["netsh", "advfirewall", "set", "currentprofile", "state", "on"])# 阻断已知恶意 IP 地址known_malicious_ips = ["192.168.1.100", "10.0.0.1"]for ip in known_malicious_ips:subprocess.run(["netsh", "advfirewall", "firewall", "add", "rule", "name=BlockMaliciousIP", "dir=in", "action=block", "remoteip=" + ip])subprocess.run(["netsh", "advfirewall", "firewall", "add", "rule", "name=BlockMaliciousIP", "dir=out", "action=block", "remoteip=" + ip])# 读取和解析系统日志
def analyze_logs(log_file):print(f"分析日志文件 {log_file}...")logging.info(f"分析日志文件 {log_file}...")suspicious_activities = []try:with evtx.Evtx(log_file) as log:for record in log.records():xml = record.xml()if "IPTables-Input" in xml or "IPTables-Output" in xml:match = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', xml)if match:ip_address = match.group(1)timestamp = re.search(r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}', xml)if timestamp:timestamp = timestamp.group(0)suspicious_activities.append((timestamp, ip_address, xml.strip()))except Exception as e:print(f"分析日志文件时发生错误: {e}")logging.error(f"分析日志文件时发生错误: {e}")return suspicious_activities# 使用 Scapy 抓取特定端口的流量
def capture_traffic(interface, port):print(f"抓取 {interface} 上的 {port} 端口流量...")logging.info(f"抓取 {interface} 上的 {port} 端口流量...")packets = scapy.sniff(iface=interface, filter=f"port {port}", count=100)return packets# 获取入侵者地理位置
def get_geolocation(ip_address, geolocator_cache):if ip_address in geolocator_cache:return geolocator_cache[ip_address]try:geolocator = Nominatim(user_agent="security_system")location = geolocator.geocode(ip_address)if location:geolocator_cache[ip_address] = f"{location.city}, {location.country}"return geolocator_cache[ip_address]else:geolocator_cache[ip_address] = "未知位置"return "未知位置"except Exception as e:geolocator_cache[ip_address] = f"获取地理位置失败: {str(e)}"logging.error(f"获取地理位置失败: {e}")return geolocator_cache[ip_address]# 验证 IP 地址
def verify_ip(ip_address, whois_cache):if ip_address in whois_cache:return whois_cache[ip_address]try:w = whois.whois(ip_address)if w and w.get('nets'):whois_cache[ip_address] = w.nets[0].get('description', "未知描述")return whois_cache[ip_address]else:whois_cache[ip_address] = "未知描述"return "未知描述"except Exception as e:whois_cache[ip_address] = f"验证 IP 失败: {str(e)}"logging.error(f"验证 IP 失败: {e}")return whois_cache[ip_address]# 生成报告
def generate_report(suspicious_activities, report_file, geolocator_cache, whois_cache):print(f"生成报告到 {report_file}...")logging.info(f"生成报告到 {report_file}...")os.makedirs(os.path.dirname(report_file), exist_ok=True)with open(report_file, 'w') as file:file.write("可疑活动报告\n")file.write("=" * 30 + "\n")file.write(f"生成时间: {datetime.datetime.now()}\n")file.write("\n")file.write("时间戳\tIP 地址\t地理位置\t描述\t日志条目\n")file.write("-" * 80 + "\n")for activity in suspicious_activities:geolocation = get_geolocation(activity[1], geolocator_cache)description = verify_ip(activity[1], whois_cache)file.write(f"{activity[0]}\t{activity[1]}\t{geolocation}\t{description}\t{activity[2]}\n")# 生成沙箱环境
def create_sandbox(ip_address):print(f"为 IP 地址 {ip_address} 创建沙箱...")logging.info(f"为 IP 地址 {ip_address} 创建沙箱...")sandbox_dir = os.path.join('C:', 'temp', f'sandbox_{ip_address}')os.makedirs(sandbox_dir, exist_ok=True)# 模拟多线程和多核处理def simulate_computation():for i in range(1000000):passthreads = []for _ in range(2):thread = threading.Thread(target=simulate_computation)threads.append(thread)thread.start()for thread in threads:thread.join()with open(os.path.join(sandbox_dir, "README.txt"), 'w') as file:file.write(f"沙箱环境用于 IP 地址: {ip_address}\n")file.write("此目录被隔离以防止潜在威胁。\n")return sandbox_dir# 自适应防护机制(太极护盾)
class TaiJiShield:def __init__(self):self.isolation_forest = IsolationForest(contamination=0.01)self.one_class_svm = OneClassSVM(nu=0.01, kernel='rbf', gamma='scale')self.scaler = StandardScaler()self.data = []self.model_trained = Falseself.model_path = os.path.join('C:', 'temp', 'taiji_model.pkl')def train(self, new_data):self.data.extend(new_data)self.data = np.array(self.data)self.data = self.scaler.fit_transform(self.data)self.isolation_forest.fit(self.data)self.one_class_svm.fit(self.data)self.model_trained = Trueself.save_model()def predict(self, data):if not self.model_trained:return 1, 1data = np.array(data).reshape(1, -1)data = self.scaler.transform(data)isolation_forest_pred = self.isolation_forest.predict(data)one_class_svm_pred = self.one_class_svm.predict(data)return isolation_forest_pred[0], one_class_svm_pred[0]def save_model(self):model_data = {'isolation_forest': self.isolation_forest,'one_class_svm': self.one_class_svm,'scaler': self.scaler}with open(self.model_path, 'wb') as file:pickle.dump(model_data, file)def load_model(self):if os.path.exists(self.model_path):with open(self.model_path, 'rb') as file:model_data = pickle.load(file)self.isolation_forest = model_data['isolation_forest']self.one_class_svm = model_data['one_class_svm']self.scaler = model_data['scaler']self.model_trained = True# 动态黑名单
def update_blacklist(ip_address):blacklist_path = os.path.join('C:', 'temp', 'blacklist.conf')with open(blacklist_path, 'a') as file:file.write(f"{ip_address}\n")subprocess.run(["netsh", "advfirewall", "firewall", "add", "rule", "name=BlockMaliciousIP", "dir=in", "action=block", "remoteip=" + ip_address])subprocess.run(["netsh", "advfirewall", "firewall", "add", "rule", "name=BlockMaliciousIP", "dir=out", "action=block", "remoteip=" + ip_address])# 部署蜜罐
def deploy_honeypot(port):def handle_client(client_socket):client_socket.send(b"Welcome to the honeypot!")client_socket.close()server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)server.bind(('0.0.0.0', port))server.listen(5)print(f"Honeypot listening on port {port}")logging.info(f"Honeypot listening on port {port}")while True:client_socket, addr = server.accept(

http://www.mrgr.cn/news/56027.html

相关文章:

  • 如何将 Docker 镜像的 tar 文件迁移到另一台服务器并运行容器
  • 基于SSM大学校医院信息管理系统的设计
  • Unity中通过给定的顶点数组生成凸面体的方法参考
  • 第十八课:Python学习之多态
  • unity学习笔记-安装与部署
  • gbn,sr和tcp的区别
  • 如何开发电商平台?直播带货系统源码的核心技术解析
  • 基于SSM网络在线考试系统的设计
  • CentOS7上下载安装 Docker Compose
  • R语言机器学习算法实战系列(六)K-邻近算法 (K-Nearest Neighbors)
  • 解决:Cannot find bean with qualifier ‘xxx‘
  • GSM850分几个Channel,为什么这样分?
  • 多品牌NVR管理工具/设备EasyNVR多个NVR同时管理实现技术赋能车载监控行业
  • 大范围实景三维智能调色 | 模方自动化匀色解决方案
  • 【Linux】文件IO深度解析:文件描述符与重定向的奥秘
  • 人工智能的时代,用好这些AI工具,闲暇时间月入过万不是梦!
  • 【AIGC】ChatGPT提示词Prompt精确控制指南:Sam Altman与Ilya Sutskever的建议详解
  • 动态规划——石子合并问题
  • C++加密解密问题解惑答疑
  • 赢得3K下载!专为RAG打造的数据清洗利器
  • 【sshpass】sshpass安装使用
  • 企业文件怎么管控?这几个软件你一定要知道!
  • DBeaver + Oracle 数据库修改CLOB类型字段内容
  • 梦熊 CSP—S模拟赛 T2youyou不喜欢夏天
  • 蒙提霍尔问题
  • Claude Financial Data Analyst:基于Claude的金融数据分析工具!免费开源!