qq5464642 发表于 2025-2-7 01:08:43

solon 集成 activemq-client (sdk)

原始状态的 activemq-client sdk 集成非常方便,也更适合定制。就是有些同学,可能对原始接口会比较陌生,会希望有个具体的示例。
<dependency>    <groupId>org.apache.activemq</groupId>    <artifactId>activemq-client</artifactId>    <version>${activemq.version}</version></dependency><dependency>    <groupId>org.apache.activemq</groupId>    <artifactId>activemq-pool</artifactId>    <version>${activemq.version}</version></dependency>希望更加简化使用的同学,可以使用:
activemq-solon-cloud-plugin(使用更简单,定制性弱些)
1、添加集成配置

先使用 Solon 初始器 先生成一个 Solon Web 模板项目,然后添加上面的 activemq-client 依赖。再做个配置约定(也可按需定义):

[*]"solon.activemq",作为配置前缀

[*]"properties",作为公共配置
[*]"producer",作为生态者专属配置(估计用不到)
[*]"consumer",作为消费者专属配置(估计用不到)

具体的配置属性,参考自:ActiveMQConnectionFactory
solon.app:name: "demo-app"group: "demo"# 配置可以自由定义,与 @Bean 代码对应起来即可(以下为参考)solon.activemq:properties:#公共配置(配置项,参考:ActiveMQConnectionFactory)    brokerURL: "failover:tcp://localhost:61616"    redeliveryPolicy:      initialRedeliveryDelay: 5000      backOffMultiplier: 2      useExponentialBackOff: true      maximumRedeliveries: -1      maximumRedeliveryDelay: 3600_000添加 java 配置器
@Configurationpublic class ActivemqConfig {    @Bean(destroyMethod = "stop")    public Connection client(@Inject("${solon.activemq.properties}") Props common) throws Exception {      String brokerURL = (String) common.remove("brokerURL");      String userName = (String) common.remove("userName");      String password = (String) common.remove("password");      ActiveMQConnectionFactory factory;      if (Utils.isEmpty(userName)) {            factory = new ActiveMQConnectionFactory(brokerURL);      } else {            factory = new ActiveMQConnectionFactory(brokerURL, userName, password);      }      //绑定额外的配置并创建连接      Connection connection = common.bindTo(factory).createConnection();      connection.start();      return connection;    }    @Bean    public IProducer producer(Connection connection) throws Exception {      return new IProducer(connection);    }    @Bean    public void consumer(Connection connection,                         MessageListener messageListener) throws Exception {      Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);      Destination destination = session.createTopic("topic.test");      MessageConsumer consumer = session.createConsumer(destination);      consumer.setMessageListener(messageListener);    }}activemq 的消息发送的代码比较复杂,所以我们可以做个包装处理(用于上面的配置构建),临时命名为 IProducer:
public class IProducer {    private Connection connection;    public IProducer(Connection connection) {      this.connection = connection;    }    public void send(String topic, MessageBuilder messageBuilder) throws JMSException {      Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);      Destination destination = session.createTopic(topic);      MessageProducer producer = session.createProducer(destination);      producer.send(destination, messageBuilder.build(session));    }    @FunctionalInterface    public static interface MessageBuilder {      Message build(Session session) throws JMSException;    }}3、代码应用

发送(或生产),这里代控制器由用户请求再发送消息(仅供参考):
@Controllerpublic class DemoController {    @Inject    private IProducer producer;    @Mapping("/send")    public void send(String msg) throws Exception {      //发送      producer.send("topic.test", s -> s.createTextMessage("test"));    }}监听(或消费),这里采用订阅回调的方式:(仅供参考)
@Componentpublic class DemoMessageListener implements MessageListener {    @Override    public void onMessage(Message message) {      System.out.println(message);      RunUtil.runAndTry(message::acknowledge);    }}
页: [1]
查看完整版本: solon 集成 activemq-client (sdk)