python怎么实时监控logstash日志

人气:418 ℃/2023-02-25 08:42:07

第一步,实时读取logstash日志,有异常错误keywork即触发报警。

#/usr/bin/envpython3#-*-coding:utf-8-*-#__author__=caozhi#create_time2018-11-12,update_time2018-11-15#version=1.0#录像高可用报警#1读取日志使用游标移动#2线上业务日志文件会切割,切割后,读取上一个切割的日志importosimportsysimportjsonimportrequestsimporttimeimportrecini=conf.ini'log_file=logstash.log'defreadconf():try:withopen(cini,'r+')asf:CONF=json.load(f)except:CONF={"seek":0,"inode":922817,"last_file":logstash.log"}writeconf(CONF=CONF)print('conf.ini配置文件缺失,自动创建一个新的配置文件')returnCONFdefwriteconf(CONF):withopen(cini,'w+')ase:json.dump(CONF,e)defread_log(log_file,seek):try:f=open(log_file,'r')exceptFileNotFoundError:f=open(logstash.log','r')seek=0print('上一个文件读取失败了,请检查切割的日志文件')except:print('日志文件打开错误,退出程序')sys.exit()f.seek(seek)line=f.readline()new_seek=f.tell()ifnew_seek==seek:print('没有追加日志,退出程序')sys.exit()whileline:try:logstash=json.loads(line)except:CONF={"seek":0,"inode":922817,"last_file":"/data/logs/lmrs/logstash.log"}writeconf(CONF=CONF)print('json数据加载错误,重新创建一个新的配置文件')sys.exit()#if'''re.search(time.strftime("%Y:%H:%M",time.localtime()),logstash.get('log_time'))and'''logstash.get('rtype')==6andlogstash.get('uri')=='/publish'andlogstash.get('event')==0:iflogstash.get('rtype')==6andlogstash.get('uri')=='/publish'andlogstash.get('event')==0:value=1stream=logstash.get('name')print('{}{}'.format(value,stream))record(value=value,stream=stream)else:value=0stream=0line=f.readline()seek=f.tell()f.closereturnvalue,stream,seekdefrecord(value,stream):data=[]record={}record['metric']='recording_high_availability_monitor'record['endpoint']=os.uname()[1]record['timestamp']=int(time.time())record['step']=60record['value']=valuerecord['counterType']='GAUGE'record['Tags']='{}={}'.format(int(time.time()),stream)data.append(record)ifdata:print('这是data的json数据')print(data)falcon_request=requests.post("http://127.0.0.1:1988/v1/push",data=json.dumps(data))#falcon_request=requests.post("http://127.0.0.1:1988/v1/push",json=data)print('json参数请求返回状态码为:'+str(falcon_request.status_code))print('json参数请求返回为:'+str(falcon_request.text))if__name__=='__main__':print()print('***************************************')print('本次执行脚本时间:{}'.format(time.strftime("%Y%m%d_%H%M",time.localtime())))CONF=readconf()print('first_CONF:{}'.format(CONF))print('NO1.log_file',log_file)last_inode=CONF['inode']inode=os.stat(log_file).st_inoprint('last_inode:{}inode:{}'.format(last_inode,inode))ifinode==last_inode:seek=CONF['seek']next_file=0else:log_file=CONF['last_file']+time.strftime("-%Y%m%d_",time.localtime())+str(time.strftime("%H%M",time.localtime()))[:-1]+'0'next_file=1seek=CONF['seek']print('NO2.log_file',log_file)value,stream,seek=read_log(log_file=log_file,seek=seek)ifnext_file:CONF['seek']=0else:CONF['seek']=seekCONF['inode']=os.stat(logstash.log').st_inowriteconf(CONF=CONF)print('last_CONF:{}'.format(CONF))

扩展代码:logstash 调用exec

[elk@Vsftplogstash]$catt3.confinput{stdin{}}filter{grok{match=>["message","(?m)\s*%{TIMESTAMP_ISO8601:time}\s*(?(\S+)).*"]}date{match=>["time","yyyy-MM-ddHH:mm:ss,SSS"]}mutate{add_field=>["type","tailong"]add_field=>["messager","%{type}-%{message}"]remove_field=>["message"]}}output{if([Level]=="ERROR"or[messager]=~"Exception")and[messager]!~"温金服务未连接"and[messager]!~"调用温金代理系统接口错误"and[messager]!~"BusinessException"{exec{command=>"/bin/smail.pl\"%{messager}\"\"%{type}\""}}stdout{codec=>rubydebug}}Vsftp:/root#cat/bin/smail.pl#!/usr/bin/perluseNet::SMTP;useHTTP::Dateqw(time2isostr2timetime2isotime2isoz);useData::Dumper;useGetopt::Std;usevarsqw($opt_d);getopts('d:');#mail_usershouldbeyour_mail@163.com$message="@ARGV";$env="$opt_d";subsend_mail{my$CurrTime=time2iso(time());my$to_address=shift;my$mail_user='zhao.yangjian@163.com';my$mail_pwd='xx';my$mail_server='smtp.163.com';my$from="From:$mail_user\n";my$subject="Subject:zjcapinfo\n";my$info="$CurrTime--$message";my$message=auth($mail_user,$mail_pwd)||die"AuthError!$!";$smtp->mail($mail_user);$smtp->to($to_address);$smtp->data();#beginthedata$smtp->datasend($from);#setuser$smtp->datasend($subject);#setsubject$smtp->datasend("\n\n");$smtp->datasend("$message\n");#setcontent$smtp->dataend();$smtp->quit();};send_mail('zhao.yangjian@163.com');2017-01-1210:19:19,888jjjjjException{"@version"=>"1","@timestamp"=>"2017-01-12T02:19:19.888Z","host"=>"Vsftp","time"=>"2017-01-1210:19:19,888","Level"=>"jjjjj","type"=>"tailong","messager"=>"tailong-2017-01-1210:19:19,888jjjjjException"}

python日志分析及可视化

Python运维自动化入门:10分钟搞定日志分析

开篇引导

大家好,我是汪哥!今天跟大家聊一个特别实用的话题 - Python日志分析自动化。还记得我刚开始做运维时,每天都要花大量时间人工查看日志,又累又容易漏掉重要信息。直到我开始用Python来解放双手,运维工作顿时轻松了许多!

学习目标

掌握Python读取和解析日志文件的基本方法

学会使用正则表达式提取关键信息

实现简单的日志分析自动化脚本

预备知识

Python基础语法

文件操作基本概念

简单的正则表达式

核心内容

1. 日志处理基础

日志文件就像是系统的"日记本",记录着服务器的一举一动。我们先来看一个最基础的日志分析脚本:

python运行复制

import re

from datetime import datetime

def analyze_log(log_file):

# 定义错误模式

error_pattern = r'\[ERROR\].*'

# 统计数据

error_count = 0

error_logs = []

# 读取并分析日志

with open(log_file, 'r') as f:

for line in f:

if re.search(error_pattern, line):

error_count = 1

error_logs.append(line.strip())

return error_count, error_logs

# 使用示例

log_file = 'app.log'

count, errors = analyze_log(log_file)

print(f"发现{count}个错误")

2. 实战案例:智能日志分析器

下面我们来开发一个更实用的日志分析工具:

python运行复制

import pandas as pd

from collections import defaultdict

import datetime

class LogAnalyzer:

def __init__(self, log_file):

self.log_file = log_file

self.stats = defaultdict(int)

self.alerts = []

def analyze(self):

with open(self.log_file, 'r') as f:

for line in f:

# 统计错误类型

if '[ERROR]' in line:

self.stats['error'] = 1

elif '[WARN]' in line:

self.stats['warn'] = 1

# 检查关键错误

if 'OutOfMemory' in line:

self.alerts.append({

'time': datetime.datetime.now(),

'type': 'Critical',

'message': line.strip()

})

def generate_report(self):

print("==== 日志分析报告 ====")

print(f"错误总数:{self.stats['error']}")

print(f"警告总数:{self.stats['warn']}")

print("\n严重问题:")

for alert in self.alerts:

print(f"- {alert['time']}: {alert['message']}")

3. 性能优化建议

处理大文件时的优化技巧:

使用生成器逐行读取,避免一次性加载

多进程并行处理大文件

使用pandas进行数据分析加速

互动环节

练习题

如何统计日志中每小时的错误数量?

添加邮件告警功能,当发现严重错误时自动通知

挑战任务

实现一个实时日志监控脚本,包含以下功能:

错误分类统计

关键字告警

性能指标分析

可视化报表生成

小结

本节重点回顾

Python文件操作基础

正则表达式应用

日志分析实战技巧

性能优化方案

实践任务

尝试分析你自己的应用日志,统计一下各类型错误的分布情况。把结果在评论区和大家分享!

下期预告

下期我们将深入探讨Python自动化运维的另一个重要主题:服务器性能监控,记得关注哦!

有问题随时在评论区问我。我是汪哥,我们下期再见!

学习打卡:在评论区分享你的学习心得

实战展示:欢迎分享你的代码实现

⭐ 点赞收藏,你的支持是我创作的动力!

推荐

首页/电脑版/网名
© 2025 NiBaKu.Com All Rights Reserved.