diff --git a/hutool-core/src/main/java/org/dromara/hutool/core/collection/ListUtil.java b/hutool-core/src/main/java/org/dromara/hutool/core/collection/ListUtil.java index 65cb8b591..e7ac2ed13 100644 --- a/hutool-core/src/main/java/org/dromara/hutool/core/collection/ListUtil.java +++ b/hutool-core/src/main/java/org/dromara/hutool/core/collection/ListUtil.java @@ -236,6 +236,18 @@ public class ListUtil { return new ArrayList<>(0); } + /** + * 获取一个只包含一个元素的List,不可变 + * + * @param 元素类型 + * @param element 元素 + * @return 只包含一个元素的List + * @since 6.0.0 + */ + public static List singleton(final T element){ + return Collections.singletonList(element); + } + /** * 新建一个CopyOnWriteArrayList * diff --git a/hutool-core/src/main/java/org/dromara/hutool/core/collection/set/SetUtil.java b/hutool-core/src/main/java/org/dromara/hutool/core/collection/set/SetUtil.java index bdca889ae..171c363a5 100644 --- a/hutool-core/src/main/java/org/dromara/hutool/core/collection/set/SetUtil.java +++ b/hutool-core/src/main/java/org/dromara/hutool/core/collection/set/SetUtil.java @@ -18,14 +18,7 @@ package org.dromara.hutool.core.collection.set; import org.dromara.hutool.core.array.ArrayUtil; -import java.util.Collection; -import java.util.Collections; -import java.util.Enumeration; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.Set; +import java.util.*; /** * 集合中的{@link java.util.Set}相关方法封装 @@ -202,6 +195,18 @@ public class SetUtil { return new HashSet<>(0, 1); } + /** + * 获取一个只包含一个元素的Set,不可变 + * + * @param 元素类型 + * @param element 元素 + * @return 只包含一个元素的Set + * @since 6.0.0 + */ + public static Set singleton(final T element) { + return Collections.singleton(element); + } + /** * 获取一个初始大小为0的LinkedHashSet,这个空Set可变 * diff --git a/hutool-core/src/main/java/org/dromara/hutool/core/map/MapUtil.java b/hutool-core/src/main/java/org/dromara/hutool/core/map/MapUtil.java index f832c7624..607457d0e 100644 --- a/hutool-core/src/main/java/org/dromara/hutool/core/map/MapUtil.java +++ b/hutool-core/src/main/java/org/dromara/hutool/core/map/MapUtil.java @@ -1065,9 +1065,9 @@ public class MapUtil extends MapGetUtil { * 去除Map中值为指定值的键值对
* 注意:此方法在传入的Map上直接修改。 * - * @param key的类型 - * @param value的类型 - * @param map Map + * @param key的类型 + * @param value的类型 + * @param map Map * @param value 给定值 * @return map * @since 6.0.0 @@ -1080,9 +1080,9 @@ public class MapUtil extends MapGetUtil { * 去除Map中值为{@code null}的键值对
* 注意:此方法在传入的Map上直接修改。 * - * @param key的类型 - * @param value的类型 - * @param map Map + * @param key的类型 + * @param value的类型 + * @param map Map * @param predicate 移除条件,当{@link Predicate#test(Object)}为{@code true}时移除 * @return map * @since 6.0.0 @@ -1119,6 +1119,19 @@ public class MapUtil extends MapGetUtil { return new HashMap<>(0, 1); } + /** + * 返回一个只包含一个键值对的Map,不可变 + * + * @param key 键 + * @param value 值 + * @param 键类型 + * @param 值类型 + * @return Map + */ + public static Map singleton(final K key, final V value) { + return Collections.singletonMap(key, value); + } + /** * 根据传入的Map类型不同,返回对应类型的空Map,支持类型包括: * diff --git a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/Consumer.java b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/Consumer.java index 611a806fb..f810c4b9a 100644 --- a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/Consumer.java +++ b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/Consumer.java @@ -16,6 +16,34 @@ package org.dromara.hutool.extra.mq; -public interface Consumer { +import org.dromara.hutool.core.thread.ThreadUtil; + +import java.io.Closeable; + +/** + * 消息消费者接口 + * + * @author Looly + * @since 6.0.0 + */ +public interface Consumer extends Closeable { + /** + * 单次订阅消息 + * + * @param messageHandler 消息处理器 + */ void subscribe(MessageHandler messageHandler); + + /** + * 持续订阅消息 + * + * @param messageHandler 消息处理器 + */ + default void listen(final MessageHandler messageHandler) { + ThreadUtil.execAsync(() -> { + while (true) { + this.subscribe(messageHandler); + } + }); + } } diff --git a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/MQException.java b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/MQException.java new file mode 100644 index 000000000..8bd4b7961 --- /dev/null +++ b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/MQException.java @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2013-2025 Hutool Team and hutool.cn + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.hutool.extra.mq; + +import org.dromara.hutool.core.exception.HutoolException; + +/** + * 消息队列异常 + * + * @author Looly + */ +public class MQException extends HutoolException { + private static final long serialVersionUID = 1L; + + /** + * 构造 + * + * @param e 异常 + */ + public MQException(final Throwable e) { + super(e); + } + + /** + * 构造 + * + * @param message 消息 + */ + public MQException(final String message) { + super(message); + } + + /** + * 构造 + * + * @param messageTemplate 消息模板 + * @param params 参数 + */ + public MQException(final String messageTemplate, final Object... params) { + super(messageTemplate, params); + } + + /** + * 构造 + * + * @param message 消息 + * @param cause 被包装的子异常 + */ + public MQException(final String message, final Throwable cause) { + super(message, cause); + } + + /** + * 构造 + * + * @param message 消息 + * @param cause 被包装的子异常 + * @param enableSuppression 是否启用抑制 + * @param writableStackTrace 堆栈跟踪是否应该是可写的 + */ + public MQException(final String message, final Throwable cause, final boolean enableSuppression, final boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } + + /** + * 构造 + * + * @param cause 被包装的子异常 + * @param messageTemplate 消息模板 + * @param params 参数 + */ + public MQException(final Throwable cause, final String messageTemplate, final Object... params) { + super(cause, messageTemplate, params); + } +} diff --git a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/Message.java b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/Message.java index 6b2e3b5e4..4393a8185 100644 --- a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/Message.java +++ b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/Message.java @@ -16,5 +16,25 @@ package org.dromara.hutool.extra.mq; +/** + * 消息接口 + * + * @author Looly + * @since 6.0.0 + */ public interface Message { + + /** + * 获取消息主题 + * + * @return 主题 + */ + String topic(); + + /** + * 获取消息内容 + * + * @return 内容 + */ + byte[] content(); } diff --git a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/MessageHandler.java b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/MessageHandler.java index 7be959402..4cc76279c 100644 --- a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/MessageHandler.java +++ b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/MessageHandler.java @@ -16,7 +16,19 @@ package org.dromara.hutool.extra.mq; +/** + * 消息处理器 + * + * @author Looly + * @since 6.0.0 + */ @FunctionalInterface public interface MessageHandler { + + /** + * 处理消息 + * + * @param message 消息 + */ void handle(Message message); } diff --git a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/Producer.java b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/Producer.java index 4fa21b586..a15851727 100644 --- a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/Producer.java +++ b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/Producer.java @@ -16,6 +16,20 @@ package org.dromara.hutool.extra.mq; -public interface Producer { +import java.io.Closeable; + +/** + * 消息生产者 + * + * @author Looly + * @since 6.0.0 + */ +public interface Producer extends Closeable { + + /** + * 发送消息 + * + * @param message 消息 + */ void send(Message message); } diff --git a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/MQEngine.java b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/MQEngine.java index 9db95ff03..98e43d40a 100644 --- a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/MQEngine.java +++ b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/MQEngine.java @@ -16,5 +16,28 @@ package org.dromara.hutool.extra.mq.engine; +import org.dromara.hutool.extra.mq.Consumer; +import org.dromara.hutool.extra.mq.Producer; + +/** + * 消息队列引擎接口 + * + * @author Looly + * @since 6.0.0 + */ public interface MQEngine { + + /** + * 获取消息生产者 + * + * @return 消息生产者 + */ + Producer getProducer(); + + /** + * 获取消息消费者 + * + * @return 消息消费者 + */ + Consumer getConsumer(); } diff --git a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/activemq/ActiveMQConsumer.java b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/activemq/ActiveMQConsumer.java new file mode 100644 index 000000000..58a028ff5 --- /dev/null +++ b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/activemq/ActiveMQConsumer.java @@ -0,0 +1,118 @@ +/* + * Copyright (c) 2025 Hutool Team and hutool.cn + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.hutool.extra.mq.engine.activemq; + +import jakarta.jms.*; +import org.dromara.hutool.core.io.IoUtil; +import org.dromara.hutool.extra.mq.Consumer; +import org.dromara.hutool.extra.mq.MQException; +import org.dromara.hutool.extra.mq.MessageHandler; + +import java.io.IOException; + +/** + * ActiveMQ消息消费者 + * + * @author Looly + * @since 6.0.0 + */ +public class ActiveMQConsumer implements Consumer { + + private final Session session; + private MessageConsumer consumer; + + /** + * 构造 + * + * @param session Session + */ + public ActiveMQConsumer(final Session session) { + this.session = session; + } + + /** + * 设置主题 + * + * @param topic 主题 + * @return this + */ + public ActiveMQConsumer setTopic(final String topic) { + try { + this.consumer = this.session.createConsumer(this.session.createTopic(topic)); + } catch (final JMSException e) { + throw new MQException(e); + } + return this; + } + + @Override + public void subscribe(final MessageHandler messageHandler) { + final Message message; + try{ + message = consumer.receive(3000); + } catch (final JMSException e){ + throw new MQException(e); + } + + messageHandler.handle(new JMSMessage(message)); + } + + @Override + public void listen(final MessageHandler messageHandler) { + try { + consumer.setMessageListener(message -> messageHandler.handle(new JMSMessage(message))); + } catch (final JMSException e) { + throw new MQException(e); + } + } + + @Override + public void close() throws IOException { + IoUtil.closeQuietly(this.consumer); + IoUtil.closeQuietly(this.session); + } + + /** + * JMS消息包装 + */ + private static class JMSMessage implements org.dromara.hutool.extra.mq.Message{ + + private final Message message; + + private JMSMessage(final Message message) { + this.message = message; + } + + @Override + public String topic() { + try { + return message.getJMSDestination().toString(); + } catch (final JMSException e) { + throw new MQException(e); + } + } + + @Override + public byte[] content() { + try { + return message.getBody(byte[].class); + } catch (final JMSException e) { + throw new MQException(e); + } + } + } +} diff --git a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/activemq/ActiveMQEngine.java b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/activemq/ActiveMQEngine.java new file mode 100644 index 000000000..e3b8cf77c --- /dev/null +++ b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/activemq/ActiveMQEngine.java @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2025 Hutool Team and hutool.cn + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.hutool.extra.mq.engine.activemq; + +import jakarta.jms.Connection; +import jakarta.jms.ConnectionFactory; +import jakarta.jms.JMSException; +import jakarta.jms.Session; +import org.dromara.hutool.core.io.IoUtil; +import org.dromara.hutool.extra.mq.Consumer; +import org.dromara.hutool.extra.mq.MQException; +import org.dromara.hutool.extra.mq.Producer; +import org.dromara.hutool.extra.mq.engine.MQEngine; + +import java.io.Closeable; +import java.io.IOException; + +/** + * ActiveMQ引擎 + * + * @author Looly + * @since 6.0.0 + */ +public class ActiveMQEngine implements MQEngine, Closeable { + + private final Connection connection; + + /** + * 构造 + * + * @param factory {@link ConnectionFactory} + */ + public ActiveMQEngine(final ConnectionFactory factory) { + try { + this.connection = factory.createConnection(); + this.connection.start(); + } catch (final JMSException e) { + throw new MQException(e); + } + } + + @Override + public Producer getProducer() { + return new ActiveMQProducer(createSession()); + } + + @Override + public Consumer getConsumer() { + return new ActiveMQConsumer(createSession()); + } + + private Session createSession() { + try { + return this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } catch (final JMSException e) { + throw new MQException(e); + } + } + + @Override + public void close() throws IOException { + IoUtil.closeQuietly(this.connection); + } +} diff --git a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/activemq/ActiveMQProducer.java b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/activemq/ActiveMQProducer.java new file mode 100644 index 000000000..3802f9944 --- /dev/null +++ b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/activemq/ActiveMQProducer.java @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2025 Hutool Team and hutool.cn + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.hutool.extra.mq.engine.activemq; + +import jakarta.jms.BytesMessage; +import jakarta.jms.JMSException; +import jakarta.jms.MessageProducer; +import jakarta.jms.Session; +import org.dromara.hutool.core.io.IoUtil; +import org.dromara.hutool.extra.mq.MQException; +import org.dromara.hutool.extra.mq.Message; +import org.dromara.hutool.extra.mq.Producer; + +import java.io.IOException; + +/** + * ActiveMQ消息生产者 + * + * @author Looly + * @since 6.0.0 + */ +public class ActiveMQProducer implements Producer { + + private final Session session; + private MessageProducer producer; + + /** + * 构造 + * + * @param session Session + */ + public ActiveMQProducer(final Session session) { + this.session = session; + } + + /** + * 设置主题 + * + * @param topic 主题 + * @return this + */ + public ActiveMQProducer setTopic(final String topic) { + try { + this.producer = this.session.createProducer(this.session.createTopic(topic)); + } catch (final JMSException e) { + throw new MQException(e); + } + return this; + } + + @Override + public void send(final Message message) { + try { + final BytesMessage bytesMessage = session.createBytesMessage(); + bytesMessage.writeBytes(message.content()); + this.producer.send(bytesMessage); + } catch (final JMSException e) { + throw new MQException(e); + } + } + + @Override + public void close() throws IOException { + IoUtil.closeQuietly(this.producer); + IoUtil.closeQuietly(this.session); + } +} diff --git a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/kafka/KafkaConsumer.java b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/kafka/KafkaConsumer.java new file mode 100644 index 000000000..45408fee2 --- /dev/null +++ b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/kafka/KafkaConsumer.java @@ -0,0 +1,111 @@ +/* + * Copyright (c) 2025 Hutool Team and hutool.cn + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.hutool.extra.mq.engine.kafka; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.dromara.hutool.core.collection.ListUtil; +import org.dromara.hutool.core.io.IoUtil; +import org.dromara.hutool.extra.mq.Consumer; +import org.dromara.hutool.extra.mq.Message; +import org.dromara.hutool.extra.mq.MessageHandler; + +import java.io.IOException; +import java.time.Duration; +import java.util.Properties; +import java.util.regex.Pattern; + +/** + * Kafka消费端 + * + * @author Looly + * @since 6.0.0 + */ +public class KafkaConsumer implements Consumer { + + private final org.apache.kafka.clients.consumer.Consumer consumer; + + /** + * 构造 + * + * @param properties 配置 + */ + public KafkaConsumer(final Properties properties) { + this.consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(properties); + } + + /** + * 构造 + * + * @param consumer {@link org.apache.kafka.clients.consumer.Consumer} + */ + public KafkaConsumer(final org.apache.kafka.clients.consumer.Consumer consumer) { + this.consumer = consumer; + } + + /** + * 设置消费的topic + * + * @param topics topic + * @return this + */ + public KafkaConsumer setTopics(final String... topics) { + this.consumer.subscribe(ListUtil.of(topics)); + return this; + } + + /** + * 设置消费的topic正则 + * + * @param topicPattern topic{@link Pattern} + * @return this + */ + public KafkaConsumer setTopicPattern(final Pattern topicPattern){ + this.consumer.subscribe(topicPattern); + return this; + } + + @Override + public void subscribe(final MessageHandler messageHandler) { + for (final ConsumerRecord record : this.consumer.poll(Duration.ofMillis(3000))) { + messageHandler.handle(new ConsumerRecordMessage(record)); + } + } + + @Override + public void close() throws IOException { + IoUtil.nullSafeClose(this.consumer); + } + + private static class ConsumerRecordMessage implements Message { + + private final ConsumerRecord record; + + private ConsumerRecordMessage(final ConsumerRecord record) { + this.record = record; + } + + @Override + public String topic() { + return record.topic(); + } + + @Override + public byte[] content() { + return record.value(); + } + } +} diff --git a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/kafka/KafkaEngine.java b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/kafka/KafkaEngine.java new file mode 100644 index 000000000..3723cb52d --- /dev/null +++ b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/kafka/KafkaEngine.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2025 Hutool Team and hutool.cn + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.hutool.extra.mq.engine.kafka; + +import org.dromara.hutool.extra.mq.Consumer; +import org.dromara.hutool.extra.mq.Producer; +import org.dromara.hutool.extra.mq.engine.MQEngine; + +import java.util.Properties; + +/** + * Kafka引擎 + * + * @author Looly + * @since 6.0.0 + */ +public class KafkaEngine implements MQEngine { + + private final Properties properties; + + /** + * 构造 + * + * @param properties 配置 + */ + public KafkaEngine(final Properties properties) { + this.properties = properties; + } + + /** + * 增加配置项 + * + * @param key 配置项 + * @param value 值 + * @return this + */ + public KafkaEngine addProperty(final String key, final String value) { + this.properties.put(key, value); + return this; + } + + @Override + public Producer getProducer() { + return new KafkaProducer(this.properties); + } + + @Override + public Consumer getConsumer() { + return new KafkaConsumer(this.properties); + } +} diff --git a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/kafka/KafkaProducer.java b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/kafka/KafkaProducer.java new file mode 100644 index 000000000..2673214de --- /dev/null +++ b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/kafka/KafkaProducer.java @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2025 Hutool Team and hutool.cn + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.hutool.extra.mq.engine.kafka; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.dromara.hutool.core.io.IoUtil; +import org.dromara.hutool.extra.mq.Message; +import org.dromara.hutool.extra.mq.Producer; + +import java.io.IOException; +import java.util.Properties; + +/** + * Kafka 生产者 + * + * @author Looly + * @since 6.0.0 + */ +public class KafkaProducer implements Producer { + + private final org.apache.kafka.clients.producer.Producer producer; + + /** + * 构造 + * + * @param properties 配置 + */ + public KafkaProducer(final Properties properties) { + this(new org.apache.kafka.clients.producer.KafkaProducer<>(properties)); + } + + /** + * 构造 + * + * @param producer Kafka Producer + */ + public KafkaProducer(final org.apache.kafka.clients.producer.Producer producer) { + this.producer = producer; + } + + @Override + public void send(final Message message) { + this.producer.send(new ProducerRecord<>(message.topic(), message.content())); + } + + @Override + public void close() throws IOException { + IoUtil.nullSafeClose(this.producer); + } +} diff --git a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rabbitmq/RabbitMQConsumer.java b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rabbitmq/RabbitMQConsumer.java new file mode 100644 index 000000000..fd3fbbb62 --- /dev/null +++ b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rabbitmq/RabbitMQConsumer.java @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2025 Hutool Team and hutool.cn + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.hutool.extra.mq.engine.rabbitmq; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.DeliverCallback; +import org.dromara.hutool.core.io.IoUtil; +import org.dromara.hutool.extra.mq.Consumer; +import org.dromara.hutool.extra.mq.MQException; +import org.dromara.hutool.extra.mq.Message; +import org.dromara.hutool.extra.mq.MessageHandler; + +import java.io.IOException; + +/** + * RabbitMQ消费者 + * + * @author Looly + * @since 6.0.0 + */ +public class RabbitMQConsumer implements Consumer { + + private final Channel channel; + private String topic; + + /** + * 构造 + * + * @param channel Channel + */ + public RabbitMQConsumer(final Channel channel) { + this.channel = channel; + } + + /** + * 设置队列(主题) + * + * @param topic 队列名 + * @return this + */ + public RabbitMQConsumer setTopic(final String topic) { + this.topic = topic; + return this; + } + + @Override + public void subscribe(final MessageHandler messageHandler) { + try { + this.channel.queueDeclare(this.topic, false, false, false, null); + } catch (final IOException e) { + throw new MQException(e); + } + + final DeliverCallback deliverCallback = (consumerTag, delivery) -> { + messageHandler.handle(new Message() { + @Override + public String topic() { + return consumerTag; + } + + @Override + public byte[] content() { + return delivery.getBody(); + } + }); + }; + + try { + this.channel.basicConsume(this.topic, true, deliverCallback, consumerTag -> { }); + } catch (final IOException e) { + throw new MQException(e); + } + } + + @Override + public void close() { + IoUtil.closeQuietly(this.channel); + } +} diff --git a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rabbitmq/RabbitMQEngine.java b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rabbitmq/RabbitMQEngine.java new file mode 100644 index 000000000..4e2c6ea72 --- /dev/null +++ b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rabbitmq/RabbitMQEngine.java @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2025 Hutool Team and hutool.cn + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.hutool.extra.mq.engine.rabbitmq; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import org.dromara.hutool.core.io.IoUtil; +import org.dromara.hutool.extra.mq.Consumer; +import org.dromara.hutool.extra.mq.MQException; +import org.dromara.hutool.extra.mq.Producer; +import org.dromara.hutool.extra.mq.engine.MQEngine; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +/** + * RabbitMQ引擎 + * + * @author Looly + * @since 6.0.0 + */ +public class RabbitMQEngine implements MQEngine, Closeable { + + private final Connection connection; + + /** + * 构造 + * + * @param factory 连接工厂 + */ + public RabbitMQEngine(final ConnectionFactory factory) { + try { + this.connection = factory.newConnection(); + } catch (final IOException | TimeoutException e) { + throw new MQException(e); + } + } + + @Override + public Producer getProducer() { + return new RabbitMQProducer(createChannel()); + } + + @Override + public Consumer getConsumer() { + return new RabbitMQConsumer(createChannel()); + } + + @Override + public void close() throws IOException { + IoUtil.nullSafeClose(this.connection); + } + + /** + * 创建Channel + * + * @return Channel + */ + private Channel createChannel() { + try { + return this.connection.createChannel(); + } catch (final IOException e) { + throw new MQException(e); + } + } +} diff --git a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rabbitmq/RabbitMQProducer.java b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rabbitmq/RabbitMQProducer.java new file mode 100644 index 000000000..15e947dee --- /dev/null +++ b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rabbitmq/RabbitMQProducer.java @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2025 Hutool Team and hutool.cn + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.hutool.extra.mq.engine.rabbitmq; + +import com.rabbitmq.client.Channel; +import org.dromara.hutool.core.io.IoUtil; +import org.dromara.hutool.core.text.StrUtil; +import org.dromara.hutool.extra.mq.MQException; +import org.dromara.hutool.extra.mq.Message; +import org.dromara.hutool.extra.mq.Producer; + +import java.io.IOException; + +/** + * RabbitMQ消息生产者 + * + * @author Looly + * @since 6.0.0 + */ +public class RabbitMQProducer implements Producer { + + private final Channel channel; + + /** + * 构造 + * + * @param channel Channel + */ + public RabbitMQProducer(final Channel channel) { + this.channel = channel; + } + + @Override + public void send(final Message message) { + try { + this.channel.basicPublish(StrUtil.EMPTY, message.topic(), null, message.content()); + } catch (final IOException e) { + throw new MQException(e); + } + } + + @Override + public void close() { + IoUtil.closeQuietly(this.channel); + } +} diff --git a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/package-info.java b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/package-info.java index 82141fa67..d96ca8bec 100644 --- a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/package-info.java +++ b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/package-info.java @@ -19,9 +19,9 @@ * 通过定义统一接口,统一消息中间件的调用,实现消息队列的解耦。 * 组件包括: *
    - *
  • Producer: 消息生产者,业务的发起方,负责生产消息
  • - *
  • Consumer: 消息消费者,业务的处理方
  • - *
  • Message: 消息体,根据不同通信协议定义的固定格式进行编码的数据包
  • + *
  • {@link org.dromara.hutool.extra.mq.Producer}: 消息生产者,业务的发起方,负责生产消息
  • + *
  • {@link org.dromara.hutool.extra.mq.Consumer}: 消息消费者,业务的处理方
  • + *
  • {@link org.dromara.hutool.extra.mq.Message}: 消息体,根据不同通信协议定义的固定格式进行编码的数据包
  • *
* * @author Looly diff --git a/hutool-extra/src/main/java/org/dromara/hutool/extra/pinyin/PinyinException.java b/hutool-extra/src/main/java/org/dromara/hutool/extra/pinyin/PinyinException.java index 8de4f2336..0a2f44066 100644 --- a/hutool-extra/src/main/java/org/dromara/hutool/extra/pinyin/PinyinException.java +++ b/hutool-extra/src/main/java/org/dromara/hutool/extra/pinyin/PinyinException.java @@ -19,7 +19,7 @@ package org.dromara.hutool.extra.pinyin; import org.dromara.hutool.core.exception.HutoolException; /** - * 模板异常 + * 拼音异常 * * @author Looly */