weimin 发表于 2025-2-7 01:49:20

solon 集成 rocketmq5 sdk

使用 rocketmq5 是比较简单的事情。也有些同学对 sdk 原始接口会陌生,会希望有个集成的示例。
<dependency>    <groupId>org.apache.rocketmq</groupId>    <artifactId>rocketmq-client-java</artifactId>    <version>${rocketmq5.version}</version></dependency>完整的集成代码参考:
https://gitee.com/opensolon/solon-examples/tree/main/b.Integration/demoB002-rocketmq5
希望更加简化使用的同学,可以使用:
rocketmq5-solon-cloud-plugin(但定制性会变差)
1、看看配置怎么搞?

使用 Solon 初始器 生成一个 Solon Web 模板项目,然后添加上面的 rocketmq5 依赖。

[*]添加 yml 配置(具体的配置属性,参考:ClientConfigurationBuilder,ProducerBuilder, PushConsumerBuilder)
solon.app:name: "demo-app"group: "demo"solon.logging:logger:    root:      level: INFO# 配置可以自由定义,与 @Bean 代码对应起来即可(以下为参考)solon.rocketmq:properties:#公共配置(配置项,参考:ClientConfigurationBuilder)    endpoints: "127.0.0.1:8081"    sessionCredentialsProvider:      "@type": "demoB002.SessionCredentialsProviderImpl" # solon 支持 "@type" 类型申明当前配置节的实例类型      accessKey: "xxx"      accessSecret: "xxx"      securityToken: "xxx"    requestTimeout: "10s"producer: #生产者专属配置(配置项,参考:ProducerBuilder)    maxAttempts: 3consumer: #消费者专属配置(配置项,参考:PushConsumerBuilder)    consumerGroup: "${solon.app.group}_${solon.app.name}"    consumptionThreadCount: 2    maxCacheMessageCount: 1    maxCacheMessageSizeInBytes: 1

[*]添加 java 配置器
@Configurationpublic class RocketmqConfig {    private ClientServiceProvider clientProvider = ClientServiceProvider.loadService();      @Bean    public ClientConfiguration client(@Inject("${solon.rocketmq.properties}") Properties common){      ClientConfigurationBuilder builder = ClientConfiguration.newBuilder();      //注入属性      Utils.injectProperties(builder, common);      return builder.build();    }    @Bean    public Producer producer(@Inject("${solon.rocketmq.producer}") Properties producer,                           ClientConfiguration clientConfiguration) throws ClientException {      ProducerBuilder producerBuilder = clientProvider.newProducerBuilder();      //注入属性      if (producer.size() > 0) {            Utils.injectProperties(producerBuilder, producer);      }      producerBuilder.setClientConfiguration(clientConfiguration);      return producerBuilder.build();    }    @Bean    public PushConsumer consumer(@Inject("${solon.rocketmq.consumer}") Properties consumer,                                 ClientConfiguration clientConfiguration,                                 MessageListener messageListener) throws ClientException{      //按需选择 PushConsumerBuilder 或 SimpleConsumerBuilder      PushConsumerBuilder consumerBuilder = clientProvider.newPushConsumerBuilder();      //注入属性      Utils.injectProperties(consumerBuilder, consumer);      Map<String, FilterExpression> subscriptionExpressions = new HashMap<>();      subscriptionExpressions.put("topic.test",new FilterExpression("*"));      consumerBuilder.setSubscriptionExpressions(subscriptionExpressions);      consumerBuilder.setClientConfiguration(clientConfiguration);      consumerBuilder.setMessageListener(messageListener);      return consumerBuilder.build();    }}//这个实现类,(相对于 StaticSessionCredentialsProvider)方便配置自动注入public class SessionCredentialsProviderImpl implements SessionCredentialsProvider {    private String accessKey;    private String accessSecret;    private String securityToken;    private SessionCredentials sessionCredentials;    @Override    public SessionCredentials getSessionCredentials() {      if (sessionCredentials == null) {            if (securityToken == null) {                sessionCredentials = new SessionCredentials(accessKey, accessSecret);            } else {                sessionCredentials = new SessionCredentials(accessKey, accessSecret, securityToken);            }      }      return sessionCredentials;    }}2、代码应用


[*]发送(或生产),这里代控制器由用户请求再发送消息(仅供参考):
@Controllerpublic class DemoController {    @Inject    private Producer producer;    @Mapping("/send")    public void send(String msg) throws ClientException {      //发送      producer.send(new MessageBuilderImpl()                .setTopic("topic.test")                .setBody(msg.getBytes())                .build());    }}

[*]监听(或消费),这里采用订阅回调的方式:(仅供参考)
@Componentpublic class DemoMessageListener implements MessageListener {    @Override    public ConsumeResult consume(MessageView messageView) {      System.out.println(messageView);      return ConsumeResult.SUCCESS;    }}
页: [1]
查看完整版本: solon 集成 rocketmq5 sdk