34345 发表于 2025-2-10 22:50:17

爆改xxl-job:websocket版本的改造方案

之前改过一版xxl-job,解决了滥用netty的问题,文章链接:xxl-job滥用netty导致的问题和解决方案 ,后续基于此又做了一些优化修复了一些bug,但是还留下了一个待优化项:xxl-job客户端和服务端必须在同一个网段内,否则服务端调用客户端接口会调用失败。因为像是定时任务触发、日志查看等实际上都是通过服务端调用客户端restful接口实现的功能,这一直让我觉得如鲠在喉,想找机会使用websocket长连接来替代restful接口调用,这些天有空,用了两三天时间,终于把它改了个七七八八。
代码仓库地址:
xxl-job服务端:https://github.com/kdyzm/xxl-job
xxl-job客户端:https://gitee.com/kdyzm/xxljob-spring-boot-starter
注意,都要切换websocket分支
一、websocket版本改造难点总结

首先我的想法是“小改”,也就是说,核心的逻辑不变,只是更改http调用为一条websockt长连接调用,这样才能保证不出大的错,但是一上手,就发现了就算是“小改”,问题也不少。
1、同步改造异步

之前说过,客户端暴露了几个restful接口给服务端调用,现在要将restful接口全删掉,改用一条websocket连接,所有的请求都会从这条连接上发起,响应也会从这条websockt连接上返回。先看看restful接口调用时序图:
从上图中可以看得出来http调用很简单,它完全是一个串行化调用的过程,我问你答,然后将调用结果展示在页面上即可。而websocket只有一条连接,它是全双工双向通信的连接,可以同时发送和接收消息,消息的处理是基于事件驱动的,代码调用形式如下所示:
public class WebSocketServer{    /**   * 连接打开时被调用   */    @OnOpen    public void onOpen(Session session){            }      /**   * 连接关闭时被调用   */    @OnClose    public void onClose(Session session) {            }      /**   * 发生异常时被调用   */    @OnError    public void onError(Throwable error){            }      /**   * 收到消息时被调用   */    @OnMessage    public final void receiveMessage(String message){            }}websocket客户端和服务端的形式都如上代码所示,这就导致了所有的请求都是“异步”的,前端页面请求xxl-job-admin是restful接口,但是xxl-job-admin请求client是通过websocket,websocket接受消息是异步的,就算能回调结果,又怎么传达给xxl-job-admin调用点呢?
我在这里使用了LinkedBlockingQueue 队列来传输该消息,我们知道,BlockingQueue的特点就是在接收消息的时候有消息就接受消息,如果没有消息就阻塞等待,可以让xxl-job-admin在发送websocket消息之后立即在BlockingQueue上等待消息回调。完整的时序图交互如下所示
https://blog.kdyzm.cn/blog/public/2025/02/09/568a5f0b-6eef-4564-bf54-e2e80f89af87.png可以看出来,核心问题实际上是跨线程传参的问题,BlockingQueue非常适合做这个事情。
2、websocket重连

服务端使用了spring-boot-starter-websocket,客户端使用了Java-WebSocket组件,websocket重连实际上特指客户端重连服务端,重连可能会发生在服务端挂了,或者业务处理没处理好导致websocket连接被中断等情况。
Java-Websocket组件有重连方法reconnect,但是要求在新线程中调用,这样能够保证完全清理掉旧websocke连接遗留的数据以避免bug的产生。调用的时机选择在监听到websocket连接被关闭的时候调用的onClose方法,有趣的是reconnect方法如果调用失败会继续调用onClose方法,经过多次尝试,发现有无法避免的并发请求reconnect方法的情况,为了彻底解决该问题,这里使用了加锁+双重验证的方式
@Overridepublic void onClose(int i, String s, boolean b) {    log.info("websocket closed connection");    reconnectSelf(this);}private void reconnectSelf(ExecuteWebSocketClient client) {    if (client.isOpen()) {      return;    }    synchronized (this) {      if (client.isOpen()) {            return;      }      CompletableFuture.runAsync(() -> {            log.info("websocket reconnecting ...");            //reconnect方法失败会触发执行onClose方法            client.reconnect();      });      try {            TimeUnit.SECONDS.sleep(3);      } catch (InterruptedException e) {            log.error("", e);      }    }}二、websocket版本的使用

websocket版本的xxl-job也分为客户端和服务端,先说客户端
1、xxl-job客户端starter

我将xxl-job-core分离出来封装了xxljob-websocket-spring-boot-starter,并已经上传到了中央仓库,GAV坐标为
<dependency>    <groupId>cn.kdyzm</groupId>    <artifactId>xxljob-websocket-spring-boot-starter</artifactId>    <version>1.0.1</version></dependency>springboot项目引入该starter之后,需要引入以下配置文件
xxl:job:    admin:      addresses: ws://127.0.0.1:8080/xxl-job-admin    accessToken: default_token    executor:      title: xxl-job-client-demo      appname: xxl-job-client-demo      logpath: ./logs      logretentiondays: 30      port: 8080这和以前旧版的配置几乎是一模一样的,只是address地址形式从http协议变成了ws协议。
其它使用方式和旧版一模一样:
@Component@Slf4jpublic class TestHandler {    @XxlJob("test")    public void handle(String args) {      XxlJobHelper.log("处理参数:{}", args);      XxlJobHelper.handleSuccess("处理成功");    }}我写的一个demo已经集成了xxl-job starter,可以参考下:https://github.com/kdyzm/xxl-job/tree/websocket/xxl-job-client-demo
2、xxl-job服务端

需要先初始化数据库,运行https://github.com/kdyzm/xxl-job/blob/websocket/doc/db/tables_xxl_job.sql数据库脚本。
初始化数据库成功以后,就可以启动xxl-job-admin服务了,可以使用我打包好的docker镜像快速体验,使用docker-compose up -d命令运行以下docker-compose.yaml文件(注意修改yaml文件中的内容)
version: '2'services:xxl-job:    image: registry.cn-hangzhou.aliyuncs.com/kdyzm/xxl-job-websocket-admin:1.0.1    restart: always    container_name: xxl-job-websocket-admin    environment:      - SPRING_PROFILE_ACTIVE=docker      - SPRING_DATASOURCE_URL=jdbc:mysql://数据库地址:端口号/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai      - SPRING_DATASOURCE_USERNAME=数据库账号      - SPRING_DATASOURCE_PASSWORD=数据库密码      - SPRING_MAIL_HOST=smtp.163.com      - SPRING_MAIL_PORT=465      - SPRING_MAIL_USERNAME=邮箱账号      - SPRING_MAIL_FROM=邮箱账号      - SPRING_MAIL_PASSWORD=邮箱密码      - SPRING_MAIL_PROPERTIES_MAIL_SMTP_AUTH=true      - SPRING_MAIL_PROPERTIES_MAIL_SMTP_STARTTLS_ENABLE=true      - SPRING_MAIL_PROPERTIES_MAIL_SMTP_STARTTLS_REQUIRED=true      - SPRING_MAIL_PROPERTIES_MAIL_SMTP_SOCKETFACTORY_CLASS=javax.net.ssl.SSLSocketFactory    ports:      - 8083:8080也可以下载源码:https://github.com/kdyzm/xxl-job 运行xxl-job-admin自行体验下。
服务端启动之后,浏览器访问地址:http://localhost:8080/xxl-job-admin/ 进入管理页面,登录账号密码:admin/admin123
3、测试

可以使用我之前说的demo做客户端:https://github.com/kdyzm/xxl-job/tree/websocket/xxl-job-client-demo,运行之后,如果服务端打印如下日志,即表示websocket连接已经成功
https://blog.kdyzm.cn/blog/public/2025/02/09/7ec4d3ee-a0bd-4a3f-9a06-306c8cead168.png剩下的就可以自由做测试了。
测试下来基本没什么问题,有问题可以在github或者gitee代码仓库中给我反馈。
三、我的自省

经过了几天的时间改造,终于将其改造成了websocket版本,功能上虽然没什么问题,但是回头看看改造的过程和结果,我不禁对当初坚持要改造成websocket版本的决定有了深深的怀疑:

[*]websocket基于事件驱动的使用方式和消息队列非常像,为什么不使用消息队列呢?比如rabbitmq就是一个很好的选择,而且消息队列非常稳定能保证消息不丢失,不像websocket连接,发生点异常连接就断掉了,一段断掉就全挂了
[*]说到底,为什么xxl-job-admin不能和客户端在同一个局域网啊,在一个局域网很合理吧?像是nacos、eureka等注册中心不都是和客户端在同一个局域网吗?怎么它们可以xxl-job-admin就不行了,明明xxl-job-admin也自带服务注册功能啊,我这是不是钻牛角尖了?
我将两个代码仓库都单独新建了websocket分支,就是因为这个功能不能合并到master分支,保持现有的restful版本是最好的选择。
相对于以往我的博客长篇大论分析技术细节,这篇博客显得很短了,因为被自己气的不想写了。。。


最后,欢迎大家关注我的博客: ´͈ ᵕ `͈https://blog.kdyzm.cn
页: [1]
查看完整版本: 爆改xxl-job:websocket版本的改造方案