import matplotlib
import matplotlib.pyplot as plt
import re
import json
import random
from collections import Counter
from datetime import datetime
import tkinter as tk
from tkinter import filedialog, messagebox, ttk
# Font configuration: prefer common Windows CJK fonts, with fallbacks.
plt.rcParams["font.family"] = ["SimHei", "Microsoft YaHei", "SimSun", "Arial"]
# Render the minus sign correctly when a CJK font is active.
plt.rcParams["axes.unicode_minus"] = False
# Non-interactive backend: charts are saved to PNG, never shown on screen.
matplotlib.use('Agg')
class ApacheLogAnalyzer:
    """Parse Apache access logs and produce charts plus a text report.

    If no log file is available (or no line can be parsed), the analyzer
    falls back to randomly generated sample data so every chart and the
    report can still be produced.
    """

    def __init__(self, log_file_path=None):
        # Path of the access log to analyze; None means "use sample data".
        self.log_file_path = log_file_path
        self.logs = []              # raw log lines
        self.parsed_logs = []       # one dict per successfully parsed line
        self.hourly_traffic = None
        self.ip_counts = None
        self.status_code_counts = None
        self.request_method_counts = None
        self.most_requested_paths = None
        self.analysis_report = ""   # text report built by generate_text_report()

    def load_logs(self):
        """Read the log file into self.logs.

        Returns True on success. On any failure (bad path, bad encoding,
        path is None) it generates sample data instead and returns False.
        """
        try:
            with open(self.log_file_path, 'r', encoding='utf-8') as file:
                self.logs = file.readlines()
            print(f"成功加载 {len(self.logs)} 条日志记录")
            return True
        except Exception as e:
            # Deliberate catch-all: any load failure degrades to sample data
            # so the rest of the pipeline can still run.
            print(f"加载日志文件失败: {str(e)}")
            self._generate_sample_logs()
            return False

    def _generate_sample_logs(self):
        """Generate simulated Apache common-format log lines.

        Bug fix: timestamps now use the real Apache format
        (%d/%b/%Y:%H:%M:%S) so parse_logs() can extract the hour.
        Previously the sample lines carried a bare H:M:S that strptime
        could never parse, leaving every sample request in the "unknown"
        hour bucket and the hourly chart empty.
        """
        print("正在生成模拟日志数据...")
        sample_logs = []
        sample_ips = ["192.168.1.1", "10.0.0.1", "172.16.0.1", "192.168.0.1", "10.10.10.1"]
        sample_paths = ["/index.html", "/about.html", "/contact.html", "/products.html", "/blog/post1.html"]
        sample_methods = ["GET", "POST", "PUT", "DELETE"]
        sample_status = [200, 404, 500, 301, 403]
        base_day = datetime(2024, 1, 1)
        # One batch of requests for each hour of the day.
        for hour in range(24):
            requests_count = random.randint(10, 50)
            for _ in range(requests_count):
                ip = random.choice(sample_ips)
                method = random.choice(sample_methods)
                path = random.choice(sample_paths)
                status = random.choice(sample_status)
                size = random.randint(100, 5000)
                stamp = base_day.replace(hour=hour,
                                         minute=random.randint(0, 59),
                                         second=random.randint(0, 59))
                timestamp = stamp.strftime('%d/%b/%Y:%H:%M:%S')
                log_line = f"{ip} - - [{timestamp} +0000] "
                log_line += f'"{method} {path} HTTP/1.1" {status} {size}'
                sample_logs.append(log_line)
        self.logs = sample_logs
        print(f"生成了 {len(self.logs)} 条模拟日志")

    def parse_logs(self):
        """Parse self.logs into structured dicts in self.parsed_logs.

        Returns True when at least one line parsed successfully.
        """
        # Common Log Format: ip - - [timestamp] "request" status size
        log_pattern = re.compile(r'(\S+) - - \[(.*?)\] "(.*?)" (\d+) (\S+)')
        self.parsed_logs = []
        for log in self.logs:
            match = log_pattern.match(log)
            if not match:
                continue
            ip, timestamp_str, request, status_code, response_size = match.groups()
            # Split "METHOD /path HTTP/1.1" into method and path.
            request_parts = request.split()
            if len(request_parts) >= 2:
                method = request_parts[0]
                path = request_parts[1]
            else:
                method = "UNKNOWN"
                path = request
            # Extract the hour from the Apache timestamp; lines whose
            # timestamp has an unexpected format are kept but bucketed
            # under "unknown".
            try:
                timestamp = datetime.strptime(timestamp_str.split()[0], '%d/%b/%Y:%H:%M:%S')
                hour = f"{timestamp.hour:02d}:00"
            except (ValueError, IndexError):
                hour = "unknown"
            # "-" means no body was sent; treat it (and any junk) as 0 bytes.
            try:
                response_size = int(response_size) if response_size != '-' else 0
            except ValueError:
                response_size = 0
            self.parsed_logs.append({
                'ip': ip,
                'timestamp': timestamp_str,
                'hour': hour,
                'request': request,
                'method': method,
                'path': path,
                'status_code': int(status_code),
                'response_size': response_size
            })
        print(f"成功解析 {len(self.parsed_logs)} 条日志")
        return len(self.parsed_logs) > 0

    def analyze_traffic_by_hour(self):
        """Count requests per hour; always returns all 24 hour buckets."""
        hourly_traffic = Counter()
        # Seed every hour 00:00-23:00 so quiet hours still appear as 0.
        for hour in range(24):
            hourly_traffic[f"{hour:02d}:00"] = 0
        for log in self.parsed_logs:
            if log['hour'] != "unknown":
                hourly_traffic[log['hour']] += 1
        self.hourly_traffic = hourly_traffic
        return hourly_traffic

    def analyze_ip_addresses(self, top_n=10):
        """Return the top_n most frequent client IPs as (ip, count) pairs."""
        ip_counter = Counter(log['ip'] for log in self.parsed_logs)
        self.ip_counts = ip_counter.most_common(top_n)
        return self.ip_counts

    def analyze_status_codes(self):
        """Return a Counter of HTTP status codes."""
        status_counter = Counter(log['status_code'] for log in self.parsed_logs)
        self.status_code_counts = status_counter
        return status_counter

    def analyze_request_methods(self):
        """Return a Counter of HTTP request methods."""
        method_counter = Counter(log['method'] for log in self.parsed_logs)
        self.request_method_counts = method_counter
        return method_counter

    def analyze_requested_paths(self, top_n=10):
        """Return the top_n most requested paths as (path, count) pairs."""
        path_counter = Counter(log['path'] for log in self.parsed_logs)
        self.most_requested_paths = path_counter.most_common(top_n)
        return self.most_requested_paths

    def generate_hourly_traffic_chart(self, hourly_traffic=None):
        """Save a bar chart of requests per hour to hourly_traffic.png."""
        # Fall back to stored analysis results, then to random sample data.
        if hourly_traffic is None:
            if self.hourly_traffic:
                hourly_traffic = self.hourly_traffic
            else:
                print("警告: 没有找到小时流量数据,使用示例数据生成图表")
                hours = [f'{h:02d}:00' for h in range(24)]
                hourly_traffic = {hour: random.randint(10, 100) for hour in hours}
        # "HH:00" keys sort chronologically as plain strings.
        sorted_hours = sorted(hourly_traffic.keys())
        traffic_values = [hourly_traffic[hour] for hour in sorted_hours]
        plt.figure(figsize=(12, 6))
        plt.bar(sorted_hours, traffic_values, color='skyblue')
        plt.title('每小时访问量趋势', fontsize=16)
        plt.xlabel('小时', fontsize=12)
        plt.ylabel('访问次数', fontsize=12)
        plt.xticks(rotation=45)
        plt.grid(axis='y', linestyle='--', alpha=0.7)
        plt.tight_layout()
        plt.savefig('hourly_traffic.png', dpi=300, bbox_inches='tight')
        plt.close()  # release the figure; otherwise it leaks across runs
        print("已生成每小时访问量趋势图: hourly_traffic.png")

    def generate_ip_address_chart(self):
        """Save a horizontal bar chart of top IPs to top_ip_addresses.png."""
        if not self.ip_counts:
            print("警告: 没有IP地址分析数据,使用示例数据生成图表")
            self.ip_counts = [(f"192.168.1.{i}", random.randint(50, 200)) for i in range(1, 11)]
        ips, counts = zip(*self.ip_counts)
        plt.figure(figsize=(12, 6))
        plt.barh(ips, counts, color='lightgreen')
        plt.title('访问量最多的IP地址', fontsize=16)
        plt.xlabel('访问次数', fontsize=12)
        plt.ylabel('IP地址', fontsize=12)
        plt.tight_layout()
        plt.savefig('top_ip_addresses.png', dpi=300, bbox_inches='tight')
        plt.close()  # release the figure; otherwise it leaks across runs
        print("已生成IP地址分布图: top_ip_addresses.png")

    def generate_status_code_chart(self):
        """Save a pie chart of status codes to status_code_distribution.png."""
        if not self.status_code_counts:
            print("警告: 没有状态码分析数据,使用示例数据生成图表")
            self.status_code_counts = {200: random.randint(1000, 5000),
                                       404: random.randint(100, 500),
                                       500: random.randint(10, 100),
                                       301: random.randint(50, 200),
                                       403: random.randint(20, 80)}
        status_codes = list(self.status_code_counts.keys())
        counts = list(self.status_code_counts.values())
        plt.figure(figsize=(10, 6))
        plt.pie(counts, labels=status_codes, autopct='%1.1f%%', startangle=90)
        plt.title('HTTP状态码分布', fontsize=16)
        plt.axis('equal')
        plt.tight_layout()
        plt.savefig('status_code_distribution.png', dpi=300, bbox_inches='tight')
        plt.close()  # release the figure; otherwise it leaks across runs
        print("已生成状态码分布图: status_code_distribution.png")

    def generate_request_method_chart(self):
        """Save a bar chart of methods to request_method_distribution.png."""
        if not self.request_method_counts:
            print("警告: 没有请求方法分析数据,使用示例数据生成图表")
            self.request_method_counts = {"GET": random.randint(1000, 5000),
                                          "POST": random.randint(500, 2000),
                                          "PUT": random.randint(100, 500),
                                          "DELETE": random.randint(50, 200)}
        methods = list(self.request_method_counts.keys())
        counts = list(self.request_method_counts.values())
        plt.figure(figsize=(10, 6))
        plt.bar(methods, counts, color='lightcoral')
        plt.title('HTTP请求方法分布', fontsize=16)
        plt.xlabel('请求方法', fontsize=12)
        plt.ylabel('请求次数', fontsize=12)
        plt.tight_layout()
        plt.savefig('request_method_distribution.png', dpi=300, bbox_inches='tight')
        plt.close()  # release the figure; otherwise it leaks across runs
        print("已生成请求方法分布图: request_method_distribution.png")

    def generate_requested_paths_chart(self):
        """Save a chart of the most requested paths to most_requested_paths.png."""
        if not self.most_requested_paths:
            print("警告: 没有请求路径分析数据,使用示例数据生成图表")
            paths = ["/index.html", "/about.html", "/contact.html", "/products.html", "/blog/"]
            self.most_requested_paths = [(path, random.randint(100, 1000)) for path in paths]
        paths, counts = zip(*self.most_requested_paths)
        # Truncate very long paths so axis labels stay readable.
        truncated_paths = [path[:30] + '...' if len(path) > 30 else path for path in paths]
        plt.figure(figsize=(12, 6))
        plt.barh(truncated_paths, counts, color='lightblue')
        plt.title('访问量最多的页面路径', fontsize=16)
        plt.xlabel('访问次数', fontsize=12)
        plt.ylabel('页面路径', fontsize=12)
        plt.tight_layout()
        plt.savefig('most_requested_paths.png', dpi=300, bbox_inches='tight')
        plt.close()  # release the figure; otherwise it leaks across runs
        print("已生成页面访问分布图: most_requested_paths.png")

    def save_analysis_results(self):
        """Dump all analysis results to analysis_results.json (UTF-8)."""
        results = {
            'total_logs': len(self.logs),
            'parsed_logs': len(self.parsed_logs),
            'hourly_traffic': dict(self.hourly_traffic) if self.hourly_traffic else {},
            'top_ip_addresses': dict(self.ip_counts) if self.ip_counts else {},
            'status_code_distribution': dict(self.status_code_counts) if self.status_code_counts else {},
            'request_method_distribution': dict(self.request_method_counts) if self.request_method_counts else {},
            'most_requested_paths': dict(self.most_requested_paths) if self.most_requested_paths else {}
        }
        with open('analysis_results.json', 'w', encoding='utf-8') as file:
            json.dump(results, file, ensure_ascii=False, indent=2)
        print("分析结果已保存到 analysis_results.json")

    def generate_text_report(self):
        """Build the textual analysis report, write it to
        analysis_report.txt and return it as a string."""
        report = ["====== Apache日志分析报告 ======"]
        # Basic statistics.
        report.append(f"\n1. 基本统计")
        report.append(f" - 总日志条数: {len(self.logs)}")
        report.append(f" - 成功解析条数: {len(self.parsed_logs)}")
        report.append(f" - 解析率: {len(self.parsed_logs)/len(self.logs)*100:.2f}%" if self.logs else " - 解析率: 0%")
        # Traffic analysis.
        if self.hourly_traffic:
            total_requests = sum(self.hourly_traffic.values())
            peak_hour = max(self.hourly_traffic.items(), key=lambda x: x[1])
            quiet_hour = min(self.hourly_traffic.items(), key=lambda x: x[1])
            report.append(f"\n2. 流量分析")
            report.append(f" - 总请求数: {total_requests}")
            report.append(f" - 峰值时段: {peak_hour[0]} ({peak_hour[1]}次请求)")
            report.append(f" - 低谷时段: {quiet_hour[0]} ({quiet_hour[1]}次请求)")
            avg_requests_per_hour = total_requests / 24 if total_requests else 0
            report.append(f" - 每小时平均请求数: {avg_requests_per_hour:.2f}")
        # IP address analysis.
        if self.ip_counts:
            report.append(f"\n3. IP地址分析")
            report.append(f" - 访问量最多的5个IP地址:")
            for ip, count in self.ip_counts[:5]:
                report.append(f" * {ip}: {count}次访问")
            # IP diversity = number of distinct client addresses.
            unique_ips = len(set(log['ip'] for log in self.parsed_logs)) if self.parsed_logs else 0
            report.append(f" - 唯一IP地址数量: {unique_ips}")
        # HTTP status code analysis.
        if self.status_code_counts:
            total_codes = sum(self.status_code_counts.values())
            report.append(f"\n4. HTTP状态码分析")
            # Group by status class (2xx/3xx/4xx/5xx).
            status_categories = {
                '2xx成功': sum(count for code, count in self.status_code_counts.items() if 200 <= code < 300),
                '3xx重定向': sum(count for code, count in self.status_code_counts.items() if 300 <= code < 400),
                '4xx客户端错误': sum(count for code, count in self.status_code_counts.items() if 400 <= code < 500),
                '5xx服务器错误': sum(count for code, count in self.status_code_counts.items() if 500 <= code < 600)
            }
            for category, count in status_categories.items():
                if count > 0:
                    percentage = count / total_codes * 100
                    report.append(f" - {category}: {count}次 ({percentage:.2f}%)")
            common_codes = [code for code, count in self.status_code_counts.items() if count > 0]
            if common_codes:
                report.append(f" - 出现的状态码: {', '.join(map(str, common_codes))}")
        # Request method analysis.
        if self.request_method_counts:
            total_methods = sum(self.request_method_counts.values())
            report.append(f"\n5. HTTP请求方法分析")
            for method, count in sorted(self.request_method_counts.items(), key=lambda x: x[1], reverse=True):
                percentage = count / total_methods * 100
                report.append(f" - {method}: {count}次 ({percentage:.2f}%)")
        # Requested-path analysis.
        if self.most_requested_paths:
            report.append(f"\n6. 页面访问分析")
            report.append(f" - 访问量最多的5个页面:")
            for path, count in self.most_requested_paths[:5]:
                display_path = path[:50] + '...' if len(path) > 50 else path
                report.append(f" * {display_path}: {count}次访问")
        # Anomaly detection.
        report.append("\n7. 异常检测")
        # Bug fix: guard against division by zero when nothing was parsed.
        total_parsed = len(self.parsed_logs)
        if (self.status_code_counts and total_parsed
                and self.status_code_counts.get(404, 0) > total_parsed * 0.1):
            report.append(f" ! 警告: 404错误占比过高 ({self.status_code_counts[404]/total_parsed*100:.2f}%),可能存在大量无效链接")
        else:
            report.append(f" - 404错误比例正常")
        if self.status_code_counts:
            server_errors = sum(count for code, count in self.status_code_counts.items() if 500 <= code < 600)
            if server_errors > 0:
                report.append(f" ! 警告: 发现{server_errors}次服务器错误(5xx),需要检查服务器健康状况")
            else:
                report.append(f" - 未发现服务器错误(5xx)")
        report.append("\n====== 分析报告结束 ======")
        self.analysis_report = "\n".join(report)
        with open('analysis_report.txt', 'w', encoding='utf-8') as file:
            file.write(self.analysis_report)
        print("分析报告已保存到 analysis_report.txt")
        return self.analysis_report

    def run_full_analysis(self):
        """Run the whole pipeline: load, parse, analyze, chart, report."""
        print("开始日志分析...")
        if not self.load_logs():
            print("使用示例数据继续分析")
        if not self.parse_logs():
            print("日志解析失败,使用预生成的示例数据")
            # Seed sample analysis data so charts can still be generated.
            self._setup_sample_analysis_data()
        # Run every analysis.
        hourly_traffic = self.analyze_traffic_by_hour()
        self.analyze_ip_addresses()
        self.analyze_status_codes()
        self.analyze_request_methods()
        self.analyze_requested_paths()
        # Generate all charts.
        self.generate_hourly_traffic_chart(hourly_traffic)
        self.generate_ip_address_chart()
        self.generate_status_code_chart()
        self.generate_request_method_chart()
        self.generate_requested_paths_chart()
        # Text report + JSON results.
        self.generate_text_report()
        self.save_analysis_results()
        print("日志分析完成!")

    def _setup_sample_analysis_data(self):
        """Populate all analysis attributes with random sample data so the
        chart methods always have something to draw."""
        hours = [f'{h:02d}:00' for h in range(24)]
        self.hourly_traffic = {hour: random.randint(10, 100) for hour in hours}
        self.ip_counts = [(f"192.168.1.{i}", random.randint(50, 200)) for i in range(1, 11)]
        self.status_code_counts = {200: random.randint(1000, 5000),
                                   404: random.randint(100, 500),
                                   500: random.randint(10, 100),
                                   301: random.randint(50, 200),
                                   403: random.randint(20, 80)}
        self.request_method_counts = {"GET": random.randint(1000, 5000),
                                      "POST": random.randint(500, 2000),
                                      "PUT": random.randint(100, 500),
                                      "DELETE": random.randint(50, 200)}
        paths = ["/index.html", "/about.html", "/contact.html", "/products.html", "/blog/"]
        self.most_requested_paths = [(path, random.randint(100, 1000)) for path in paths]
class LogAnalyzerGUI:
    """Tkinter front-end for ApacheLogAnalyzer: file picker, run button,
    and two tabs (operation log, analysis report)."""

    def __init__(self, root):
        self.root = root
        self.root.title("Apache日志分析工具")
        self.root.geometry("800x600")  # large enough for both text panes
        # Widget fonts that can render Chinese text.
        self.style = ttk.Style()
        self.style.configure("TButton", font=('SimHei', 10))
        self.style.configure("TLabel", font=('SimHei', 10))
        self.style.configure("TText", font=('SimHei', 10))
        self.log_file_path = None
        self.analyzer = None
        self.create_widgets()

    def create_widgets(self):
        """Build the file selector, the analyze button and the two tabs."""
        # Top row: selected-file label + file chooser button.
        top_frame = ttk.Frame(self.root, padding="10")
        top_frame.pack(fill=tk.X)
        self.file_label = ttk.Label(top_frame, text="未选择日志文件")
        self.file_label.pack(side=tk.LEFT, padx=(0, 10))
        select_file_btn = ttk.Button(top_frame, text="选择日志文件", command=self.select_log_file)
        select_file_btn.pack(side=tk.LEFT)
        # Middle row: the analyze button.
        middle_frame = ttk.Frame(self.root, padding="10")
        middle_frame.pack(fill=tk.X)
        analyze_btn = ttk.Button(middle_frame, text="开始分析", command=self.start_analysis)
        analyze_btn.pack(fill=tk.X)
        # Tabbed results area.
        self.notebook = ttk.Notebook(self.root)
        self.notebook.pack(fill=tk.BOTH, expand=True, padx=10, pady=5)
        # Operation-log tab.
        log_frame = ttk.Frame(self.notebook)
        self.notebook.add(log_frame, text="操作日志")
        self.log_text = tk.Text(log_frame, wrap=tk.WORD, height=15)
        # Fix: parent the scrollbar to the frame (not to the Text widget)
        # so it sits beside the text instead of overlapping it.
        log_scrollbar = ttk.Scrollbar(log_frame, command=self.log_text.yview)
        log_scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
        self.log_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
        self.log_text.config(yscrollcommand=log_scrollbar.set)
        # Analysis-report tab.
        report_frame = ttk.Frame(self.notebook)
        self.notebook.add(report_frame, text="分析报告")
        self.report_text = tk.Text(report_frame, wrap=tk.WORD, height=15)
        report_scrollbar = ttk.Scrollbar(report_frame, command=self.report_text.yview)
        report_scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
        self.report_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
        self.report_text.config(yscrollcommand=report_scrollbar.set)
        # Mirror print() output into the log tab. Keep a reference to the
        # original stream so it is not irrecoverably lost.
        import sys
        self._original_stdout = sys.stdout
        sys.stdout = TextRedirector(self.log_text, "stdout")

    def _reset_output(self):
        """Clear both text panes.

        Fix: TextRedirector leaves the log pane in 'disabled' state after
        writing, and Text.delete() on a disabled widget is a silent no-op,
        so the panes must be re-enabled before clearing.
        """
        for widget in (self.log_text, self.report_text):
            widget.configure(state="normal")
            widget.delete(1.0, tk.END)

    def select_log_file(self):
        """Open a file dialog and remember the chosen log file path."""
        file_path = filedialog.askopenfilename(
            title="选择Apache日志文件",
            filetypes=[("日志文件", "*.log"), ("所有文件", "*.*")]
        )
        if file_path:
            self.log_file_path = file_path
            self.file_label.config(text=file_path)
            messagebox.showinfo("文件选择", f"已选择文件: {file_path}")

    def start_analysis(self):
        """Run the full analysis, on the selected file or on sample data."""
        if not self.log_file_path:
            # No file chosen: offer to analyze generated sample data.
            if messagebox.askyesno("无文件选择", "未选择日志文件,是否使用示例数据进行分析?"):
                self.analyzer = ApacheLogAnalyzer()
                self._reset_output()
                self.analyzer.run_full_analysis()
                self.display_analysis_report()
                messagebox.showinfo("分析完成", "使用示例数据的日志分析已完成!")
        else:
            try:
                self.analyzer = ApacheLogAnalyzer(self.log_file_path)
                self._reset_output()
                self.analyzer.run_full_analysis()
                self.display_analysis_report()
                messagebox.showinfo("分析完成", "日志分析已完成!")
            except Exception as e:
                messagebox.showerror("分析错误", f"分析过程中出现错误: {str(e)}")

    def display_analysis_report(self):
        """Show the analyzer's text report in the report tab (read-only)."""
        self.report_text.configure(state="normal")
        if self.analyzer and self.analyzer.analysis_report:
            self.report_text.delete(1.0, tk.END)
            self.report_text.insert(tk.END, self.analyzer.analysis_report)
        else:
            self.report_text.insert(tk.END, "无法显示分析报告: 没有找到报告数据。")
        self.report_text.configure(state="disabled")
class TextRedirector:
    """Minimal file-like object that appends written text to a Tk Text
    widget, used to mirror stdout into the GUI."""

    def __init__(self, text_widget, tag="stdout"):
        self.text_widget = text_widget
        self.tag = tag

    def write(self, string):
        # Temporarily enable the widget so programmatic inserts work,
        # then lock it again so the user cannot edit the log.
        widget = self.text_widget
        widget.configure(state="normal")
        widget.insert(tk.END, string)
        widget.see(tk.END)
        widget.configure(state="disabled")

    def flush(self):
        # Nothing is buffered; present only to satisfy the stream protocol.
        pass
if __name__ == "__main__":
    # Launch the GUI and block in the Tk event loop until the window closes.
    root = tk.Tk()
    app = LogAnalyzerGUI(root)
    root.mainloop()