爆改xxl-job:websocket版本的改造方案
之前改过一版xxl-job,解决了滥用netty的问题,文章链接:xxl-job滥用netty导致的问题和解决方案 ,后续基于此又做了一些优化修复了一些bug,但是还留下了一个待优化项:xxl-job客户端和服务端必须在同一个网段内,否则服务端调用客户端接口会调用失败。因为像是定时任务触发、日志查看等实际上都是通过服务端调用客户端restful接口实现的功能,这一直让我觉得如鲠在喉,想找机会使用websocket长连接来替代restful接口调用,这些天有空,用了两三天时间,终于把它改了个七七八八。代码仓库地址:
xxl-job服务端:https://github.com/kdyzm/xxl-job
xxl-job客户端:https://gitee.com/kdyzm/xxljob-spring-boot-starter
注意,都要切换websocket分支
一、websocket版本改造难点总结
首先我的想法是“小改”,也就是说,核心的逻辑不变,只是更改http调用为一条websockt长连接调用,这样才能保证不出大的错,但是一上手,就发现了就算是“小改”,问题也不少。
1、同步改造异步
之前说过,客户端暴露了几个restful接口给服务端调用,现在要将restful接口全删掉,改用一条websocket连接,所有的请求都会从这条连接上发起,响应也会从这条websockt连接上返回。先看看restful接口调用时序图:
从上图中可以看得出来http调用很简单,它完全是一个串行化调用的过程,我问你答,然后将调用结果展示在页面上即可。而websocket只有一条连接,它是全双工双向通信的连接,可以同时发送和接收消息,消息的处理是基于事件驱动的,代码调用形式如下所示:
public class WebSocketServer{ /** * 连接打开时被调用 */ @OnOpen public void onOpen(Session session){ } /** * 连接关闭时被调用 */ @OnClose public void onClose(Session session) { } /** * 发生异常时被调用 */ @OnError public void onError(Throwable error){ } /** * 收到消息时被调用 */ @OnMessage public final void receiveMessage(String message){ }}websocket客户端和服务端的形式都如上代码所示,这就导致了所有的请求都是“异步”的,前端页面请求xxl-job-admin是restful接口,但是xxl-job-admin请求client是通过websocket,websocket接受消息是异步的,就算能回调结果,又怎么传达给xxl-job-admin调用点呢?
我在这里使用了LinkedBlockingQueue 队列来传输该消息,我们知道,BlockingQueue的特点就是在接收消息的时候有消息就接受消息,如果没有消息就阻塞等待,可以让xxl-job-admin在发送websocket消息之后立即在BlockingQueue上等待消息回调。完整的时序图交互如下所示
https://blog.kdyzm.cn/blog/public/2025/02/09/568a5f0b-6eef-4564-bf54-e2e80f89af87.png可以看出来,核心问题实际上是跨线程传参的问题,BlockingQueue非常适合做这个事情。
2、websocket重连
服务端使用了spring-boot-starter-websocket,客户端使用了Java-WebSocket组件,websocket重连实际上特指客户端重连服务端,重连可能会发生在服务端挂了,或者业务处理没处理好导致websocket连接被中断等情况。
Java-Websocket组件有重连方法reconnect,但是要求在新线程中调用,这样能够保证完全清理掉旧websocke连接遗留的数据以避免bug的产生。调用的时机选择在监听到websocket连接被关闭的时候调用的onClose方法,有趣的是reconnect方法如果调用失败会继续调用onClose方法,经过多次尝试,发现有无法避免的并发请求reconnect方法的情况,为了彻底解决该问题,这里使用了加锁+双重验证的方式
@Overridepublic void onClose(int i, String s, boolean b) { log.info("websocket closed connection"); reconnectSelf(this);}private void reconnectSelf(ExecuteWebSocketClient client) { if (client.isOpen()) { return; } synchronized (this) { if (client.isOpen()) { return; } CompletableFuture.runAsync(() -> { log.info("websocket reconnecting ..."); //reconnect方法失败会触发执行onClose方法 client.reconnect(); }); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { log.error("", e); } }}二、websocket版本的使用
websocket版本的xxl-job也分为客户端和服务端,先说客户端
1、xxl-job客户端starter
我将xxl-job-core分离出来封装了xxljob-websocket-spring-boot-starter,并已经上传到了中央仓库,GAV坐标为
<dependency> <groupId>cn.kdyzm</groupId> <artifactId>xxljob-websocket-spring-boot-starter</artifactId> <version>1.0.1</version></dependency>springboot项目引入该starter之后,需要引入以下配置文件
xxl:job: admin: addresses: ws://127.0.0.1:8080/xxl-job-admin accessToken: default_token executor: title: xxl-job-client-demo appname: xxl-job-client-demo logpath: ./logs logretentiondays: 30 port: 8080这和以前旧版的配置几乎是一模一样的,只是address地址形式从http协议变成了ws协议。
其它使用方式和旧版一模一样:
@Component@Slf4jpublic class TestHandler { @XxlJob("test") public void handle(String args) { XxlJobHelper.log("处理参数:{}", args); XxlJobHelper.handleSuccess("处理成功"); }}我写的一个demo已经集成了xxl-job starter,可以参考下:https://github.com/kdyzm/xxl-job/tree/websocket/xxl-job-client-demo
2、xxl-job服务端
需要先初始化数据库,运行https://github.com/kdyzm/xxl-job/blob/websocket/doc/db/tables_xxl_job.sql数据库脚本。
初始化数据库成功以后,就可以启动xxl-job-admin服务了,可以使用我打包好的docker镜像快速体验,使用docker-compose up -d命令运行以下docker-compose.yaml文件(注意修改yaml文件中的内容)
version: '2'services:xxl-job: image: registry.cn-hangzhou.aliyuncs.com/kdyzm/xxl-job-websocket-admin:1.0.1 restart: always container_name: xxl-job-websocket-admin environment: - SPRING_PROFILE_ACTIVE=docker - SPRING_DATASOURCE_URL=jdbc:mysql://数据库地址:端口号/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai - SPRING_DATASOURCE_USERNAME=数据库账号 - SPRING_DATASOURCE_PASSWORD=数据库密码 - SPRING_MAIL_HOST=smtp.163.com - SPRING_MAIL_PORT=465 - SPRING_MAIL_USERNAME=邮箱账号 - SPRING_MAIL_FROM=邮箱账号 - SPRING_MAIL_PASSWORD=邮箱密码 - SPRING_MAIL_PROPERTIES_MAIL_SMTP_AUTH=true - SPRING_MAIL_PROPERTIES_MAIL_SMTP_STARTTLS_ENABLE=true - SPRING_MAIL_PROPERTIES_MAIL_SMTP_STARTTLS_REQUIRED=true - SPRING_MAIL_PROPERTIES_MAIL_SMTP_SOCKETFACTORY_CLASS=javax.net.ssl.SSLSocketFactory ports: - 8083:8080也可以下载源码:https://github.com/kdyzm/xxl-job 运行xxl-job-admin自行体验下。
服务端启动之后,浏览器访问地址:http://localhost:8080/xxl-job-admin/ 进入管理页面,登录账号密码:admin/admin123
3、测试
可以使用我之前说的demo做客户端:https://github.com/kdyzm/xxl-job/tree/websocket/xxl-job-client-demo,运行之后,如果服务端打印如下日志,即表示websocket连接已经成功
https://blog.kdyzm.cn/blog/public/2025/02/09/7ec4d3ee-a0bd-4a3f-9a06-306c8cead168.png剩下的就可以自由做测试了。
测试下来基本没什么问题,有问题可以在github或者gitee代码仓库中给我反馈。
三、我的自省
经过了几天的时间改造,终于将其改造成了websocket版本,功能上虽然没什么问题,但是回头看看改造的过程和结果,我不禁对当初坚持要改造成websocket版本的决定有了深深的怀疑:
[*]websocket基于事件驱动的使用方式和消息队列非常像,为什么不使用消息队列呢?比如rabbitmq就是一个很好的选择,而且消息队列非常稳定能保证消息不丢失,不像websocket连接,发生点异常连接就断掉了,一段断掉就全挂了
[*]说到底,为什么xxl-job-admin不能和客户端在同一个局域网啊,在一个局域网很合理吧?像是nacos、eureka等注册中心不都是和客户端在同一个局域网吗?怎么它们可以xxl-job-admin就不行了,明明xxl-job-admin也自带服务注册功能啊,我这是不是钻牛角尖了?
我将两个代码仓库都单独新建了websocket分支,就是因为这个功能不能合并到master分支,保持现有的restful版本是最好的选择。
相对于以往我的博客长篇大论分析技术细节,这篇博客显得很短了,因为被自己气的不想写了。。。
最后,欢迎大家关注我的博客: ´͈ ᵕ `͈https://blog.kdyzm.cn
页:
[1]