This commit is contained in:
Looly 2025-01-28 03:43:17 +08:00
parent fea52b5e82
commit 253960ca73
16 changed files with 652 additions and 73 deletions

View File

@ -39,9 +39,10 @@ public interface Consumer extends Closeable {
*
* @param messageHandler 消息处理器
*/
@SuppressWarnings("InfiniteLoopStatement")
default void listen(final MessageHandler messageHandler) {
ThreadUtil.execAsync(() -> {
while (true) {
for(;;) {
this.subscribe(messageHandler);
}
});

View File

@ -0,0 +1,133 @@
/*
* Copyright (c) 2025 Hutool Team and hutool.cn
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.hutool.extra.mq;
import org.dromara.hutool.extra.mq.engine.MQEngine;
import java.io.Serializable;
import java.util.Properties;
/**
* MQ配置
*
* @author Looly
* @since 6.0.0
*/
public class MQConfig implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 创建配置
*
* @param brokerUrl Broker地址
* @return 配置
*/
public static MQConfig of(final String brokerUrl) {
return new MQConfig(brokerUrl);
}
private String brokerUrl;
private Properties properties;
/**
* 自定义引擎当多个jar包引入时可以自定使用的默认引擎
*/
private Class<? extends MQEngine> customEngine;
/**
* 构造
*
* @param brokerUrl Broker地址
*/
public MQConfig(final String brokerUrl) {
this.brokerUrl = brokerUrl;
}
/**
* 获取Broker地址
*
* @return Broker地址
*/
public String getBrokerUrl() {
return brokerUrl;
}
/**
* 设置Broker地址
*
* @param brokerUrl Broker地址
* @return this
*/
public MQConfig setBrokerUrl(final String brokerUrl) {
this.brokerUrl = brokerUrl;
return this;
}
/**
* 获取配置
*
* @return 配置
*/
public Properties getProperties() {
return properties;
}
/**
* 设置配置
*
* @param properties 配置
* @return this
*/
public MQConfig setProperties(final Properties properties) {
this.properties = properties;
return this;
}
/**
* 添加配置项
*
* @param key
* @param value
* @return this
*/
public Properties addProperty(final String key, final String value) {
if (null == this.properties) {
this.properties = new Properties();
}
this.properties.setProperty(key, value);
return this.properties;
}
/**
* 自定义引擎当多个jar包引入时可以自定使用的默认引擎
*
* @return 自定义引擎
*/
public Class<? extends MQEngine> getCustomEngine() {
return customEngine;
}
/**
* 自定义引擎当多个jar包引入时可以自定使用的默认引擎
*
* @param customEngine 自定义引擎
* @return this
*/
public MQConfig setCustomEngine(final Class<? extends MQEngine> customEngine) {
this.customEngine = customEngine;
return this;
}
}

View File

@ -16,6 +16,10 @@
package org.dromara.hutool.extra.mq;
import org.dromara.hutool.core.text.StrUtil;
import java.nio.charset.Charset;
/**
* 消息接口
*
@ -37,4 +41,14 @@ public interface Message {
* @return 内容
*/
byte[] content();
/**
* 获取消息内容字符串
*
* @param charset 编码
* @return 内容字符串
*/
default String contentStr(final Charset charset) {
return StrUtil.str(charset, charset);
}
}

View File

@ -17,6 +17,7 @@
package org.dromara.hutool.extra.mq.engine;
import org.dromara.hutool.extra.mq.Consumer;
import org.dromara.hutool.extra.mq.MQConfig;
import org.dromara.hutool.extra.mq.Producer;
/**
@ -27,6 +28,14 @@ import org.dromara.hutool.extra.mq.Producer;
*/
public interface MQEngine {
/**
* 初始化配置
*
* @param config 配置
* @return this
*/
MQEngine init(MQConfig config);
/**
* 获取消息生产者
*

View File

@ -16,5 +16,50 @@
package org.dromara.hutool.extra.mq.engine;
import org.dromara.hutool.core.reflect.ConstructorUtil;
import org.dromara.hutool.core.spi.SpiUtil;
import org.dromara.hutool.extra.mq.MQConfig;
import org.dromara.hutool.extra.mq.MQException;
/**
* MQ引擎工厂类
*
* @author huangchengxing
* @since 1.0.0
*/
public class MQEngineFactory {
/**
* 根据用户引入的MQ引擎jar自动创建对应的模板引擎对象<br>
* 推荐创建的引擎单例使用此方法每次调用会返回新的引擎
*
* @param config MQ配置
* @return {@link MQEngine}
*/
public static MQEngine createEngine(final MQConfig config) {
return doCreateEngine(config);
}
/**
* 根据用户引入的MQ引擎jar自动创建对应的MQ引擎对象
*
* @param config MQ配置
* @return {@link MQEngine}
*/
private static MQEngine doCreateEngine(final MQConfig config) {
final Class<? extends MQEngine> customEngineClass = config.getCustomEngine();
final MQEngine engine;
if (null != customEngineClass) {
// 自定义模板引擎
engine = ConstructorUtil.newInstance(customEngineClass);
} else {
// SPI引擎查找
engine = SpiUtil.loadFirstAvailable(MQEngine.class);
}
if (null != engine) {
return engine.init(config);
}
throw new MQException("No MQ implement found! Please add one of MQ jar to your project !");
}
}

View File

@ -18,16 +18,10 @@ package org.dromara.hutool.extra.mq.engine.activemq;
import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSException;
import jakarta.jms.Session;
import org.dromara.hutool.core.io.IoUtil;
import org.dromara.hutool.extra.mq.Consumer;
import org.dromara.hutool.extra.mq.MQException;
import org.dromara.hutool.extra.mq.Producer;
import org.dromara.hutool.extra.mq.engine.MQEngine;
import java.io.Closeable;
import java.io.IOException;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.dromara.hutool.core.lang.Assert;
import org.dromara.hutool.extra.mq.MQConfig;
import org.dromara.hutool.extra.mq.engine.jms.JMSEngine;
/**
* ActiveMQ引擎
@ -35,44 +29,53 @@ import java.io.IOException;
* @author Looly
* @since 6.0.0
*/
public class ActiveMQEngine implements MQEngine, Closeable {
public class ActiveMQEngine extends JMSEngine {
private final Connection connection;
/**
* 默认构造
*/
public ActiveMQEngine() {
super((Connection) null);
// SPI方式加载时检查库是否引入
Assert.notNull(org.apache.activemq.ActiveMQConnectionFactory.class);
}
/**
* 构造
*
* @param config 配置
*/
public ActiveMQEngine(final MQConfig config) {
super(createFactory(config));
}
/**
* 构造
*
* @param factory {@link ConnectionFactory}
*/
public ActiveMQEngine(final ConnectionFactory factory) {
try {
this.connection = factory.createConnection();
this.connection.start();
} catch (final JMSException e) {
throw new MQException(e);
}
public ActiveMQEngine(final ActiveMQConnectionFactory factory) {
super(factory);
}
@Override
public Producer getProducer() {
return new ActiveMQProducer(createSession());
public ActiveMQEngine init(final MQConfig config) {
super.init(createFactory(config));
return this;
}
@Override
public Consumer getConsumer() {
return new ActiveMQConsumer(createSession());
}
/**
* 创建{@link ActiveMQConnectionFactory}
*
* @param config 配置
* @return {@link ActiveMQConnectionFactory}
*/
private static ActiveMQConnectionFactory createFactory(final MQConfig config) {
final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setBrokerURL(config.getBrokerUrl());
private Session createSession() {
try {
return this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
} catch (final JMSException e) {
throw new MQException(e);
}
}
// TODO 配置其他参数
@Override
public void close() throws IOException {
IoUtil.closeQuietly(this.connection);
return factory;
}
}

View File

@ -0,0 +1,34 @@
/*
* Copyright (c) 2025 Hutool Team and hutool.cn
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.hutool.extra.mq.engine.jms;
/**
* 目标类型
*
* @author Looly
*/
public enum DestinationType {
/**
* 主题
*/
TOPIC,
/**
* 队列
*/
QUEUE;
}

View File

@ -0,0 +1,105 @@
/*
* Copyright (c) 2025 Hutool Team and hutool.cn
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.hutool.extra.mq.engine.jms;
import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSException;
import jakarta.jms.Session;
import org.dromara.hutool.core.io.IoUtil;
import org.dromara.hutool.extra.mq.Consumer;
import org.dromara.hutool.extra.mq.MQConfig;
import org.dromara.hutool.extra.mq.MQException;
import org.dromara.hutool.extra.mq.Producer;
import org.dromara.hutool.extra.mq.engine.MQEngine;
import java.io.Closeable;
import java.io.IOException;
/**
* ActiveMQ引擎
*
* @author Looly
* @since 6.0.0
*/
public class JMSEngine implements MQEngine, Closeable {
private Connection connection;
/**
* 构造
*
* @param connection {@link Connection}
*/
public JMSEngine(final Connection connection) {
this.connection = connection;
}
/**
* 构造
*
* @param factory {@link ConnectionFactory}
*/
@SuppressWarnings("resource")
public JMSEngine(final ConnectionFactory factory) {
init(factory);
}
@Override
public JMSEngine init(final MQConfig config) {
throw new MQException("Unsupported JMSEngine create by MQConfig!");
}
/**
* 初始化
*
* @param factory {@link ConnectionFactory}
* @return this
*/
public JMSEngine init(final ConnectionFactory factory){
try {
this.connection = factory.createConnection();
this.connection.start();
} catch (final JMSException e) {
throw new MQException(e);
}
return this;
}
@Override
public Producer getProducer() {
return new JMSProducer(createSession());
}
@Override
public Consumer getConsumer() {
return new JSMConsumer(createSession());
}
private Session createSession() {
try {
return this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
} catch (final JMSException e) {
throw new MQException(e);
}
}
@Override
public void close() throws IOException {
IoUtil.closeQuietly(this.connection);
}
}

View File

@ -14,12 +14,9 @@
* limitations under the License.
*/
package org.dromara.hutool.extra.mq.engine.activemq;
package org.dromara.hutool.extra.mq.engine.jms;
import jakarta.jms.BytesMessage;
import jakarta.jms.JMSException;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.*;
import org.dromara.hutool.core.io.IoUtil;
import org.dromara.hutool.extra.mq.MQException;
import org.dromara.hutool.extra.mq.Message;
@ -33,7 +30,7 @@ import java.io.IOException;
* @author Looly
* @since 6.0.0
*/
public class ActiveMQProducer implements Producer {
public class JMSProducer implements Producer {
private final Session session;
private MessageProducer producer;
@ -43,7 +40,7 @@ public class ActiveMQProducer implements Producer {
*
* @param session Session
*/
public ActiveMQProducer(final Session session) {
public JMSProducer(final Session session) {
this.session = session;
}
@ -53,12 +50,21 @@ public class ActiveMQProducer implements Producer {
* @param topic 主题
* @return this
*/
public ActiveMQProducer setTopic(final String topic) {
try {
this.producer = this.session.createProducer(this.session.createTopic(topic));
} catch (final JMSException e) {
throw new MQException(e);
}
public JMSProducer setTopic(final String topic) {
final Destination destination = createDestination(topic, DestinationType.TOPIC);
this.producer = createProducer(destination);
return this;
}
/**
* 设置队列
*
* @param queue 队列
* @return this
*/
public JMSProducer setQueue(final String queue) {
final Destination destination = createDestination(queue, DestinationType.QUEUE);
this.producer = createProducer(destination);
return this;
}
@ -78,4 +84,40 @@ public class ActiveMQProducer implements Producer {
IoUtil.closeQuietly(this.producer);
IoUtil.closeQuietly(this.session);
}
/**
* 创建消息生产者
*
* @param destination 目的地
* @return this
*/
private MessageProducer createProducer(final Destination destination) {
try {
return session.createProducer(destination);
} catch (final JMSException e) {
throw new MQException(e);
}
}
/**
* 创建消息目的地
*
* @param name 消息目的地名称
* @param type 消息目的地类型
* @return this
*/
private Destination createDestination(final String name, final DestinationType type) {
try {
switch (type){
case QUEUE:
return session.createQueue(name);
case TOPIC:
return session.createTopic(name);
default:
throw new MQException("Unknown destination type: " + type);
}
} catch (final JMSException e) {
throw new MQException(e);
}
}
}

View File

@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.dromara.hutool.extra.mq.engine.activemq;
package org.dromara.hutool.extra.mq.engine.jms;
import jakarta.jms.*;
import org.dromara.hutool.core.io.IoUtil;
@ -30,7 +30,7 @@ import java.io.IOException;
* @author Looly
* @since 6.0.0
*/
public class ActiveMQConsumer implements Consumer {
public class JSMConsumer implements Consumer {
private final Session session;
private MessageConsumer consumer;
@ -40,7 +40,7 @@ public class ActiveMQConsumer implements Consumer {
*
* @param session Session
*/
public ActiveMQConsumer(final Session session) {
public JSMConsumer(final Session session) {
this.session = session;
}
@ -50,7 +50,7 @@ public class ActiveMQConsumer implements Consumer {
* @param topic 主题
* @return this
*/
public ActiveMQConsumer setTopic(final String topic) {
public JSMConsumer setTopic(final String topic) {
try {
this.consumer = this.session.createConsumer(this.session.createTopic(topic));
} catch (final JMSException e) {

View File

@ -0,0 +1,22 @@
/*
* Copyright (c) 2025 Hutool Team and hutool.cn
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* JMS(Java Message Service)消息队列引擎实现
*
* @author Looly
*/
package org.dromara.hutool.extra.mq.engine.jms;

View File

@ -16,7 +16,10 @@
package org.dromara.hutool.extra.mq.engine.kafka;
import org.apache.kafka.clients.CommonClientConfigs;
import org.dromara.hutool.core.lang.Assert;
import org.dromara.hutool.extra.mq.Consumer;
import org.dromara.hutool.extra.mq.MQConfig;
import org.dromara.hutool.extra.mq.Producer;
import org.dromara.hutool.extra.mq.engine.MQEngine;
@ -30,7 +33,24 @@ import java.util.Properties;
*/
public class KafkaEngine implements MQEngine {
private final Properties properties;
private Properties properties;
/**
* 默认构造
*/
public KafkaEngine() {
// SPI方式加载时检查库是否引入
Assert.notNull(org.apache.kafka.clients.CommonClientConfigs.class);
}
/**
* 构造
*
* @param config 配置
*/
public KafkaEngine(final MQConfig config) {
init(config);
}
/**
* 构造
@ -38,7 +58,23 @@ public class KafkaEngine implements MQEngine {
* @param properties 配置
*/
public KafkaEngine(final Properties properties) {
init(properties);
}
@Override
public KafkaEngine init(final MQConfig config) {
return init(buidProperties(config));
}
/**
* 初始化
*
* @param properties 配置
* @return this
*/
public KafkaEngine init(final Properties properties) {
this.properties = properties;
return this;
}
/**
@ -62,4 +98,17 @@ public class KafkaEngine implements MQEngine {
public Consumer getConsumer() {
return new KafkaConsumer(this.properties);
}
/**
* 构建配置
*
* @param config 配置
* @return 配置
*/
private static Properties buidProperties(final MQConfig config) {
final Properties properties = new Properties();
properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, config.getBrokerUrl());
properties.putAll(config.getProperties());
return properties;
}
}

View File

@ -25,6 +25,7 @@ import org.dromara.hutool.extra.mq.Message;
import org.dromara.hutool.extra.mq.MessageHandler;
import java.io.IOException;
import java.util.Map;
/**
* RabbitMQ消费者
@ -59,28 +60,24 @@ public class RabbitMQConsumer implements Consumer {
@Override
public void subscribe(final MessageHandler messageHandler) {
try {
this.channel.queueDeclare(this.topic, false, false, false, null);
} catch (final IOException e) {
throw new MQException(e);
}
queueDeclare(false, false, false, null);
final DeliverCallback deliverCallback = (consumerTag, delivery) -> {
final DeliverCallback deliverCallback = (consumerTag, delivery) ->
messageHandler.handle(new Message() {
@Override
public String topic() {
return consumerTag;
}
@Override
public String topic() {
return consumerTag;
}
@Override
public byte[] content() {
return delivery.getBody();
}
});
};
@Override
public byte[] content() {
return delivery.getBody();
}
});
try {
this.channel.basicConsume(this.topic, true, deliverCallback, consumerTag -> { });
this.channel.basicConsume(this.topic, true, deliverCallback, consumerTag -> {
});
} catch (final IOException e) {
throw new MQException(e);
}
@ -90,4 +87,22 @@ public class RabbitMQConsumer implements Consumer {
public void close() {
IoUtil.closeQuietly(this.channel);
}
/**
* 声明队列
*
* @param durable 是否持久化
* @param exclusive 是否排他
* @param autoDelete 是否自动删除
* @param arguments 其他参数
*/
@SuppressWarnings("SameParameterValue")
private void queueDeclare(final boolean durable, final boolean exclusive, final boolean autoDelete,
final Map<String, Object> arguments) {
try {
this.channel.queueDeclare(this.topic, durable, exclusive, autoDelete, arguments);
} catch (final IOException e) {
throw new MQException(e);
}
}
}

View File

@ -20,7 +20,9 @@ import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.dromara.hutool.core.io.IoUtil;
import org.dromara.hutool.core.lang.Assert;
import org.dromara.hutool.extra.mq.Consumer;
import org.dromara.hutool.extra.mq.MQConfig;
import org.dromara.hutool.extra.mq.MQException;
import org.dromara.hutool.extra.mq.Producer;
import org.dromara.hutool.extra.mq.engine.MQEngine;
@ -37,19 +39,53 @@ import java.util.concurrent.TimeoutException;
*/
public class RabbitMQEngine implements MQEngine, Closeable {
private final Connection connection;
private Connection connection;
/**
* 默认构造
*/
public RabbitMQEngine() {
// SPI方式加载时检查库是否引入
Assert.notNull(com.rabbitmq.client.Connection.class);
}
/**
* 构造
*
* @param config 配置
*/
public RabbitMQEngine(final MQConfig config){
init(config);
}
/**
* 构造
*
* @param factory 连接工厂
*/
@SuppressWarnings("resource")
public RabbitMQEngine(final ConnectionFactory factory) {
init(factory);
}
@Override
public RabbitMQEngine init(final MQConfig config) {
return init(createFactory(config));
}
/**
* 初始化
*
* @param factory 连接工厂
* @return this
*/
public RabbitMQEngine init(final ConnectionFactory factory){
try {
this.connection = factory.newConnection();
} catch (final IOException | TimeoutException e) {
throw new MQException(e);
}
return this;
}
@Override
@ -79,4 +115,23 @@ public class RabbitMQEngine implements MQEngine, Closeable {
throw new MQException(e);
}
}
/**
* 创建连接工厂
*
* @param config 配置
* @return 连接工厂
*/
private static ConnectionFactory createFactory(final MQConfig config) {
final ConnectionFactory factory = new ConnectionFactory();
try {
factory.setUri(config.getBrokerUrl());
} catch (final Exception e) {
throw new MQException(e);
}
// TODO 配置其他参数
return factory;
}
}

View File

@ -24,6 +24,7 @@ import org.dromara.hutool.extra.mq.Message;
import org.dromara.hutool.extra.mq.Producer;
import java.io.IOException;
import java.util.Map;
/**
* RabbitMQ消息生产者
@ -34,6 +35,7 @@ import java.io.IOException;
public class RabbitMQProducer implements Producer {
private final Channel channel;
private String exchange = StrUtil.EMPTY;
/**
* 构造
@ -44,10 +46,41 @@ public class RabbitMQProducer implements Producer {
this.channel = channel;
}
/**
* 设置交换器默认为{@link StrUtil#EMPTY}
*
* @param exchange 交换器
* @return this
*/
public RabbitMQProducer setExchange(final String exchange) {
this.exchange = exchange;
return this;
}
/**
* 声明队列
*
* @param queue 队列名
* @param durable 是否持久化
* @param exclusive 是否排他
* @param autoDelete 是否自动删除
* @param arguments 其他参数
* @return this
*/
public RabbitMQProducer queueDeclare(final String queue, final boolean durable, final boolean exclusive, final boolean autoDelete,
final Map<String, Object> arguments) {
try {
this.channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments);
} catch (final IOException e) {
throw new MQException(e);
}
return this;
}
@Override
public void send(final Message message) {
try {
this.channel.basicPublish(StrUtil.EMPTY, message.topic(), null, message.content());
this.channel.basicPublish(exchange, message.topic(), null, message.content());
} catch (final IOException e) {
throw new MQException(e);
}

View File

@ -0,0 +1,19 @@
#
# Copyright (c) 2025 Hutool Team and hutool.cn
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
org.dromara.hutool.extra.mq.engine.kafka.KafkaEngine
org.dromara.hutool.extra.mq.engine.rabbitmq.RabbitMQEngine
org.dromara.hutool.extra.mq.engine.activemq.ActiveMQEngine