From 253960ca73768650d08b37916a61954243aff1d2 Mon Sep 17 00:00:00 2001 From: Looly Date: Tue, 28 Jan 2025 03:43:17 +0800 Subject: [PATCH] fix code --- .../org/dromara/hutool/extra/mq/Consumer.java | 3 +- .../org/dromara/hutool/extra/mq/MQConfig.java | 133 ++++++++++++++++++ .../org/dromara/hutool/extra/mq/Message.java | 14 ++ .../hutool/extra/mq/engine/MQEngine.java | 9 ++ .../extra/mq/engine/MQEngineFactory.java | 45 ++++++ .../mq/engine/activemq/ActiveMQEngine.java | 73 +++++----- .../extra/mq/engine/jms/DestinationType.java | 34 +++++ .../hutool/extra/mq/engine/jms/JMSEngine.java | 105 ++++++++++++++ .../JMSProducer.java} | 68 +++++++-- .../JSMConsumer.java} | 8 +- .../extra/mq/engine/jms/package-info.java | 22 +++ .../extra/mq/engine/kafka/KafkaEngine.java | 51 ++++++- .../mq/engine/rabbitmq/RabbitMQConsumer.java | 49 ++++--- .../mq/engine/rabbitmq/RabbitMQEngine.java | 57 +++++++- .../mq/engine/rabbitmq/RabbitMQProducer.java | 35 ++++- ...rg.dromara.hutool.extra.mq.engine.MQEngine | 19 +++ 16 files changed, 652 insertions(+), 73 deletions(-) create mode 100644 hutool-extra/src/main/java/org/dromara/hutool/extra/mq/MQConfig.java create mode 100644 hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/jms/DestinationType.java create mode 100644 hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/jms/JMSEngine.java rename hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/{activemq/ActiveMQProducer.java => jms/JMSProducer.java} (54%) rename hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/{activemq/ActiveMQConsumer.java => jms/JSMConsumer.java} (92%) create mode 100644 hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/jms/package-info.java create mode 100644 hutool-extra/src/main/resources/META-INF/services/org.dromara.hutool.extra.mq.engine.MQEngine diff --git a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/Consumer.java b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/Consumer.java index f810c4b9a..0a07eef86 100644 --- a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/Consumer.java +++ b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/Consumer.java @@ -39,9 +39,10 @@ public interface Consumer extends Closeable { * * @param messageHandler 消息处理器 */ + @SuppressWarnings("InfiniteLoopStatement") default void listen(final MessageHandler messageHandler) { ThreadUtil.execAsync(() -> { - while (true) { + for(;;) { this.subscribe(messageHandler); } }); diff --git a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/MQConfig.java b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/MQConfig.java new file mode 100644 index 000000000..9179dc7c8 --- /dev/null +++ b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/MQConfig.java @@ -0,0 +1,133 @@ +/* + * Copyright (c) 2025 Hutool Team and hutool.cn + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.hutool.extra.mq; + +import org.dromara.hutool.extra.mq.engine.MQEngine; + +import java.io.Serializable; +import java.util.Properties; + +/** + * MQ配置 + * + * @author Looly + * @since 6.0.0 + */ +public class MQConfig implements Serializable { + private static final long serialVersionUID = 1L; + + /** + * 创建配置 + * + * @param brokerUrl Broker地址 + * @return 配置 + */ + public static MQConfig of(final String brokerUrl) { + return new MQConfig(brokerUrl); + } + + private String brokerUrl; + private Properties properties; + /** + * 自定义引擎,当多个jar包引入时,可以自定使用的默认引擎 + */ + private Class customEngine; + + /** + * 构造 + * + * @param brokerUrl Broker地址 + */ + public MQConfig(final String brokerUrl) { + this.brokerUrl = brokerUrl; + } + + /** + * 获取Broker地址 + * + * @return Broker地址 + */ + public String getBrokerUrl() { + return brokerUrl; + } + + /** + * 设置Broker地址 + * + * @param brokerUrl Broker地址 + * @return this + */ + public MQConfig setBrokerUrl(final String brokerUrl) { + this.brokerUrl = brokerUrl; + return this; + } + + /** + * 获取配置 + * + * @return 配置 + */ + public Properties getProperties() { + return properties; + } + + /** + * 设置配置 + * + * @param properties 配置 + * @return this + */ + public MQConfig setProperties(final Properties properties) { + this.properties = properties; + return this; + } + + /** + * 添加配置项 + * + * @param key 键 + * @param value 值 + * @return this + */ + public Properties addProperty(final String key, final String value) { + if (null == this.properties) { + this.properties = new Properties(); + } + this.properties.setProperty(key, value); + return this.properties; + } + + /** + * 自定义引擎,当多个jar包引入时,可以自定使用的默认引擎 + * + * @return 自定义引擎 + */ + public Class getCustomEngine() { + return customEngine; + } + + /** + * 自定义引擎,当多个jar包引入时,可以自定使用的默认引擎 + * + * @param customEngine 自定义引擎 + * @return this + */ + public MQConfig setCustomEngine(final Class customEngine) { + this.customEngine = customEngine; + return this; + } +} diff --git a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/Message.java b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/Message.java index 4393a8185..0ee1d32db 100644 --- a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/Message.java +++ b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/Message.java @@ -16,6 +16,10 @@ package org.dromara.hutool.extra.mq; +import org.dromara.hutool.core.text.StrUtil; + +import java.nio.charset.Charset; + /** * 消息接口 * @@ -37,4 +41,14 @@ public interface Message { * @return 内容 */ byte[] content(); + + /** + * 获取消息内容字符串 + * + * @param charset 编码 + * @return 内容字符串 + */ + default String contentStr(final Charset charset) { + return StrUtil.str(charset, charset); + } } diff --git a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/MQEngine.java b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/MQEngine.java index 98e43d40a..065a07c99 100644 --- a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/MQEngine.java +++ b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/MQEngine.java @@ -17,6 +17,7 @@ package org.dromara.hutool.extra.mq.engine; import org.dromara.hutool.extra.mq.Consumer; +import org.dromara.hutool.extra.mq.MQConfig; import org.dromara.hutool.extra.mq.Producer; /** @@ -27,6 +28,14 @@ import org.dromara.hutool.extra.mq.Producer; */ public interface MQEngine { + /** + * 初始化配置 + * + * @param config 配置 + * @return this + */ + MQEngine init(MQConfig config); + /** * 获取消息生产者 * diff --git a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/MQEngineFactory.java b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/MQEngineFactory.java index ff75e40fe..2e5940cb6 100644 --- a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/MQEngineFactory.java +++ b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/MQEngineFactory.java @@ -16,5 +16,50 @@ package org.dromara.hutool.extra.mq.engine; +import org.dromara.hutool.core.reflect.ConstructorUtil; +import org.dromara.hutool.core.spi.SpiUtil; +import org.dromara.hutool.extra.mq.MQConfig; +import org.dromara.hutool.extra.mq.MQException; + +/** + * MQ引擎工厂类 + * + * @author huangchengxing + * @since 1.0.0 + */ public class MQEngineFactory { + + /** + * 根据用户引入的MQ引擎jar,自动创建对应的模板引擎对象
+ * 推荐创建的引擎单例使用,此方法每次调用会返回新的引擎 + * + * @param config MQ配置 + * @return {@link MQEngine} + */ + public static MQEngine createEngine(final MQConfig config) { + return doCreateEngine(config); + } + + /** + * 根据用户引入的MQ引擎jar,自动创建对应的MQ引擎对象 + * + * @param config MQ配置 + * @return {@link MQEngine} + */ + private static MQEngine doCreateEngine(final MQConfig config) { + final Class customEngineClass = config.getCustomEngine(); + final MQEngine engine; + if (null != customEngineClass) { + // 自定义模板引擎 + engine = ConstructorUtil.newInstance(customEngineClass); + } else { + // SPI引擎查找 + engine = SpiUtil.loadFirstAvailable(MQEngine.class); + } + if (null != engine) { + return engine.init(config); + } + + throw new MQException("No MQ implement found! Please add one of MQ jar to your project !"); + } } diff --git a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/activemq/ActiveMQEngine.java b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/activemq/ActiveMQEngine.java index e3b8cf77c..15b0d8dd6 100644 --- a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/activemq/ActiveMQEngine.java +++ b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/activemq/ActiveMQEngine.java @@ -18,16 +18,10 @@ package org.dromara.hutool.extra.mq.engine.activemq; import jakarta.jms.Connection; import jakarta.jms.ConnectionFactory; -import jakarta.jms.JMSException; -import jakarta.jms.Session; -import org.dromara.hutool.core.io.IoUtil; -import org.dromara.hutool.extra.mq.Consumer; -import org.dromara.hutool.extra.mq.MQException; -import org.dromara.hutool.extra.mq.Producer; -import org.dromara.hutool.extra.mq.engine.MQEngine; - -import java.io.Closeable; -import java.io.IOException; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.dromara.hutool.core.lang.Assert; +import org.dromara.hutool.extra.mq.MQConfig; +import org.dromara.hutool.extra.mq.engine.jms.JMSEngine; /** * ActiveMQ引擎 @@ -35,44 +29,53 @@ import java.io.IOException; * @author Looly * @since 6.0.0 */ -public class ActiveMQEngine implements MQEngine, Closeable { +public class ActiveMQEngine extends JMSEngine { - private final Connection connection; + /** + * 默认构造 + */ + public ActiveMQEngine() { + super((Connection) null); + // SPI方式加载时检查库是否引入 + Assert.notNull(org.apache.activemq.ActiveMQConnectionFactory.class); + } + + /** + * 构造 + * + * @param config 配置 + */ + public ActiveMQEngine(final MQConfig config) { + super(createFactory(config)); + } /** * 构造 * * @param factory {@link ConnectionFactory} */ - public ActiveMQEngine(final ConnectionFactory factory) { - try { - this.connection = factory.createConnection(); - this.connection.start(); - } catch (final JMSException e) { - throw new MQException(e); - } + public ActiveMQEngine(final ActiveMQConnectionFactory factory) { + super(factory); } @Override - public Producer getProducer() { - return new ActiveMQProducer(createSession()); + public ActiveMQEngine init(final MQConfig config) { + super.init(createFactory(config)); + return this; } - @Override - public Consumer getConsumer() { - return new ActiveMQConsumer(createSession()); - } + /** + * 创建{@link ActiveMQConnectionFactory} + * + * @param config 配置 + * @return {@link ActiveMQConnectionFactory} + */ + private static ActiveMQConnectionFactory createFactory(final MQConfig config) { + final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); + factory.setBrokerURL(config.getBrokerUrl()); - private Session createSession() { - try { - return this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - } catch (final JMSException e) { - throw new MQException(e); - } - } + // TODO 配置其他参数 - @Override - public void close() throws IOException { - IoUtil.closeQuietly(this.connection); + return factory; } } diff --git a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/jms/DestinationType.java b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/jms/DestinationType.java new file mode 100644 index 000000000..6849f1bd1 --- /dev/null +++ b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/jms/DestinationType.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2025 Hutool Team and hutool.cn + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.hutool.extra.mq.engine.jms; + +/** + * 目标类型 + * + * @author Looly + */ +public enum DestinationType { + /** + * 主题 + */ + TOPIC, + + /** + * 队列 + */ + QUEUE; +} diff --git a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/jms/JMSEngine.java b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/jms/JMSEngine.java new file mode 100644 index 000000000..bab9e98a9 --- /dev/null +++ b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/jms/JMSEngine.java @@ -0,0 +1,105 @@ +/* + * Copyright (c) 2025 Hutool Team and hutool.cn + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.hutool.extra.mq.engine.jms; + +import jakarta.jms.Connection; +import jakarta.jms.ConnectionFactory; +import jakarta.jms.JMSException; +import jakarta.jms.Session; +import org.dromara.hutool.core.io.IoUtil; +import org.dromara.hutool.extra.mq.Consumer; +import org.dromara.hutool.extra.mq.MQConfig; +import org.dromara.hutool.extra.mq.MQException; +import org.dromara.hutool.extra.mq.Producer; +import org.dromara.hutool.extra.mq.engine.MQEngine; + +import java.io.Closeable; +import java.io.IOException; + +/** + * ActiveMQ引擎 + * + * @author Looly + * @since 6.0.0 + */ +public class JMSEngine implements MQEngine, Closeable { + + private Connection connection; + + /** + * 构造 + * + * @param connection {@link Connection} + */ + public JMSEngine(final Connection connection) { + this.connection = connection; + } + + /** + * 构造 + * + * @param factory {@link ConnectionFactory} + */ + @SuppressWarnings("resource") + public JMSEngine(final ConnectionFactory factory) { + init(factory); + } + + @Override + public JMSEngine init(final MQConfig config) { + throw new MQException("Unsupported JMSEngine create by MQConfig!"); + } + + /** + * 初始化 + * + * @param factory {@link ConnectionFactory} + * @return this + */ + public JMSEngine init(final ConnectionFactory factory){ + try { + this.connection = factory.createConnection(); + this.connection.start(); + } catch (final JMSException e) { + throw new MQException(e); + } + return this; + } + + @Override + public Producer getProducer() { + return new JMSProducer(createSession()); + } + + @Override + public Consumer getConsumer() { + return new JSMConsumer(createSession()); + } + + private Session createSession() { + try { + return this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } catch (final JMSException e) { + throw new MQException(e); + } + } + + @Override + public void close() throws IOException { + IoUtil.closeQuietly(this.connection); + } +} diff --git a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/activemq/ActiveMQProducer.java b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/jms/JMSProducer.java similarity index 54% rename from hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/activemq/ActiveMQProducer.java rename to hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/jms/JMSProducer.java index 3802f9944..17a28e0fa 100644 --- a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/activemq/ActiveMQProducer.java +++ b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/jms/JMSProducer.java @@ -14,12 +14,9 @@ * limitations under the License. */ -package org.dromara.hutool.extra.mq.engine.activemq; +package org.dromara.hutool.extra.mq.engine.jms; -import jakarta.jms.BytesMessage; -import jakarta.jms.JMSException; -import jakarta.jms.MessageProducer; -import jakarta.jms.Session; +import jakarta.jms.*; import org.dromara.hutool.core.io.IoUtil; import org.dromara.hutool.extra.mq.MQException; import org.dromara.hutool.extra.mq.Message; @@ -33,7 +30,7 @@ import java.io.IOException; * @author Looly * @since 6.0.0 */ -public class ActiveMQProducer implements Producer { +public class JMSProducer implements Producer { private final Session session; private MessageProducer producer; @@ -43,7 +40,7 @@ public class ActiveMQProducer implements Producer { * * @param session Session */ - public ActiveMQProducer(final Session session) { + public JMSProducer(final Session session) { this.session = session; } @@ -53,12 +50,21 @@ public class ActiveMQProducer implements Producer { * @param topic 主题 * @return this */ - public ActiveMQProducer setTopic(final String topic) { - try { - this.producer = this.session.createProducer(this.session.createTopic(topic)); - } catch (final JMSException e) { - throw new MQException(e); - } + public JMSProducer setTopic(final String topic) { + final Destination destination = createDestination(topic, DestinationType.TOPIC); + this.producer = createProducer(destination); + return this; + } + + /** + * 设置队列 + * + * @param queue 队列 + * @return this + */ + public JMSProducer setQueue(final String queue) { + final Destination destination = createDestination(queue, DestinationType.QUEUE); + this.producer = createProducer(destination); return this; } @@ -78,4 +84,40 @@ public class ActiveMQProducer implements Producer { IoUtil.closeQuietly(this.producer); IoUtil.closeQuietly(this.session); } + + /** + * 创建消息生产者 + * + * @param destination 目的地 + * @return this + */ + private MessageProducer createProducer(final Destination destination) { + try { + return session.createProducer(destination); + } catch (final JMSException e) { + throw new MQException(e); + } + } + + /** + * 创建消息目的地 + * + * @param name 消息目的地名称 + * @param type 消息目的地类型 + * @return this + */ + private Destination createDestination(final String name, final DestinationType type) { + try { + switch (type){ + case QUEUE: + return session.createQueue(name); + case TOPIC: + return session.createTopic(name); + default: + throw new MQException("Unknown destination type: " + type); + } + } catch (final JMSException e) { + throw new MQException(e); + } + } } diff --git a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/activemq/ActiveMQConsumer.java b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/jms/JSMConsumer.java similarity index 92% rename from hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/activemq/ActiveMQConsumer.java rename to hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/jms/JSMConsumer.java index 58a028ff5..8399eaa79 100644 --- a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/activemq/ActiveMQConsumer.java +++ b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/jms/JSMConsumer.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.dromara.hutool.extra.mq.engine.activemq; +package org.dromara.hutool.extra.mq.engine.jms; import jakarta.jms.*; import org.dromara.hutool.core.io.IoUtil; @@ -30,7 +30,7 @@ import java.io.IOException; * @author Looly * @since 6.0.0 */ -public class ActiveMQConsumer implements Consumer { +public class JSMConsumer implements Consumer { private final Session session; private MessageConsumer consumer; @@ -40,7 +40,7 @@ public class ActiveMQConsumer implements Consumer { * * @param session Session */ - public ActiveMQConsumer(final Session session) { + public JSMConsumer(final Session session) { this.session = session; } @@ -50,7 +50,7 @@ public class ActiveMQConsumer implements Consumer { * @param topic 主题 * @return this */ - public ActiveMQConsumer setTopic(final String topic) { + public JSMConsumer setTopic(final String topic) { try { this.consumer = this.session.createConsumer(this.session.createTopic(topic)); } catch (final JMSException e) { diff --git a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/jms/package-info.java b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/jms/package-info.java new file mode 100644 index 000000000..486191811 --- /dev/null +++ b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/jms/package-info.java @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2025 Hutool Team and hutool.cn + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * JMS(Java Message Service)消息队列引擎实现 + * + * @author Looly + */ +package org.dromara.hutool.extra.mq.engine.jms; diff --git a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/kafka/KafkaEngine.java b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/kafka/KafkaEngine.java index 3723cb52d..5cc6fed51 100644 --- a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/kafka/KafkaEngine.java +++ b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/kafka/KafkaEngine.java @@ -16,7 +16,10 @@ package org.dromara.hutool.extra.mq.engine.kafka; +import org.apache.kafka.clients.CommonClientConfigs; +import org.dromara.hutool.core.lang.Assert; import org.dromara.hutool.extra.mq.Consumer; +import org.dromara.hutool.extra.mq.MQConfig; import org.dromara.hutool.extra.mq.Producer; import org.dromara.hutool.extra.mq.engine.MQEngine; @@ -30,7 +33,24 @@ import java.util.Properties; */ public class KafkaEngine implements MQEngine { - private final Properties properties; + private Properties properties; + + /** + * 默认构造 + */ + public KafkaEngine() { + // SPI方式加载时检查库是否引入 + Assert.notNull(org.apache.kafka.clients.CommonClientConfigs.class); + } + + /** + * 构造 + * + * @param config 配置 + */ + public KafkaEngine(final MQConfig config) { + init(config); + } /** * 构造 @@ -38,7 +58,23 @@ public class KafkaEngine implements MQEngine { * @param properties 配置 */ public KafkaEngine(final Properties properties) { + init(properties); + } + + @Override + public KafkaEngine init(final MQConfig config) { + return init(buidProperties(config)); + } + + /** + * 初始化 + * + * @param properties 配置 + * @return this + */ + public KafkaEngine init(final Properties properties) { this.properties = properties; + return this; } /** @@ -62,4 +98,17 @@ public class KafkaEngine implements MQEngine { public Consumer getConsumer() { return new KafkaConsumer(this.properties); } + + /** + * 构建配置 + * + * @param config 配置 + * @return 配置 + */ + private static Properties buidProperties(final MQConfig config) { + final Properties properties = new Properties(); + properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, config.getBrokerUrl()); + properties.putAll(config.getProperties()); + return properties; + } } diff --git a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rabbitmq/RabbitMQConsumer.java b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rabbitmq/RabbitMQConsumer.java index fd3fbbb62..fbfcb25f6 100644 --- a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rabbitmq/RabbitMQConsumer.java +++ b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rabbitmq/RabbitMQConsumer.java @@ -25,6 +25,7 @@ import org.dromara.hutool.extra.mq.Message; import org.dromara.hutool.extra.mq.MessageHandler; import java.io.IOException; +import java.util.Map; /** * RabbitMQ消费者 @@ -59,28 +60,24 @@ public class RabbitMQConsumer implements Consumer { @Override public void subscribe(final MessageHandler messageHandler) { - try { - this.channel.queueDeclare(this.topic, false, false, false, null); - } catch (final IOException e) { - throw new MQException(e); - } + queueDeclare(false, false, false, null); - final DeliverCallback deliverCallback = (consumerTag, delivery) -> { + final DeliverCallback deliverCallback = (consumerTag, delivery) -> messageHandler.handle(new Message() { - @Override - public String topic() { - return consumerTag; - } + @Override + public String topic() { + return consumerTag; + } - @Override - public byte[] content() { - return delivery.getBody(); - } - }); - }; + @Override + public byte[] content() { + return delivery.getBody(); + } + }); try { - this.channel.basicConsume(this.topic, true, deliverCallback, consumerTag -> { }); + this.channel.basicConsume(this.topic, true, deliverCallback, consumerTag -> { + }); } catch (final IOException e) { throw new MQException(e); } @@ -90,4 +87,22 @@ public class RabbitMQConsumer implements Consumer { public void close() { IoUtil.closeQuietly(this.channel); } + + /** + * 声明队列 + * + * @param durable 是否持久化 + * @param exclusive 是否排他 + * @param autoDelete 是否自动删除 + * @param arguments 其他参数 + */ + @SuppressWarnings("SameParameterValue") + private void queueDeclare(final boolean durable, final boolean exclusive, final boolean autoDelete, + final Map arguments) { + try { + this.channel.queueDeclare(this.topic, durable, exclusive, autoDelete, arguments); + } catch (final IOException e) { + throw new MQException(e); + } + } } diff --git a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rabbitmq/RabbitMQEngine.java b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rabbitmq/RabbitMQEngine.java index 4e2c6ea72..3da99edf3 100644 --- a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rabbitmq/RabbitMQEngine.java +++ b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rabbitmq/RabbitMQEngine.java @@ -20,7 +20,9 @@ import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import org.dromara.hutool.core.io.IoUtil; +import org.dromara.hutool.core.lang.Assert; import org.dromara.hutool.extra.mq.Consumer; +import org.dromara.hutool.extra.mq.MQConfig; import org.dromara.hutool.extra.mq.MQException; import org.dromara.hutool.extra.mq.Producer; import org.dromara.hutool.extra.mq.engine.MQEngine; @@ -37,19 +39,53 @@ import java.util.concurrent.TimeoutException; */ public class RabbitMQEngine implements MQEngine, Closeable { - private final Connection connection; + private Connection connection; + + /** + * 默认构造 + */ + public RabbitMQEngine() { + // SPI方式加载时检查库是否引入 + Assert.notNull(com.rabbitmq.client.Connection.class); + } + + /** + * 构造 + * + * @param config 配置 + */ + public RabbitMQEngine(final MQConfig config){ + init(config); + } /** * 构造 * * @param factory 连接工厂 */ + @SuppressWarnings("resource") public RabbitMQEngine(final ConnectionFactory factory) { + init(factory); + } + + @Override + public RabbitMQEngine init(final MQConfig config) { + return init(createFactory(config)); + } + + /** + * 初始化 + * + * @param factory 连接工厂 + * @return this + */ + public RabbitMQEngine init(final ConnectionFactory factory){ try { this.connection = factory.newConnection(); } catch (final IOException | TimeoutException e) { throw new MQException(e); } + return this; } @Override @@ -79,4 +115,23 @@ public class RabbitMQEngine implements MQEngine, Closeable { throw new MQException(e); } } + + /** + * 创建连接工厂 + * + * @param config 配置 + * @return 连接工厂 + */ + private static ConnectionFactory createFactory(final MQConfig config) { + final ConnectionFactory factory = new ConnectionFactory(); + try { + factory.setUri(config.getBrokerUrl()); + } catch (final Exception e) { + throw new MQException(e); + } + + // TODO 配置其他参数 + + return factory; + } } diff --git a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rabbitmq/RabbitMQProducer.java b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rabbitmq/RabbitMQProducer.java index 15e947dee..2c6ab667a 100644 --- a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rabbitmq/RabbitMQProducer.java +++ b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rabbitmq/RabbitMQProducer.java @@ -24,6 +24,7 @@ import org.dromara.hutool.extra.mq.Message; import org.dromara.hutool.extra.mq.Producer; import java.io.IOException; +import java.util.Map; /** * RabbitMQ消息生产者 @@ -34,6 +35,7 @@ import java.io.IOException; public class RabbitMQProducer implements Producer { private final Channel channel; + private String exchange = StrUtil.EMPTY; /** * 构造 @@ -44,10 +46,41 @@ public class RabbitMQProducer implements Producer { this.channel = channel; } + /** + * 设置交换器,默认为{@link StrUtil#EMPTY} + * + * @param exchange 交换器 + * @return this + */ + public RabbitMQProducer setExchange(final String exchange) { + this.exchange = exchange; + return this; + } + + /** + * 声明队列 + * + * @param queue 队列名 + * @param durable 是否持久化 + * @param exclusive 是否排他 + * @param autoDelete 是否自动删除 + * @param arguments 其他参数 + * @return this + */ + public RabbitMQProducer queueDeclare(final String queue, final boolean durable, final boolean exclusive, final boolean autoDelete, + final Map arguments) { + try { + this.channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments); + } catch (final IOException e) { + throw new MQException(e); + } + return this; + } + @Override public void send(final Message message) { try { - this.channel.basicPublish(StrUtil.EMPTY, message.topic(), null, message.content()); + this.channel.basicPublish(exchange, message.topic(), null, message.content()); } catch (final IOException e) { throw new MQException(e); } diff --git a/hutool-extra/src/main/resources/META-INF/services/org.dromara.hutool.extra.mq.engine.MQEngine b/hutool-extra/src/main/resources/META-INF/services/org.dromara.hutool.extra.mq.engine.MQEngine new file mode 100644 index 000000000..5c5691444 --- /dev/null +++ b/hutool-extra/src/main/resources/META-INF/services/org.dromara.hutool.extra.mq.engine.MQEngine @@ -0,0 +1,19 @@ +# +# Copyright (c) 2025 Hutool Team and hutool.cn +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.dromara.hutool.extra.mq.engine.kafka.KafkaEngine +org.dromara.hutool.extra.mq.engine.rabbitmq.RabbitMQEngine +org.dromara.hutool.extra.mq.engine.activemq.ActiveMQEngine