|
python拉取grafana监控图形
python通过grafana提供的api接口拉取grafana监控图形并保存至word文档生成日报发送邮件
前置条件:
1.grafana平台需要安装grafana-image-renderer 插件,用于生成静态图形
页面可以检查是否已安装
未安装会进入如下页面:
从API接口拉取图片会提示:
安装方式参考:
https://grafana.com/grafana/plugins/grafana-image-renderer/
该插件对内存大小有一定要求:
如果已安装该插件,点击Direct link rendered image后会显示一个静态监控图形页面
参考: https://cloud.tencent.com/document/product/1437/65674
2.在grafana页面生成api_keys,用于接口请求认证
不同版本获取方式不同 此示例为V9.2.6版,入口如下:
点击Add API key添加,可以指定有效期,弹出密钥的界面记得把密钥复制下来,否则关闭后就看不到了。
代码示例:
- 该示例是将Grafana获取的多张图形保存到本地的panels目录下(多线程执行),
- 通过python的Image模块进行图片拼接(拼接方式可自定义),
- 读取日志模版文档(模版中预留了等字符用于定位),
- 将拼好的图片插入到日志模版文档并替换文档中的日期信息,
- 保存生成新的文档,删除本地panels目录下的图片缓存,并发送邮件。
# -*- coding: UTF-8 -*-import smtplibimport requestsimport osimport shutilimport timefrom PIL import Imagefrom docx import Documentfrom docx.shared import Inchesfrom datetime import datetime, timedeltafrom concurrent.futures import ThreadPoolExecutor, as_completedfrom email.header import Headerfrom email.mime.text import MIMETextfrom email.mime.multipart import MIMEMultipartfrom email.header import make_header# 配置 Grafana API 参数GRAFANA_HOST = "http://xxxxxxxxxxx"# Grafana API 密钥具有时效性,如过期,请联系运维人员生成新的api_keysAPI_TOKEN = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"# 仪表板IDDASHBOARD_UID = "xxxxxxxxxx"# 图形IDPANEL_ID_DATA = {"PANEL_1":[30,31,2,10,12,14,16,18,20,22,24,26,28], "PANEL_2":[37, 39, 41, 43, 45, 47, 49, 51, 53],}ORG_ID = "1"#日志模版文件document_path = "xxxxxxxx日志 - 模板.docx"# 邮箱接收人RECEIVERS = ["xxxxxx.com", "xxx.com"]# 保存图片的文件名folder = "panels"#邮件发送def send_mail(filename): sender = 'xxxxxxx.com' receiver = ','.join(RECEIVERS) smtpserver = 'xxxxxx.com' user = 'xxxxxx.com' password = 'xxxxxxxx' mail_title = filename.split('/')[1] mail_title = mail_title.split('.')[0] # 创建一个带附件的实例 message = MIMEMultipart() message['From'] = sender message['To'] = receiver message['Subject'] = Header(mail_title, 'utf-8') # 邮件正文内容 message.attach(MIMEText(f'hello,附件为 {mail_title},请查收,如有问题请与我联系。', 'plain', 'utf-8')) # 构造附件 file_msg = MIMEText(open(filename, 'rb').read(), 'base64', 'UTF-8') file_msg["Content-Type"] = 'application/octet-stream;name="%s"' % make_header([(filename, 'UTF-8')]).encode('UTF-8') file_msg["Content-Disposition"] = 'attachment;filename= "%s"' % make_header([(filename, 'UTF-8')]).encode('UTF-8') message.attach(file_msg) try: smtpObj = smtplib.SMTP(smtpserver) smtpObj.starttls() smtpObj.login(user, password) smtpObj.sendmail(sender, receiver, message.as_string()) print("邮件发送成功!!!") except Exception as e: print(e) print("邮件发送失败!!!") finally: smtpObj.quit()def center_insert_img(doc, img,date_str): """插入图片""" date_obj = datetime.strptime(date_str, '%Y-%m-%d') # 将datetime对象格式化为指定的中文格式字符串 formatted_date = date_obj.strftime('%Y年%m月%d日') for paragraph in doc.paragraphs: # 根据文档中的占位符定位图片插入的位置 if '<datetime>' in paragraph.text: paragraph.text = paragraph.text.replace('<datetime>', formatted_date) elif '<img1>' in paragraph.text: # 把占位符去掉 paragraph.text = paragraph.text.replace('<img1>', '') # 添加一个文字块 run = paragraph.add_run('') # 添加一个’回车换行效果‘ run.add_break() # 添加图片并指定大小 run.add_picture(img[0], width=Inches(6.2)) elif '<img2>' in paragraph.text: # 把占位符去掉 paragraph.text = paragraph.text.replace('<img2>', '') # 添加一个文字块 run = paragraph.add_run('') # 添加一个’回车换行效果‘ run.add_break() # 添加图片并指定大小 run.add_picture(img[1], width=Inches(6.2))def save_img_to_doc(img,day_time): """把图片保存到doc文件中的指定位置""" tpl_doc = document_path current_year = datetime.now().year if not os.path.exists(f"{current_year}_doc"): os.makedirs(f"{current_year}_doc") res_doc = f'{current_year}_doc/日志报告_{day_time}.docx' # 打开模板文件 document = Document(tpl_doc) # 插入图片居中 center_insert_img(document, img,day_time) # 保存结果文件 document.save(res_doc) return res_docdef download_image(render_url, output_path, headers): """下载单张图片的函数""" try: response = requests.get(render_url, headers=headers, stream=True) if response.status_code == 200 and "image/png" in response.headers.get("Content-Type", ""): with open(output_path, "wb") as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) return output_path else: print(f"获取图形失败: {render_url}") print(f"状态码: {response.status_code}, 返回内容: {response.text}") return None except Exception as e: print(f"下载图片出错: {e}") return Nonedef merge_images(image_paths, output_path, layout): """合并图片""" images = [Image.open(path) for path in image_paths] width, height = images[0].size new_image = object if layout == "grid_3xN": num_rows = (len(images) + 2) // 3 new_image = Image.new('RGB', (3 * width, num_rows * height)) for idx, img in enumerate(images): x, y = (idx % 3) * width, (idx // 3) * height new_image.paste(img, (x, y)) elif layout == "grid_3x3": new_image = Image.new('RGB', (3 * width, 3 * height)) for idx, img in enumerate(images[:9]): x, y = (idx % 3) * width, (idx // 3) * height new_image.paste(img, (x, y)) new_image.save(output_path) new_image.close()def download_grafana_panel(): # start = time.time() time_tuple = generate_daily_timestamps_and_dates() image_paths = [] save_img_to_doc_paths = [] headers = {"Authorization": f"Bearer {API_TOKEN}"} tasks = [] # 创建下载目录 daily_folder = f"{folder}/{time_tuple[0]}" os.makedirs(daily_folder, exist_ok=True) # 使用线程池并行下载图片 # 此处的的URL仅供参考,I6xasdas要替换为你页面的实际值 # 也就是你点击Direct link rendered image后显示的静态监控图形页面的URL with ThreadPoolExecutor(max_workers=5) as executor: for PANEL_NAME, PANEL_ID_list in PANEL_ID_DATA.items(): for PANEL_ID in PANEL_ID_list: render_url = f'{GRAFANA_HOST}/render/d-solo/I6xasdas/{DASHBOARD_UID}?orgId=1&from={time_tuple[1]}&to={time_tuple[2]}&panelId={PANEL_ID}&width=1000&height=500&tz=Asia%2FShanghai' output_path = f"{daily_folder}/{DASHBOARD_UID}_panel_{PANEL_ID}.png" tasks.append(executor.submit(download_image, render_url, output_path, headers)) # 收集任务结果 for future in as_completed(tasks): result = future.result() if result: image_paths.append(result) # 合并图片 (第一组 3xN) if len(image_paths) > 0: output_path1 = f"{daily_folder}/panel_01.jpg" merge_images(image_paths[:13], output_path1, layout="grid_3xN") save_img_to_doc_paths.append(output_path1) # 合并图片 (第二组 3x3) if len(image_paths) > 13: output_path2 = f"{daily_folder}/panel_02.jpg" merge_images(image_paths[12:21], output_path2, layout="grid_3x3") save_img_to_doc_paths.append(output_path2) # 插入文档保存 doc_file_path = save_img_to_doc(save_img_to_doc_paths, time_tuple[0]) # 删除缓存图片 remove_dir(f"{folder}/{time_tuple[0]}") #发送邮件 send_mail(doc_file_path) # end_time = time.time() - start # print(f"巡检报告成功保存并删除图片缓存!耗时{round(end_time,2)}秒")def generate_daily_timestamps_and_dates(): current_date_time = datetime.now() # 提取日期部分并格式化为年-月-日的字符串形式 formatted_date = current_date_time.strftime('%Y-%m-%d') # 将输入的日期字符串转换为datetime对象 start_date = datetime.strptime(formatted_date, '%Y-%m-%d') last_day_start_date = start_date - timedelta(days=1) # 获取当天的起始时间(将时间设置为00:00:00) start_of_day = last_day_start_date.replace(hour=0, minute=0, second=0, microsecond=0) # 将datetime对象转换为时间戳(单位:秒) timestamp = int(start_of_day.timestamp()*1000) date_str = start_of_day.strftime('%Y-%m-%d') return (date_str, timestamp, timestamp + 86399*1000)def remove_dir(directory): try: shutil.rmtree(directory) except OSError as e: print(f"删除目录时出错: {e}") if e.errno == 32: # 文件正在被占用 print("文件被占用,等待释放后重试...") time.sleep(0.1) # 等待 0.1 秒后再次尝试 try: shutil.rmtree(directory) print(f"第二次尝试删除成功: {directory}") except Exception as e2: print(f"仍然无法删除目录: {e2}")if __name__ == "__main__": download_grafana_panel() |
|