English 简体中文 繁體中文 한국 사람 日本語 Deutsch русский بالعربية TÜRKÇE português คนไทย french
查看: 2|回复: 0

Grafana监控图形拉取

[复制链接]
查看: 2|回复: 0

Grafana监控图形拉取

[复制链接]
查看: 2|回复: 0

366

主题

0

回帖

1108

积分

金牌会员

积分
1108
a54546

366

主题

0

回帖

1108

积分

金牌会员

积分
1108
2025-2-7 02:10:13 | 显示全部楼层 |阅读模式
python拉取grafana监控图形

python通过grafana提供的api接口拉取grafana监控图形并保存至word文档生成日报发送邮件
前置条件:

1.grafana平台需要安装grafana-image-renderer 插件,用于生成静态图形

页面可以检查是否已安装


未安装会进入如下页面:

从API接口拉取图片会提示:

安装方式参考:
https://grafana.com/grafana/plugins/grafana-image-renderer/
该插件对内存大小有一定要求:

如果已安装该插件,点击Direct link rendered image后会显示一个静态监控图形页面
参考: https://cloud.tencent.com/document/product/1437/65674
2.在grafana页面生成api_keys,用于接口请求认证

不同版本获取方式不同 此示例为V9.2.6版,入口如下:



点击Add API key添加,可以指定有效期,弹出密钥的界面记得把密钥复制下来,否则关闭后就看不到了。
代码示例:


  • 该示例是将Grafana获取的多张图形保存到本地的panels目录下(多线程执行),
  • 通过python的Image模块进行图片拼接(拼接方式可自定义),
  • 读取日志模版文档(模版中预留了等字符用于定位),
  • 将拼好的图片插入到日志模版文档并替换文档中的日期信息,
  • 保存生成新的文档,删除本地panels目录下的图片缓存,并发送邮件。
# -*- coding: UTF-8 -*-import smtplibimport requestsimport osimport shutilimport timefrom PIL import Imagefrom docx import Documentfrom docx.shared import Inchesfrom datetime import datetime, timedeltafrom concurrent.futures import ThreadPoolExecutor, as_completedfrom email.header import Headerfrom email.mime.text import MIMETextfrom email.mime.multipart import MIMEMultipartfrom email.header import make_header# 配置 Grafana API 参数GRAFANA_HOST = "http://xxxxxxxxxxx"# Grafana API 密钥具有时效性,如过期,请联系运维人员生成新的api_keysAPI_TOKEN = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"# 仪表板IDDASHBOARD_UID = "xxxxxxxxxx"# 图形IDPANEL_ID_DATA = {"PANEL_1":[30,31,2,10,12,14,16,18,20,22,24,26,28],                 "PANEL_2":[37, 39, 41, 43, 45, 47, 49, 51, 53],}ORG_ID = "1"#日志模版文件document_path = "xxxxxxxx日志 - 模板.docx"# 邮箱接收人RECEIVERS = ["xxxxxx.com", "xxx.com"]# 保存图片的文件名folder = "panels"#邮件发送def send_mail(filename):    sender = 'xxxxxxx.com'    receiver = ','.join(RECEIVERS)    smtpserver = 'xxxxxx.com'    user = 'xxxxxx.com'    password = 'xxxxxxxx'    mail_title = filename.split('/')[1]    mail_title = mail_title.split('.')[0]    # 创建一个带附件的实例    message = MIMEMultipart()    message['From'] = sender    message['To'] = receiver    message['Subject'] = Header(mail_title, 'utf-8')    # 邮件正文内容    message.attach(MIMEText(f'hello,附件为 {mail_title},请查收,如有问题请与我联系。', 'plain', 'utf-8'))    # 构造附件    file_msg = MIMEText(open(filename, 'rb').read(), 'base64', 'UTF-8')    file_msg["Content-Type"] = 'application/octet-stream;name="%s"' % make_header([(filename, 'UTF-8')]).encode('UTF-8')    file_msg["Content-Disposition"] = 'attachment;filename= "%s"' % make_header([(filename, 'UTF-8')]).encode('UTF-8')    message.attach(file_msg)    try:               smtpObj = smtplib.SMTP(smtpserver)        smtpObj.starttls()        smtpObj.login(user, password)        smtpObj.sendmail(sender, receiver, message.as_string())        print("邮件发送成功!!!")    except Exception as e:        print(e)        print("邮件发送失败!!!")    finally:        smtpObj.quit()def center_insert_img(doc, img,date_str):    """插入图片"""    date_obj = datetime.strptime(date_str, '%Y-%m-%d')    # 将datetime对象格式化为指定的中文格式字符串    formatted_date = date_obj.strftime('%Y年%m月%d日')    for paragraph in doc.paragraphs:        # 根据文档中的占位符定位图片插入的位置        if '<datetime>' in paragraph.text:            paragraph.text = paragraph.text.replace('<datetime>', formatted_date)        elif '<img1>' in paragraph.text:            # 把占位符去掉            paragraph.text = paragraph.text.replace('<img1>', '')            # 添加一个文字块            run = paragraph.add_run('')            # 添加一个’回车换行效果‘            run.add_break()            # 添加图片并指定大小            run.add_picture(img[0], width=Inches(6.2))        elif '<img2>' in paragraph.text:            # 把占位符去掉            paragraph.text = paragraph.text.replace('<img2>', '')            # 添加一个文字块            run = paragraph.add_run('')            # 添加一个’回车换行效果‘            run.add_break()            # 添加图片并指定大小            run.add_picture(img[1], width=Inches(6.2))def save_img_to_doc(img,day_time):    """把图片保存到doc文件中的指定位置"""    tpl_doc = document_path    current_year = datetime.now().year    if not os.path.exists(f"{current_year}_doc"):        os.makedirs(f"{current_year}_doc")    res_doc = f'{current_year}_doc/日志报告_{day_time}.docx'    # 打开模板文件    document = Document(tpl_doc)    # 插入图片居中    center_insert_img(document, img,day_time)    # 保存结果文件    document.save(res_doc)    return res_docdef download_image(render_url, output_path, headers):    """下载单张图片的函数"""    try:        response = requests.get(render_url, headers=headers, stream=True)        if response.status_code == 200 and "image/png" in response.headers.get("Content-Type", ""):            with open(output_path, "wb") as f:                for chunk in response.iter_content(chunk_size=8192):                    f.write(chunk)            return output_path        else:            print(f"获取图形失败: {render_url}")            print(f"状态码: {response.status_code}, 返回内容: {response.text}")            return None    except Exception as e:        print(f"下载图片出错: {e}")        return Nonedef merge_images(image_paths, output_path, layout):    """合并图片"""    images = [Image.open(path) for path in image_paths]    width, height = images[0].size    new_image = object    if layout == "grid_3xN":        num_rows = (len(images) + 2) // 3        new_image = Image.new('RGB', (3 * width, num_rows * height))        for idx, img in enumerate(images):            x, y = (idx % 3) * width, (idx // 3) * height            new_image.paste(img, (x, y))    elif layout == "grid_3x3":        new_image = Image.new('RGB', (3 * width, 3 * height))        for idx, img in enumerate(images[:9]):            x, y = (idx % 3) * width, (idx // 3) * height            new_image.paste(img, (x, y))    new_image.save(output_path)    new_image.close()def download_grafana_panel():    # start = time.time()    time_tuple = generate_daily_timestamps_and_dates()    image_paths = []    save_img_to_doc_paths = []    headers = {"Authorization": f"Bearer {API_TOKEN}"}    tasks = []    # 创建下载目录    daily_folder = f"{folder}/{time_tuple[0]}"    os.makedirs(daily_folder, exist_ok=True)    # 使用线程池并行下载图片     # 此处的的URL仅供参考,I6xasdas要替换为你页面的实际值    # 也就是你点击Direct link rendered image后显示的静态监控图形页面的URL    with ThreadPoolExecutor(max_workers=5) as executor:        for PANEL_NAME, PANEL_ID_list in PANEL_ID_DATA.items():            for PANEL_ID in PANEL_ID_list:                render_url = f'{GRAFANA_HOST}/render/d-solo/I6xasdas/{DASHBOARD_UID}?orgId=1&from={time_tuple[1]}&to={time_tuple[2]}&panelId={PANEL_ID}&width=1000&height=500&tz=Asia%2FShanghai'                output_path = f"{daily_folder}/{DASHBOARD_UID}_panel_{PANEL_ID}.png"                tasks.append(executor.submit(download_image, render_url, output_path, headers))        # 收集任务结果        for future in as_completed(tasks):            result = future.result()            if result:                image_paths.append(result)    # 合并图片 (第一组 3xN)    if len(image_paths) > 0:        output_path1 = f"{daily_folder}/panel_01.jpg"        merge_images(image_paths[:13], output_path1, layout="grid_3xN")        save_img_to_doc_paths.append(output_path1)    # 合并图片 (第二组 3x3)    if len(image_paths) > 13:        output_path2 = f"{daily_folder}/panel_02.jpg"        merge_images(image_paths[12:21], output_path2, layout="grid_3x3")        save_img_to_doc_paths.append(output_path2)    # 插入文档保存    doc_file_path = save_img_to_doc(save_img_to_doc_paths, time_tuple[0])    # 删除缓存图片    remove_dir(f"{folder}/{time_tuple[0]}")    #发送邮件    send_mail(doc_file_path)    # end_time = time.time() - start    # print(f"巡检报告成功保存并删除图片缓存!耗时{round(end_time,2)}秒")def generate_daily_timestamps_and_dates():    current_date_time = datetime.now()    # 提取日期部分并格式化为年-月-日的字符串形式    formatted_date = current_date_time.strftime('%Y-%m-%d')    # 将输入的日期字符串转换为datetime对象    start_date = datetime.strptime(formatted_date, '%Y-%m-%d')    last_day_start_date = start_date - timedelta(days=1)    # 获取当天的起始时间(将时间设置为00:00:00)    start_of_day = last_day_start_date.replace(hour=0, minute=0, second=0, microsecond=0)    # 将datetime对象转换为时间戳(单位:秒)    timestamp = int(start_of_day.timestamp()*1000)    date_str = start_of_day.strftime('%Y-%m-%d')    return (date_str, timestamp, timestamp + 86399*1000)def remove_dir(directory):    try:        shutil.rmtree(directory)    except OSError as e:        print(f"删除目录时出错: {e}")        if e.errno == 32:  # 文件正在被占用            print("文件被占用,等待释放后重试...")            time.sleep(0.1)  # 等待 0.1 秒后再次尝试            try:                shutil.rmtree(directory)                print(f"第二次尝试删除成功: {directory}")            except Exception as e2:                print(f"仍然无法删除目录: {e2}")if __name__ == "__main__":    download_grafana_panel()
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

366

主题

0

回帖

1108

积分

金牌会员

积分
1108

QQ|智能设备 | 粤ICP备2024353841号-1

GMT+8, 2025-3-10 15:13 , Processed in 1.071696 second(s), 30 queries .

Powered by 智能设备

©2025

|网站地图