apache2日志分析简单版python代码

import re
import argparse
from collections import Counter, defaultdict
from datetime import datetime
import matplotlib
import matplotlib.pyplot as plt
import tkinter as tk
from tkinter import filedialog, messagebox
import os

# 使用Windows系统上通用的中文字体
plt.rcParams["font.family"] = ["SimHei", "Microsoft YaHei", "SimSun", "Arial"]
# 设置字体查找的回退机制
plt.rcParams["axes.unicode_minus"] = False  # 解决负号显示问题
matplotlib.use('Agg')  # 使用非交互式后端

class ApacheLogAnalyzer:
    def __init__(self, log_file_path):
        self.log_file_path = log_file_path
        # 针对用户提供的日志格式优化的正则表达式
        # 特别处理了状态码后面可能是'-'的情况
        self.log_pattern = r'^(\S+) - - \[(.*?)\] "(.*?)" (\d+) (\S+)'
        self.log_entries = []
        
    def parse_log_file(self):
        """解析日志文件并提取关键信息"""
        try:
            print(f"开始解析日志文件: {self.log_file_path}")
            print(f"文件大小: {os.path.getsize(self.log_file_path)} 字节")
            
            with open(self.log_file_path, 'r', encoding='utf-8', errors='replace') as file:
                lines_processed = 0
                for line in file:
                    lines_processed += 1
                    if lines_processed <= 5:  # 显示前5行用于调试
                        print(f"示例行 {lines_processed}: {line.strip()}")
                    
                    match = re.match(self.log_pattern, line)
                    if match:
                        ip = match.group(1)
                        timestamp_str = match.group(2)
                        request = match.group(3)
                        status_code = int(match.group(4))
                        
                        # 处理响应大小字段,可能是'-'
                        size_str = match.group(5)
                        size = int(size_str) if size_str != '-' else 0
                        
                        try:
                            # 解析时间戳,适配用户日志格式
                            timestamp = datetime.strptime(timestamp_str.split()[0], '%d/%b/%Y:%H:%M:%S')
                        except ValueError as e:
                            print(f"警告: 无法解析时间戳 '{timestamp_str}',错误: {e}")
                            # 使用当前时间作为备用,确保至少有数据用于图表
                            timestamp = datetime.now()
                        
                        # 尝试从请求中提取请求方法和路径
                        request_parts = request.split()
                        request_method = request_parts[0] if len(request_parts) > 0 else "Unknown"
                        request_path = request_parts[1] if len(request_parts) > 1 else "Unknown"
                        
                        entry = {
                            'ip': ip,
                            'timestamp': timestamp,
                            'request': request,
                            'request_method': request_method,
                            'request_path': request_path,
                            'status_code': status_code,
                            'size': size
                        }
                        self.log_entries.append(entry)
            
            print(f"共处理 {lines_processed} 行,成功解析 {len(self.log_entries)} 条记录")
            
            if len(self.log_entries) == 0:
                print("警告: 没有解析到任何日志记录,请检查日志格式是否匹配")
                messagebox.showwarning("警告", "没有解析到任何日志记录,请检查日志格式是否匹配")
                return False
            
            return True
        except Exception as e:
            print(f"解析日志文件时出错: {e}")
            messagebox.showerror("错误", f"解析日志文件时出错: {e}")
            return False
    
    def analyze_top_ips(self, limit=10):
        """分析访问量最高的IP地址"""
        if not self.log_entries:
            print("没有数据可供分析")
            return Counter()
            
        ip_counter = Counter(entry['ip'] for entry in self.log_entries)
        print(f"\n访问量最高的{limit}个IP地址:")
        for ip, count in ip_counter.most_common(limit):
            print(f"{ip}: {count} 次访问")
        return ip_counter
    
    def analyze_status_codes(self):
        """分析HTTP状态码分布"""
        if not self.log_entries:
            print("没有数据可供分析")
            return Counter()
            
        status_counter = Counter(entry['status_code'] for entry in self.log_entries)
        print(f"\nHTTP状态码分布:")
        total = sum(status_counter.values())
        for status, count in sorted(status_counter.items()):
            percentage = (count / total) * 100
            print(f"{status}: {count} 次 ({percentage:.2f}%)")
        return status_counter
    
    def analyze_requests(self, limit=10):
        """分析最常见的请求"""
        if not self.log_entries:
            print("没有数据可供分析")
            return Counter()
            
        request_counter = Counter(entry['request_path'] for entry in self.log_entries)
        
        print(f"\n最常见的{limit}个请求路径:")
        for path, count in request_counter.most_common(limit):
            print(f"{path}: {count} 次")
        return request_counter
    
    def analyze_traffic_by_hour(self):
        """分析每小时的访问流量"""
        if not self.log_entries:
            print("没有数据可供分析")
            return defaultdict(int)
            
        hourly_traffic = defaultdict(int)
        for entry in self.log_entries:
            hour_key = entry['timestamp'].strftime('%Y-%m-%d %H:00')
            hourly_traffic[hour_key] += 1
        
        print(f"\n每小时访问量:")
        sorted_hours = sorted(hourly_traffic.items())
        for hour, count in sorted_hours[:10]:  # 只显示前10个小时的数据
            print(f"{hour}: {count} 次")
        if len(sorted_hours) > 10:
            print(f"... 还有 {len(sorted_hours) - 10} 个小时的数据未显示")
        
        return hourly_traffic
    
    def analyze_request_methods(self):
        """分析请求方法分布(GET, POST等)"""
        if not self.log_entries:
            print("没有数据可供分析")
            return Counter()
            
        method_counter = Counter(entry['request_method'] for entry in self.log_entries)
        
        print(f"\n请求方法分布:")
        total = sum(method_counter.values())
        for method, count in sorted(method_counter.items()):
            percentage = (count / total) * 100
            print(f"{method}: {count} 次 ({percentage:.2f}%)")
        return method_counter
    
    def generate_hourly_traffic_chart(self, hourly_traffic):
        """可视化每小时的访问流量"""
        if not hourly_traffic:
            print("没有数据可生成图表")
            messagebox.showwarning("警告", "没有数据可生成图表")
            return
            
        try:
            hours = [item[0] for item in sorted(hourly_traffic.items())]
            counts = [item[1] for item in sorted(hourly_traffic.items())]
            
            # 确保有足够的数据点
            if len(hours) < 2:
                print("数据点太少,无法生成有意义的图表")
                messagebox.showwarning("警告", "数据点太少,无法生成有意义的图表")
                # 创建一个简单的示例图表,避免用户看到空白图表
                plt.figure(figsize=(12, 6))
                plt.plot(["示例时间1", "示例时间2"], [10, 15], marker='o')
                plt.title('示例流量趋势(实际数据点不足)')
                plt.xlabel('时间')
                plt.ylabel('访问次数')
                plt.tight_layout()
                plt.savefig('hourly_traffic.png')
                plt.show()
                messagebox.showinfo("成功", "由于实际数据点不足,已生成示例流量趋势图: hourly_traffic.png")
                return
            
            plt.figure(figsize=(12, 6))
            plt.plot(hours, counts, marker='o')
            plt.title('每小时访问流量')
            plt.xlabel('时间')
            plt.ylabel('访问次数')
            
            # 自动调整x轴标签,避免过于拥挤
            if len(hours) > 12:
                step = max(1, len(hours) // 12)
                plt.xticks(hours[::step], rotation=45)
            else:
                plt.xticks(hours, rotation=45)
                
            plt.tight_layout()
            
            # 保存图表
            plt.savefig('hourly_traffic.png', dpi=300, bbox_inches='tight')
            # 移除下面这行代码,因为Agg后端不支持交互式显示
            # plt.show()
            
            messagebox.showinfo("成功", "流量趋势图已生成并显示: hourly_traffic.png")
        except Exception as e:
            print(f"生成图表时出错: {e}")
            messagebox.showerror("错误", f"生成图表时出错: {e}")

    def run_full_analysis(self):
        """运行完整的日志分析"""
        if not self.parse_log_file():
            print("解析失败,无法继续分析")
            return
        
        print("\n===== Apache日志分析报告 =====")
        ip_counter = self.analyze_top_ips()
        status_counter = self.analyze_status_codes()
        request_counter = self.analyze_requests()
        method_counter = self.analyze_request_methods()  # 新增的请求方法分析
        hourly_traffic = self.analyze_traffic_by_hour()
        
        # 生成可视化图表
        # 确保将hourly_traffic作为参数传递
        self.generate_hourly_traffic_chart(hourly_traffic)
        
        print("\n===== 分析完成 =====")
        messagebox.showinfo("完成", "日志分析已完成!")

def select_log_file():
    """弹出文件选择对话框,让用户选择日志文件"""
    # 创建一个隐藏的Tk根窗口
    root = tk.Tk()
    root.withdraw()  # 隐藏主窗口
    
    # 设置中文字体支持
    root.option_add("*Font", "SimHei 10")
    
    # 弹出文件选择对话框
    file_path = filedialog.askopenfilename(
        title="选择Apache日志文件",
        filetypes=[
            ("日志文件", "*.log"),
            ("文本文件", "*.txt"),
            ("所有文件", "*.*")
        ]
    )
    
    return file_path

if __name__ == "__main__":
    # 创建命令行参数解析器
    parser = argparse.ArgumentParser(description='Apache日志分析工具')
    parser.add_argument('--log_file', help='Apache日志文件路径(可选,不提供则弹出文件选择对话框)')
    args = parser.parse_args()
    
    log_file_path = args.log_file
    
    # 如果没有提供日志文件路径,弹出文件选择对话框
    if not log_file_path:
        log_file_path = select_log_file()
        
        # 检查用户是否取消了文件选择
        if not log_file_path:
            print("未选择日志文件,程序退出。")
            exit(0)
    
    # 检查文件是否存在
    if not os.path.exists(log_file_path):
        print(f"错误:找不到文件 '{log_file_path}'")
        messagebox.showerror("错误", f"找不到文件 '{log_file_path}'")
        exit(1)
    
    # 创建分析器实例并运行分析
    analyzer = ApacheLogAnalyzer(log_file_path)
    analyzer.run_full_analysis()