solon 集成 rocketmq5 sdk
使用 rocketmq5 是比较简单的事情。也有些同学对 sdk 原始接口会陌生,会希望有个集成的示例。<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client-java</artifactId> <version>${rocketmq5.version}</version></dependency>完整的集成代码参考:
https://gitee.com/opensolon/solon-examples/tree/main/b.Integration/demoB002-rocketmq5
希望更加简化使用的同学,可以使用:
rocketmq5-solon-cloud-plugin(但定制性会变差)
1、看看配置怎么搞?
使用 Solon 初始器 生成一个 Solon Web 模板项目,然后添加上面的 rocketmq5 依赖。
[*]添加 yml 配置(具体的配置属性,参考:ClientConfigurationBuilder,ProducerBuilder, PushConsumerBuilder)
solon.app:name: "demo-app"group: "demo"solon.logging:logger: root: level: INFO# 配置可以自由定义,与 @Bean 代码对应起来即可(以下为参考)solon.rocketmq:properties:#公共配置(配置项,参考:ClientConfigurationBuilder) endpoints: "127.0.0.1:8081" sessionCredentialsProvider: "@type": "demoB002.SessionCredentialsProviderImpl" # solon 支持 "@type" 类型申明当前配置节的实例类型 accessKey: "xxx" accessSecret: "xxx" securityToken: "xxx" requestTimeout: "10s"producer: #生产者专属配置(配置项,参考:ProducerBuilder) maxAttempts: 3consumer: #消费者专属配置(配置项,参考:PushConsumerBuilder) consumerGroup: "${solon.app.group}_${solon.app.name}" consumptionThreadCount: 2 maxCacheMessageCount: 1 maxCacheMessageSizeInBytes: 1
[*]添加 java 配置器
@Configurationpublic class RocketmqConfig { private ClientServiceProvider clientProvider = ClientServiceProvider.loadService(); @Bean public ClientConfiguration client(@Inject("${solon.rocketmq.properties}") Properties common){ ClientConfigurationBuilder builder = ClientConfiguration.newBuilder(); //注入属性 Utils.injectProperties(builder, common); return builder.build(); } @Bean public Producer producer(@Inject("${solon.rocketmq.producer}") Properties producer, ClientConfiguration clientConfiguration) throws ClientException { ProducerBuilder producerBuilder = clientProvider.newProducerBuilder(); //注入属性 if (producer.size() > 0) { Utils.injectProperties(producerBuilder, producer); } producerBuilder.setClientConfiguration(clientConfiguration); return producerBuilder.build(); } @Bean public PushConsumer consumer(@Inject("${solon.rocketmq.consumer}") Properties consumer, ClientConfiguration clientConfiguration, MessageListener messageListener) throws ClientException{ //按需选择 PushConsumerBuilder 或 SimpleConsumerBuilder PushConsumerBuilder consumerBuilder = clientProvider.newPushConsumerBuilder(); //注入属性 Utils.injectProperties(consumerBuilder, consumer); Map<String, FilterExpression> subscriptionExpressions = new HashMap<>(); subscriptionExpressions.put("topic.test",new FilterExpression("*")); consumerBuilder.setSubscriptionExpressions(subscriptionExpressions); consumerBuilder.setClientConfiguration(clientConfiguration); consumerBuilder.setMessageListener(messageListener); return consumerBuilder.build(); }}//这个实现类,(相对于 StaticSessionCredentialsProvider)方便配置自动注入public class SessionCredentialsProviderImpl implements SessionCredentialsProvider { private String accessKey; private String accessSecret; private String securityToken; private SessionCredentials sessionCredentials; @Override public SessionCredentials getSessionCredentials() { if (sessionCredentials == null) { if (securityToken == null) { sessionCredentials = new SessionCredentials(accessKey, accessSecret); } else { sessionCredentials = new SessionCredentials(accessKey, accessSecret, securityToken); } } return sessionCredentials; }}2、代码应用
[*]发送(或生产),这里代控制器由用户请求再发送消息(仅供参考):
@Controllerpublic class DemoController { @Inject private Producer producer; @Mapping("/send") public void send(String msg) throws ClientException { //发送 producer.send(new MessageBuilderImpl() .setTopic("topic.test") .setBody(msg.getBytes()) .build()); }}
[*]监听(或消费),这里采用订阅回调的方式:(仅供参考)
@Componentpublic class DemoMessageListener implements MessageListener { @Override public ConsumeResult consume(MessageView messageView) { System.out.println(messageView); return ConsumeResult.SUCCESS; }}
页:
[1]