import re
import argparse
from collections import Counter, defaultdict
from datetime import datetime
import matplotlib
import matplotlib.pyplot as plt
import tkinter as tk
from tkinter import filedialog, messagebox
import os
# 使用Windows系统上通用的中文字体
plt.rcParams["font.family"] = ["SimHei", "Microsoft YaHei", "SimSun", "Arial"]
# 设置字体查找的回退机制
plt.rcParams["axes.unicode_minus"] = False # 解决负号显示问题
matplotlib.use('Agg') # 使用非交互式后端
class ApacheLogAnalyzer:
def __init__(self, log_file_path):
self.log_file_path = log_file_path
# 针对用户提供的日志格式优化的正则表达式
# 特别处理了状态码后面可能是'-'的情况
self.log_pattern = r'^(\S+) - - \[(.*?)\] "(.*?)" (\d+) (\S+)'
self.log_entries = []
def parse_log_file(self):
"""解析日志文件并提取关键信息"""
try:
print(f"开始解析日志文件: {self.log_file_path}")
print(f"文件大小: {os.path.getsize(self.log_file_path)} 字节")
with open(self.log_file_path, 'r', encoding='utf-8', errors='replace') as file:
lines_processed = 0
for line in file:
lines_processed += 1
if lines_processed <= 5: # 显示前5行用于调试
print(f"示例行 {lines_processed}: {line.strip()}")
match = re.match(self.log_pattern, line)
if match:
ip = match.group(1)
timestamp_str = match.group(2)
request = match.group(3)
status_code = int(match.group(4))
# 处理响应大小字段,可能是'-'
size_str = match.group(5)
size = int(size_str) if size_str != '-' else 0
try:
# 解析时间戳,适配用户日志格式
timestamp = datetime.strptime(timestamp_str.split()[0], '%d/%b/%Y:%H:%M:%S')
except ValueError as e:
print(f"警告: 无法解析时间戳 '{timestamp_str}',错误: {e}")
# 使用当前时间作为备用,确保至少有数据用于图表
timestamp = datetime.now()
# 尝试从请求中提取请求方法和路径
request_parts = request.split()
request_method = request_parts[0] if len(request_parts) > 0 else "Unknown"
request_path = request_parts[1] if len(request_parts) > 1 else "Unknown"
entry = {
'ip': ip,
'timestamp': timestamp,
'request': request,
'request_method': request_method,
'request_path': request_path,
'status_code': status_code,
'size': size
}
self.log_entries.append(entry)
print(f"共处理 {lines_processed} 行,成功解析 {len(self.log_entries)} 条记录")
if len(self.log_entries) == 0:
print("警告: 没有解析到任何日志记录,请检查日志格式是否匹配")
messagebox.showwarning("警告", "没有解析到任何日志记录,请检查日志格式是否匹配")
return False
return True
except Exception as e:
print(f"解析日志文件时出错: {e}")
messagebox.showerror("错误", f"解析日志文件时出错: {e}")
return False
def analyze_top_ips(self, limit=10):
"""分析访问量最高的IP地址"""
if not self.log_entries:
print("没有数据可供分析")
return Counter()
ip_counter = Counter(entry['ip'] for entry in self.log_entries)
print(f"\n访问量最高的{limit}个IP地址:")
for ip, count in ip_counter.most_common(limit):
print(f"{ip}: {count} 次访问")
return ip_counter
def analyze_status_codes(self):
"""分析HTTP状态码分布"""
if not self.log_entries:
print("没有数据可供分析")
return Counter()
status_counter = Counter(entry['status_code'] for entry in self.log_entries)
print(f"\nHTTP状态码分布:")
total = sum(status_counter.values())
for status, count in sorted(status_counter.items()):
percentage = (count / total) * 100
print(f"{status}: {count} 次 ({percentage:.2f}%)")
return status_counter
def analyze_requests(self, limit=10):
"""分析最常见的请求"""
if not self.log_entries:
print("没有数据可供分析")
return Counter()
request_counter = Counter(entry['request_path'] for entry in self.log_entries)
print(f"\n最常见的{limit}个请求路径:")
for path, count in request_counter.most_common(limit):
print(f"{path}: {count} 次")
return request_counter
def analyze_traffic_by_hour(self):
"""分析每小时的访问流量"""
if not self.log_entries:
print("没有数据可供分析")
return defaultdict(int)
hourly_traffic = defaultdict(int)
for entry in self.log_entries:
hour_key = entry['timestamp'].strftime('%Y-%m-%d %H:00')
hourly_traffic[hour_key] += 1
print(f"\n每小时访问量:")
sorted_hours = sorted(hourly_traffic.items())
for hour, count in sorted_hours[:10]: # 只显示前10个小时的数据
print(f"{hour}: {count} 次")
if len(sorted_hours) > 10:
print(f"... 还有 {len(sorted_hours) - 10} 个小时的数据未显示")
return hourly_traffic
def analyze_request_methods(self):
"""分析请求方法分布(GET, POST等)"""
if not self.log_entries:
print("没有数据可供分析")
return Counter()
method_counter = Counter(entry['request_method'] for entry in self.log_entries)
print(f"\n请求方法分布:")
total = sum(method_counter.values())
for method, count in sorted(method_counter.items()):
percentage = (count / total) * 100
print(f"{method}: {count} 次 ({percentage:.2f}%)")
return method_counter
def generate_hourly_traffic_chart(self, hourly_traffic):
"""可视化每小时的访问流量"""
if not hourly_traffic:
print("没有数据可生成图表")
messagebox.showwarning("警告", "没有数据可生成图表")
return
try:
hours = [item[0] for item in sorted(hourly_traffic.items())]
counts = [item[1] for item in sorted(hourly_traffic.items())]
# 确保有足够的数据点
if len(hours) < 2:
print("数据点太少,无法生成有意义的图表")
messagebox.showwarning("警告", "数据点太少,无法生成有意义的图表")
# 创建一个简单的示例图表,避免用户看到空白图表
plt.figure(figsize=(12, 6))
plt.plot(["示例时间1", "示例时间2"], [10, 15], marker='o')
plt.title('示例流量趋势(实际数据点不足)')
plt.xlabel('时间')
plt.ylabel('访问次数')
plt.tight_layout()
plt.savefig('hourly_traffic.png')
plt.show()
messagebox.showinfo("成功", "由于实际数据点不足,已生成示例流量趋势图: hourly_traffic.png")
return
plt.figure(figsize=(12, 6))
plt.plot(hours, counts, marker='o')
plt.title('每小时访问流量')
plt.xlabel('时间')
plt.ylabel('访问次数')
# 自动调整x轴标签,避免过于拥挤
if len(hours) > 12:
step = max(1, len(hours) // 12)
plt.xticks(hours[::step], rotation=45)
else:
plt.xticks(hours, rotation=45)
plt.tight_layout()
# 保存图表
plt.savefig('hourly_traffic.png', dpi=300, bbox_inches='tight')
# 移除下面这行代码,因为Agg后端不支持交互式显示
# plt.show()
messagebox.showinfo("成功", "流量趋势图已生成并显示: hourly_traffic.png")
except Exception as e:
print(f"生成图表时出错: {e}")
messagebox.showerror("错误", f"生成图表时出错: {e}")
def run_full_analysis(self):
"""运行完整的日志分析"""
if not self.parse_log_file():
print("解析失败,无法继续分析")
return
print("\n===== Apache日志分析报告 =====")
ip_counter = self.analyze_top_ips()
status_counter = self.analyze_status_codes()
request_counter = self.analyze_requests()
method_counter = self.analyze_request_methods() # 新增的请求方法分析
hourly_traffic = self.analyze_traffic_by_hour()
# 生成可视化图表
# 确保将hourly_traffic作为参数传递
self.generate_hourly_traffic_chart(hourly_traffic)
print("\n===== 分析完成 =====")
messagebox.showinfo("完成", "日志分析已完成!")
def select_log_file():
"""弹出文件选择对话框,让用户选择日志文件"""
# 创建一个隐藏的Tk根窗口
root = tk.Tk()
root.withdraw() # 隐藏主窗口
# 设置中文字体支持
root.option_add("*Font", "SimHei 10")
# 弹出文件选择对话框
file_path = filedialog.askopenfilename(
title="选择Apache日志文件",
filetypes=[
("日志文件", "*.log"),
("文本文件", "*.txt"),
("所有文件", "*.*")
]
)
return file_path
if __name__ == "__main__":
# 创建命令行参数解析器
parser = argparse.ArgumentParser(description='Apache日志分析工具')
parser.add_argument('--log_file', help='Apache日志文件路径(可选,不提供则弹出文件选择对话框)')
args = parser.parse_args()
log_file_path = args.log_file
# 如果没有提供日志文件路径,弹出文件选择对话框
if not log_file_path:
log_file_path = select_log_file()
# 检查用户是否取消了文件选择
if not log_file_path:
print("未选择日志文件,程序退出。")
exit(0)
# 检查文件是否存在
if not os.path.exists(log_file_path):
print(f"错误:找不到文件 '{log_file_path}'")
messagebox.showerror("错误", f"找不到文件 '{log_file_path}'")
exit(1)
# 创建分析器实例并运行分析
analyzer = ApacheLogAnalyzer(log_file_path)
analyzer.run_full_analysis()