a54546 发表于 2025-2-7 02:10:13

Grafana监控图形拉取

python拉取grafana监控图形

python通过grafana提供的api接口拉取grafana监控图形并保存至word文档生成日报发送邮件
前置条件:

1.grafana平台需要安装grafana-image-renderer 插件,用于生成静态图形

页面可以检查是否已安装


未安装会进入如下页面:

从API接口拉取图片会提示:

安装方式参考:
https://grafana.com/grafana/plugins/grafana-image-renderer/
该插件对内存大小有一定要求:

如果已安装该插件,点击Direct link rendered image后会显示一个静态监控图形页面
参考: https://cloud.tencent.com/document/product/1437/65674
2.在grafana页面生成api_keys,用于接口请求认证

不同版本获取方式不同 此示例为V9.2.6版,入口如下:



点击Add API key添加,可以指定有效期,弹出密钥的界面记得把密钥复制下来,否则关闭后就看不到了。
代码示例:


[*]该示例是将Grafana获取的多张图形保存到本地的panels目录下(多线程执行),
[*]通过python的Image模块进行图片拼接(拼接方式可自定义),
[*]读取日志模版文档(模版中预留了等字符用于定位),
[*]将拼好的图片插入到日志模版文档并替换文档中的日期信息,
[*]保存生成新的文档,删除本地panels目录下的图片缓存,并发送邮件。
# -*- coding: UTF-8 -*-import smtplibimport requestsimport osimport shutilimport timefrom PIL import Imagefrom docx import Documentfrom docx.shared import Inchesfrom datetime import datetime, timedeltafrom concurrent.futures import ThreadPoolExecutor, as_completedfrom email.header import Headerfrom email.mime.text import MIMETextfrom email.mime.multipart import MIMEMultipartfrom email.header import make_header# 配置 Grafana API 参数GRAFANA_HOST = "http://xxxxxxxxxxx"# Grafana API 密钥具有时效性,如过期,请联系运维人员生成新的api_keysAPI_TOKEN = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"# 仪表板IDDASHBOARD_UID = "xxxxxxxxxx"# 图形IDPANEL_ID_DATA = {"PANEL_1":,               "PANEL_2":,}ORG_ID = "1"#日志模版文件document_path = "xxxxxxxx日志 - 模板.docx"# 邮箱接收人RECEIVERS = ["xxxxxx.com", "xxx.com"]# 保存图片的文件名folder = "panels"#邮件发送def send_mail(filename):    sender = 'xxxxxxx.com'    receiver = ','.join(RECEIVERS)    smtpserver = 'xxxxxx.com'    user = 'xxxxxx.com'    password = 'xxxxxxxx'    mail_title = filename.split('/')    mail_title = mail_title.split('.')    # 创建一个带附件的实例    message = MIMEMultipart()    message['From'] = sender    message['To'] = receiver    message['Subject'] = Header(mail_title, 'utf-8')    # 邮件正文内容    message.attach(MIMEText(f'hello,附件为 {mail_title},请查收,如有问题请与我联系。', 'plain', 'utf-8'))    # 构造附件    file_msg = MIMEText(open(filename, 'rb').read(), 'base64', 'UTF-8')    file_msg["Content-Type"] = 'application/octet-stream;name="%s"' % make_header([(filename, 'UTF-8')]).encode('UTF-8')    file_msg["Content-Disposition"] = 'attachment;filename= "%s"' % make_header([(filename, 'UTF-8')]).encode('UTF-8')    message.attach(file_msg)    try:               smtpObj = smtplib.SMTP(smtpserver)      smtpObj.starttls()      smtpObj.login(user, password)      smtpObj.sendmail(sender, receiver, message.as_string())      print("邮件发送成功!!!")    except Exception as e:      print(e)      print("邮件发送失败!!!")    finally:      smtpObj.quit()def center_insert_img(doc, img,date_str):    """插入图片"""    date_obj = datetime.strptime(date_str, '%Y-%m-%d')    # 将datetime对象格式化为指定的中文格式字符串    formatted_date = date_obj.strftime('%Y年%m月%d日')    for paragraph in doc.paragraphs:      # 根据文档中的占位符定位图片插入的位置      if '<datetime>' in paragraph.text:            paragraph.text = paragraph.text.replace('<datetime>', formatted_date)      elif '<img1>' in paragraph.text:            # 把占位符去掉            paragraph.text = paragraph.text.replace('<img1>', '')            # 添加一个文字块            run = paragraph.add_run('')            # 添加一个’回车换行效果‘            run.add_break()            # 添加图片并指定大小            run.add_picture(img, width=Inches(6.2))      elif '<img2>' in paragraph.text:            # 把占位符去掉            paragraph.text = paragraph.text.replace('<img2>', '')            # 添加一个文字块            run = paragraph.add_run('')            # 添加一个’回车换行效果‘            run.add_break()            # 添加图片并指定大小            run.add_picture(img, width=Inches(6.2))def save_img_to_doc(img,day_time):    """把图片保存到doc文件中的指定位置"""    tpl_doc = document_path    current_year = datetime.now().year    if not os.path.exists(f"{current_year}_doc"):      os.makedirs(f"{current_year}_doc")    res_doc = f'{current_year}_doc/日志报告_{day_time}.docx'    # 打开模板文件    document = Document(tpl_doc)    # 插入图片居中    center_insert_img(document, img,day_time)    # 保存结果文件    document.save(res_doc)    return res_docdef download_image(render_url, output_path, headers):    """下载单张图片的函数"""    try:      response = requests.get(render_url, headers=headers, stream=True)      if response.status_code == 200 and "image/png" in response.headers.get("Content-Type", ""):            with open(output_path, "wb") as f:                for chunk in response.iter_content(chunk_size=8192):                  f.write(chunk)            return output_path      else:            print(f"获取图形失败: {render_url}")            print(f"状态码: {response.status_code}, 返回内容: {response.text}")            return None    except Exception as e:      print(f"下载图片出错: {e}")      return Nonedef merge_images(image_paths, output_path, layout):    """合并图片"""    images =     width, height = images.size    new_image = object    if layout == "grid_3xN":      num_rows = (len(images) + 2) // 3      new_image = Image.new('RGB', (3 * width, num_rows * height))      for idx, img in enumerate(images):            x, y = (idx % 3) * width, (idx // 3) * height            new_image.paste(img, (x, y))    elif layout == "grid_3x3":      new_image = Image.new('RGB', (3 * width, 3 * height))      for idx, img in enumerate(images[:9]):            x, y = (idx % 3) * width, (idx // 3) * height            new_image.paste(img, (x, y))    new_image.save(output_path)    new_image.close()def download_grafana_panel():    # start = time.time()    time_tuple = generate_daily_timestamps_and_dates()    image_paths = []    save_img_to_doc_paths = []    headers = {"Authorization": f"Bearer {API_TOKEN}"}    tasks = []    # 创建下载目录    daily_folder = f"{folder}/{time_tuple}"    os.makedirs(daily_folder, exist_ok=True)    # 使用线程池并行下载图片   # 此处的的URL仅供参考,I6xasdas要替换为你页面的实际值    # 也就是你点击Direct link rendered image后显示的静态监控图形页面的URL    with ThreadPoolExecutor(max_workers=5) as executor:      for PANEL_NAME, PANEL_ID_list in PANEL_ID_DATA.items():            for PANEL_ID in PANEL_ID_list:                render_url = f'{GRAFANA_HOST}/render/d-solo/I6xasdas/{DASHBOARD_UID}?orgId=1&from={time_tuple}&to={time_tuple}&panelId={PANEL_ID}&width=1000&height=500&tz=Asia%2FShanghai'                output_path = f"{daily_folder}/{DASHBOARD_UID}_panel_{PANEL_ID}.png"                tasks.append(executor.submit(download_image, render_url, output_path, headers))      # 收集任务结果      for future in as_completed(tasks):            result = future.result()            if result:                image_paths.append(result)    # 合并图片 (第一组 3xN)    if len(image_paths) > 0:      output_path1 = f"{daily_folder}/panel_01.jpg"      merge_images(image_paths[:13], output_path1, layout="grid_3xN")      save_img_to_doc_paths.append(output_path1)    # 合并图片 (第二组 3x3)    if len(image_paths) > 13:      output_path2 = f"{daily_folder}/panel_02.jpg"      merge_images(image_paths, output_path2, layout="grid_3x3")      save_img_to_doc_paths.append(output_path2)    # 插入文档保存    doc_file_path = save_img_to_doc(save_img_to_doc_paths, time_tuple)    # 删除缓存图片    remove_dir(f"{folder}/{time_tuple}")    #发送邮件    send_mail(doc_file_path)    # end_time = time.time() - start    # print(f"巡检报告成功保存并删除图片缓存!耗时{round(end_time,2)}秒")def generate_daily_timestamps_and_dates():    current_date_time = datetime.now()    # 提取日期部分并格式化为年-月-日的字符串形式    formatted_date = current_date_time.strftime('%Y-%m-%d')    # 将输入的日期字符串转换为datetime对象    start_date = datetime.strptime(formatted_date, '%Y-%m-%d')    last_day_start_date = start_date - timedelta(days=1)    # 获取当天的起始时间(将时间设置为00:00:00)    start_of_day = last_day_start_date.replace(hour=0, minute=0, second=0, microsecond=0)    # 将datetime对象转换为时间戳(单位:秒)    timestamp = int(start_of_day.timestamp()*1000)    date_str = start_of_day.strftime('%Y-%m-%d')    return (date_str, timestamp, timestamp + 86399*1000)def remove_dir(directory):    try:      shutil.rmtree(directory)    except OSError as e:      print(f"删除目录时出错: {e}")      if e.errno == 32:# 文件正在被占用            print("文件被占用,等待释放后重试...")            time.sleep(0.1)# 等待 0.1 秒后再次尝试            try:                shutil.rmtree(directory)                print(f"第二次尝试删除成功: {directory}")            except Exception as e2:                print(f"仍然无法删除目录: {e2}")if __name__ == "__main__":    download_grafana_panel()
页: [1]
查看完整版本: Grafana监控图形拉取