1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
| from prometheus_client import start_http_server, Gauge
import smtplib
import argparse
import time
# Gauge metric, labelled by email address, that holds the authentication result.
EMAIL_AUTHENTICATION_SUCCESS = Gauge(
    'email_authentication_success',
    'Indicates if the email authentication was successful',
    labelnames=['email'],
)
def check_email_authentication(email, password, smtp_server, smtp_port):
    """Try to log in to an SMTP server and record the outcome in the gauge.

    Sets ``EMAIL_AUTHENTICATION_SUCCESS`` (labelled with ``email``) to 1 when
    the login succeeds and to 0 when connecting or logging in raises.

    Args:
        email: Account name passed to ``login``; also used as the metric label.
        password: Password passed to ``login``.
        smtp_server: Host name of the SMTP server.
        smtp_port: TCP port of the SMTP server. Port 587 is treated as the
            STARTTLS submission port (plain connect, then upgrade); every
            other port is connected with implicit TLS via ``SMTP_SSL``.

    Returns:
        None. Any ``Exception`` raised during the check is printed and
        reflected in the gauge instead of being propagated, so a periodic
        caller keeps running.
    """
    # Bound every socket operation so an unresponsive server cannot hang the
    # exporter forever (smtplib blocks indefinitely without a timeout).
    timeout_seconds = 30
    # Port 587 speaks plain SMTP first and upgrades with STARTTLS; an implicit
    # TLS handshake (SMTP_SSL) on that port fails before login is attempted.
    use_starttls = smtp_port == 587

    try:
        if use_starttls:
            connection = smtplib.SMTP(smtp_server, smtp_port, timeout=timeout_seconds)
        else:
            connection = smtplib.SMTP_SSL(smtp_server, smtp_port, timeout=timeout_seconds)
        # The context manager sends QUIT and closes the socket even when the
        # login raises, so failed checks no longer leak connections.
        with connection as server:
            if use_starttls:
                server.starttls()
            server.login(email, password)
    except Exception as e:
        # Connection or login failed: report 0 for this address.
        EMAIL_AUTHENTICATION_SUCCESS.labels(email=email).set(0)
        print(f"Failed to authenticate email {email}: {e}")
    else:
        # Login succeeded and the session was closed cleanly: report 1.
        EMAIL_AUTHENTICATION_SUCCESS.labels(email=email).set(1)
if __name__ == '__main__':
    # Port the Prometheus metrics endpoint listens on; kept in one place so
    # the startup message cannot drift from the port actually served.
    exporter_port = 8000
    # Seconds between checks (one hour); logging in too frequently may get
    # the account or client banned by the mail provider.
    check_interval_seconds = 3600

    parser = argparse.ArgumentParser(description='Email Authentication Prometheus Exporter')
    parser.add_argument('--email', required=True, help='Email address to authenticate')
    parser.add_argument('--password', required=True, help='Password for the email account')
    parser.add_argument('--smtp_server', default='smtp.gmail.com', help='SMTP server address')
    # 465 is the implicit-TLS (SMTPS) port, which is what an SMTP_SSL
    # connection expects; the previous default of 587 is the STARTTLS port.
    parser.add_argument('--smtp_port', type=int, default=465, help='SMTP server port')
    args = parser.parse_args()

    # Start the Prometheus HTTP endpoint.
    start_http_server(exporter_port)
    print(f"Prometheus exporter is running on port {exporter_port}. Use CTRL+C to exit.")

    # Keep the process alive, re-checking the credentials periodically.
    while True:
        check_email_authentication(args.email, args.password, args.smtp_server, args.smtp_port)
        time.sleep(check_interval_seconds)
|