mirror of
https://gitee.com/chinabugotech/hutool.git
synced 2025-04-19 03:01:48 +08:00
add mq implements
This commit is contained in:
parent
91af319d9c
commit
1b468245c5
@ -236,6 +236,18 @@ public class ListUtil {
|
||||
return new ArrayList<>(0);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取一个只包含一个元素的List,不可变
|
||||
*
|
||||
* @param <T> 元素类型
|
||||
* @param element 元素
|
||||
* @return 只包含一个元素的List
|
||||
* @since 6.0.0
|
||||
*/
|
||||
public static <T> List<T> singleton(final T element){
|
||||
return Collections.singletonList(element);
|
||||
}
|
||||
|
||||
/**
|
||||
* 新建一个CopyOnWriteArrayList
|
||||
*
|
||||
|
@ -18,14 +18,7 @@ package org.dromara.hutool.core.collection.set;
|
||||
|
||||
import org.dromara.hutool.core.array.ArrayUtil;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Enumeration;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* 集合中的{@link java.util.Set}相关方法封装
|
||||
@ -202,6 +195,18 @@ public class SetUtil {
|
||||
return new HashSet<>(0, 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取一个只包含一个元素的Set,不可变
|
||||
*
|
||||
* @param <T> 元素类型
|
||||
* @param element 元素
|
||||
* @return 只包含一个元素的Set
|
||||
* @since 6.0.0
|
||||
*/
|
||||
public static <T> Set<T> singleton(final T element) {
|
||||
return Collections.singleton(element);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取一个初始大小为0的LinkedHashSet,这个空Set可变
|
||||
*
|
||||
|
@ -1065,9 +1065,9 @@ public class MapUtil extends MapGetUtil {
|
||||
* 去除Map中值为指定值的键值对<br>
|
||||
* 注意:此方法在传入的Map上直接修改。
|
||||
*
|
||||
* @param <K> key的类型
|
||||
* @param <V> value的类型
|
||||
* @param map Map
|
||||
* @param <K> key的类型
|
||||
* @param <V> value的类型
|
||||
* @param map Map
|
||||
* @param value 给定值
|
||||
* @return map
|
||||
* @since 6.0.0
|
||||
@ -1080,9 +1080,9 @@ public class MapUtil extends MapGetUtil {
|
||||
* 去除Map中值为{@code null}的键值对<br>
|
||||
* 注意:此方法在传入的Map上直接修改。
|
||||
*
|
||||
* @param <K> key的类型
|
||||
* @param <V> value的类型
|
||||
* @param map Map
|
||||
* @param <K> key的类型
|
||||
* @param <V> value的类型
|
||||
* @param map Map
|
||||
* @param predicate 移除条件,当{@link Predicate#test(Object)}为{@code true}时移除
|
||||
* @return map
|
||||
* @since 6.0.0
|
||||
@ -1119,6 +1119,19 @@ public class MapUtil extends MapGetUtil {
|
||||
return new HashMap<>(0, 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* 返回一个只包含一个键值对的Map,不可变
|
||||
*
|
||||
* @param key 键
|
||||
* @param value 值
|
||||
* @param <K> 键类型
|
||||
* @param <V> 值类型
|
||||
* @return Map
|
||||
*/
|
||||
public static <K, V> Map<K, V> singleton(final K key, final V value) {
|
||||
return Collections.singletonMap(key, value);
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据传入的Map类型不同,返回对应类型的空Map,支持类型包括:
|
||||
*
|
||||
|
@ -16,6 +16,34 @@
|
||||
|
||||
package org.dromara.hutool.extra.mq;
|
||||
|
||||
public interface Consumer {
|
||||
import org.dromara.hutool.core.thread.ThreadUtil;
|
||||
|
||||
import java.io.Closeable;
|
||||
|
||||
/**
|
||||
* 消息消费者接口
|
||||
*
|
||||
* @author Looly
|
||||
* @since 6.0.0
|
||||
*/
|
||||
public interface Consumer extends Closeable {
|
||||
/**
|
||||
* 单次订阅消息
|
||||
*
|
||||
* @param messageHandler 消息处理器
|
||||
*/
|
||||
void subscribe(MessageHandler messageHandler);
|
||||
|
||||
/**
|
||||
* 持续订阅消息
|
||||
*
|
||||
* @param messageHandler 消息处理器
|
||||
*/
|
||||
default void listen(final MessageHandler messageHandler) {
|
||||
ThreadUtil.execAsync(() -> {
|
||||
while (true) {
|
||||
this.subscribe(messageHandler);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,89 @@
|
||||
/*
|
||||
* Copyright (c) 2013-2025 Hutool Team and hutool.cn
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.dromara.hutool.extra.mq;
|
||||
|
||||
import org.dromara.hutool.core.exception.HutoolException;
|
||||
|
||||
/**
|
||||
* 消息队列异常
|
||||
*
|
||||
* @author Looly
|
||||
*/
|
||||
public class MQException extends HutoolException {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
/**
|
||||
* 构造
|
||||
*
|
||||
* @param e 异常
|
||||
*/
|
||||
public MQException(final Throwable e) {
|
||||
super(e);
|
||||
}
|
||||
|
||||
/**
|
||||
* 构造
|
||||
*
|
||||
* @param message 消息
|
||||
*/
|
||||
public MQException(final String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
/**
|
||||
* 构造
|
||||
*
|
||||
* @param messageTemplate 消息模板
|
||||
* @param params 参数
|
||||
*/
|
||||
public MQException(final String messageTemplate, final Object... params) {
|
||||
super(messageTemplate, params);
|
||||
}
|
||||
|
||||
/**
|
||||
* 构造
|
||||
*
|
||||
* @param message 消息
|
||||
* @param cause 被包装的子异常
|
||||
*/
|
||||
public MQException(final String message, final Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
|
||||
/**
|
||||
* 构造
|
||||
*
|
||||
* @param message 消息
|
||||
* @param cause 被包装的子异常
|
||||
* @param enableSuppression 是否启用抑制
|
||||
* @param writableStackTrace 堆栈跟踪是否应该是可写的
|
||||
*/
|
||||
public MQException(final String message, final Throwable cause, final boolean enableSuppression, final boolean writableStackTrace) {
|
||||
super(message, cause, enableSuppression, writableStackTrace);
|
||||
}
|
||||
|
||||
/**
|
||||
* 构造
|
||||
*
|
||||
* @param cause 被包装的子异常
|
||||
* @param messageTemplate 消息模板
|
||||
* @param params 参数
|
||||
*/
|
||||
public MQException(final Throwable cause, final String messageTemplate, final Object... params) {
|
||||
super(cause, messageTemplate, params);
|
||||
}
|
||||
}
|
@ -16,5 +16,25 @@
|
||||
|
||||
package org.dromara.hutool.extra.mq;
|
||||
|
||||
/**
|
||||
* 消息接口
|
||||
*
|
||||
* @author Looly
|
||||
* @since 6.0.0
|
||||
*/
|
||||
public interface Message {
|
||||
|
||||
/**
|
||||
* 获取消息主题
|
||||
*
|
||||
* @return 主题
|
||||
*/
|
||||
String topic();
|
||||
|
||||
/**
|
||||
* 获取消息内容
|
||||
*
|
||||
* @return 内容
|
||||
*/
|
||||
byte[] content();
|
||||
}
|
||||
|
@ -16,7 +16,19 @@
|
||||
|
||||
package org.dromara.hutool.extra.mq;
|
||||
|
||||
/**
|
||||
* 消息处理器
|
||||
*
|
||||
* @author Looly
|
||||
* @since 6.0.0
|
||||
*/
|
||||
@FunctionalInterface
|
||||
public interface MessageHandler {
|
||||
|
||||
/**
|
||||
* 处理消息
|
||||
*
|
||||
* @param message 消息
|
||||
*/
|
||||
void handle(Message message);
|
||||
}
|
||||
|
@ -16,6 +16,20 @@
|
||||
|
||||
package org.dromara.hutool.extra.mq;
|
||||
|
||||
public interface Producer {
|
||||
import java.io.Closeable;
|
||||
|
||||
/**
|
||||
* 消息生产者
|
||||
*
|
||||
* @author Looly
|
||||
* @since 6.0.0
|
||||
*/
|
||||
public interface Producer extends Closeable {
|
||||
|
||||
/**
|
||||
* 发送消息
|
||||
*
|
||||
* @param message 消息
|
||||
*/
|
||||
void send(Message message);
|
||||
}
|
||||
|
@ -16,5 +16,28 @@
|
||||
|
||||
package org.dromara.hutool.extra.mq.engine;
|
||||
|
||||
import org.dromara.hutool.extra.mq.Consumer;
|
||||
import org.dromara.hutool.extra.mq.Producer;
|
||||
|
||||
/**
|
||||
* 消息队列引擎接口
|
||||
*
|
||||
* @author Looly
|
||||
* @since 6.0.0
|
||||
*/
|
||||
public interface MQEngine {
|
||||
|
||||
/**
|
||||
* 获取消息生产者
|
||||
*
|
||||
* @return 消息生产者
|
||||
*/
|
||||
Producer getProducer();
|
||||
|
||||
/**
|
||||
* 获取消息消费者
|
||||
*
|
||||
* @return 消息消费者
|
||||
*/
|
||||
Consumer getConsumer();
|
||||
}
|
||||
|
@ -0,0 +1,118 @@
|
||||
/*
|
||||
* Copyright (c) 2025 Hutool Team and hutool.cn
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.dromara.hutool.extra.mq.engine.activemq;
|
||||
|
||||
import jakarta.jms.*;
|
||||
import org.dromara.hutool.core.io.IoUtil;
|
||||
import org.dromara.hutool.extra.mq.Consumer;
|
||||
import org.dromara.hutool.extra.mq.MQException;
|
||||
import org.dromara.hutool.extra.mq.MessageHandler;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* ActiveMQ消息消费者
|
||||
*
|
||||
* @author Looly
|
||||
* @since 6.0.0
|
||||
*/
|
||||
public class ActiveMQConsumer implements Consumer {
|
||||
|
||||
private final Session session;
|
||||
private MessageConsumer consumer;
|
||||
|
||||
/**
|
||||
* 构造
|
||||
*
|
||||
* @param session Session
|
||||
*/
|
||||
public ActiveMQConsumer(final Session session) {
|
||||
this.session = session;
|
||||
}
|
||||
|
||||
/**
|
||||
* 设置主题
|
||||
*
|
||||
* @param topic 主题
|
||||
* @return this
|
||||
*/
|
||||
public ActiveMQConsumer setTopic(final String topic) {
|
||||
try {
|
||||
this.consumer = this.session.createConsumer(this.session.createTopic(topic));
|
||||
} catch (final JMSException e) {
|
||||
throw new MQException(e);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void subscribe(final MessageHandler messageHandler) {
|
||||
final Message message;
|
||||
try{
|
||||
message = consumer.receive(3000);
|
||||
} catch (final JMSException e){
|
||||
throw new MQException(e);
|
||||
}
|
||||
|
||||
messageHandler.handle(new JMSMessage(message));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void listen(final MessageHandler messageHandler) {
|
||||
try {
|
||||
consumer.setMessageListener(message -> messageHandler.handle(new JMSMessage(message)));
|
||||
} catch (final JMSException e) {
|
||||
throw new MQException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
IoUtil.closeQuietly(this.consumer);
|
||||
IoUtil.closeQuietly(this.session);
|
||||
}
|
||||
|
||||
/**
|
||||
* JMS消息包装
|
||||
*/
|
||||
private static class JMSMessage implements org.dromara.hutool.extra.mq.Message{
|
||||
|
||||
private final Message message;
|
||||
|
||||
private JMSMessage(final Message message) {
|
||||
this.message = message;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String topic() {
|
||||
try {
|
||||
return message.getJMSDestination().toString();
|
||||
} catch (final JMSException e) {
|
||||
throw new MQException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] content() {
|
||||
try {
|
||||
return message.getBody(byte[].class);
|
||||
} catch (final JMSException e) {
|
||||
throw new MQException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,78 @@
|
||||
/*
|
||||
* Copyright (c) 2025 Hutool Team and hutool.cn
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.dromara.hutool.extra.mq.engine.activemq;
|
||||
|
||||
import jakarta.jms.Connection;
|
||||
import jakarta.jms.ConnectionFactory;
|
||||
import jakarta.jms.JMSException;
|
||||
import jakarta.jms.Session;
|
||||
import org.dromara.hutool.core.io.IoUtil;
|
||||
import org.dromara.hutool.extra.mq.Consumer;
|
||||
import org.dromara.hutool.extra.mq.MQException;
|
||||
import org.dromara.hutool.extra.mq.Producer;
|
||||
import org.dromara.hutool.extra.mq.engine.MQEngine;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* ActiveMQ引擎
|
||||
*
|
||||
* @author Looly
|
||||
* @since 6.0.0
|
||||
*/
|
||||
public class ActiveMQEngine implements MQEngine, Closeable {
|
||||
|
||||
private final Connection connection;
|
||||
|
||||
/**
|
||||
* 构造
|
||||
*
|
||||
* @param factory {@link ConnectionFactory}
|
||||
*/
|
||||
public ActiveMQEngine(final ConnectionFactory factory) {
|
||||
try {
|
||||
this.connection = factory.createConnection();
|
||||
this.connection.start();
|
||||
} catch (final JMSException e) {
|
||||
throw new MQException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Producer getProducer() {
|
||||
return new ActiveMQProducer(createSession());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Consumer getConsumer() {
|
||||
return new ActiveMQConsumer(createSession());
|
||||
}
|
||||
|
||||
private Session createSession() {
|
||||
try {
|
||||
return this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
} catch (final JMSException e) {
|
||||
throw new MQException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
IoUtil.closeQuietly(this.connection);
|
||||
}
|
||||
}
|
@ -0,0 +1,81 @@
|
||||
/*
|
||||
* Copyright (c) 2025 Hutool Team and hutool.cn
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.dromara.hutool.extra.mq.engine.activemq;
|
||||
|
||||
import jakarta.jms.BytesMessage;
|
||||
import jakarta.jms.JMSException;
|
||||
import jakarta.jms.MessageProducer;
|
||||
import jakarta.jms.Session;
|
||||
import org.dromara.hutool.core.io.IoUtil;
|
||||
import org.dromara.hutool.extra.mq.MQException;
|
||||
import org.dromara.hutool.extra.mq.Message;
|
||||
import org.dromara.hutool.extra.mq.Producer;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* ActiveMQ消息生产者
|
||||
*
|
||||
* @author Looly
|
||||
* @since 6.0.0
|
||||
*/
|
||||
public class ActiveMQProducer implements Producer {
|
||||
|
||||
private final Session session;
|
||||
private MessageProducer producer;
|
||||
|
||||
/**
|
||||
* 构造
|
||||
*
|
||||
* @param session Session
|
||||
*/
|
||||
public ActiveMQProducer(final Session session) {
|
||||
this.session = session;
|
||||
}
|
||||
|
||||
/**
|
||||
* 设置主题
|
||||
*
|
||||
* @param topic 主题
|
||||
* @return this
|
||||
*/
|
||||
public ActiveMQProducer setTopic(final String topic) {
|
||||
try {
|
||||
this.producer = this.session.createProducer(this.session.createTopic(topic));
|
||||
} catch (final JMSException e) {
|
||||
throw new MQException(e);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(final Message message) {
|
||||
try {
|
||||
final BytesMessage bytesMessage = session.createBytesMessage();
|
||||
bytesMessage.writeBytes(message.content());
|
||||
this.producer.send(bytesMessage);
|
||||
} catch (final JMSException e) {
|
||||
throw new MQException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
IoUtil.closeQuietly(this.producer);
|
||||
IoUtil.closeQuietly(this.session);
|
||||
}
|
||||
}
|
@ -0,0 +1,111 @@
|
||||
/*
|
||||
* Copyright (c) 2025 Hutool Team and hutool.cn
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.dromara.hutool.extra.mq.engine.kafka;
|
||||
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.dromara.hutool.core.collection.ListUtil;
|
||||
import org.dromara.hutool.core.io.IoUtil;
|
||||
import org.dromara.hutool.extra.mq.Consumer;
|
||||
import org.dromara.hutool.extra.mq.Message;
|
||||
import org.dromara.hutool.extra.mq.MessageHandler;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.Duration;
|
||||
import java.util.Properties;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
/**
|
||||
* Kafka消费端
|
||||
*
|
||||
* @author Looly
|
||||
* @since 6.0.0
|
||||
*/
|
||||
public class KafkaConsumer implements Consumer {
|
||||
|
||||
private final org.apache.kafka.clients.consumer.Consumer<String, byte[]> consumer;
|
||||
|
||||
/**
|
||||
* 构造
|
||||
*
|
||||
* @param properties 配置
|
||||
*/
|
||||
public KafkaConsumer(final Properties properties) {
|
||||
this.consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(properties);
|
||||
}
|
||||
|
||||
/**
|
||||
* 构造
|
||||
*
|
||||
* @param consumer {@link org.apache.kafka.clients.consumer.Consumer}
|
||||
*/
|
||||
public KafkaConsumer(final org.apache.kafka.clients.consumer.Consumer<String, byte[]> consumer) {
|
||||
this.consumer = consumer;
|
||||
}
|
||||
|
||||
/**
|
||||
* 设置消费的topic
|
||||
*
|
||||
* @param topics topic
|
||||
* @return this
|
||||
*/
|
||||
public KafkaConsumer setTopics(final String... topics) {
|
||||
this.consumer.subscribe(ListUtil.of(topics));
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* 设置消费的topic正则
|
||||
*
|
||||
* @param topicPattern topic{@link Pattern}
|
||||
* @return this
|
||||
*/
|
||||
public KafkaConsumer setTopicPattern(final Pattern topicPattern){
|
||||
this.consumer.subscribe(topicPattern);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void subscribe(final MessageHandler messageHandler) {
|
||||
for (final ConsumerRecord<String, byte[]> record : this.consumer.poll(Duration.ofMillis(3000))) {
|
||||
messageHandler.handle(new ConsumerRecordMessage(record));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
IoUtil.nullSafeClose(this.consumer);
|
||||
}
|
||||
|
||||
private static class ConsumerRecordMessage implements Message {
|
||||
|
||||
private final ConsumerRecord<String, byte[]> record;
|
||||
|
||||
private ConsumerRecordMessage(final ConsumerRecord<String, byte[]> record) {
|
||||
this.record = record;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String topic() {
|
||||
return record.topic();
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] content() {
|
||||
return record.value();
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,65 @@
|
||||
/*
|
||||
* Copyright (c) 2025 Hutool Team and hutool.cn
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.dromara.hutool.extra.mq.engine.kafka;
|
||||
|
||||
import org.dromara.hutool.extra.mq.Consumer;
|
||||
import org.dromara.hutool.extra.mq.Producer;
|
||||
import org.dromara.hutool.extra.mq.engine.MQEngine;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* Kafka引擎
|
||||
*
|
||||
* @author Looly
|
||||
* @since 6.0.0
|
||||
*/
|
||||
public class KafkaEngine implements MQEngine {
|
||||
|
||||
private final Properties properties;
|
||||
|
||||
/**
|
||||
* 构造
|
||||
*
|
||||
* @param properties 配置
|
||||
*/
|
||||
public KafkaEngine(final Properties properties) {
|
||||
this.properties = properties;
|
||||
}
|
||||
|
||||
/**
|
||||
* 增加配置项
|
||||
*
|
||||
* @param key 配置项
|
||||
* @param value 值
|
||||
* @return this
|
||||
*/
|
||||
public KafkaEngine addProperty(final String key, final String value) {
|
||||
this.properties.put(key, value);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Producer getProducer() {
|
||||
return new KafkaProducer(this.properties);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Consumer getConsumer() {
|
||||
return new KafkaConsumer(this.properties);
|
||||
}
|
||||
}
|
@ -0,0 +1,64 @@
|
||||
/*
|
||||
* Copyright (c) 2025 Hutool Team and hutool.cn
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.dromara.hutool.extra.mq.engine.kafka;
|
||||
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.dromara.hutool.core.io.IoUtil;
|
||||
import org.dromara.hutool.extra.mq.Message;
|
||||
import org.dromara.hutool.extra.mq.Producer;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* Kafka 生产者
|
||||
*
|
||||
* @author Looly
|
||||
* @since 6.0.0
|
||||
*/
|
||||
public class KafkaProducer implements Producer {
|
||||
|
||||
private final org.apache.kafka.clients.producer.Producer<String, byte[]> producer;
|
||||
|
||||
/**
|
||||
* 构造
|
||||
*
|
||||
* @param properties 配置
|
||||
*/
|
||||
public KafkaProducer(final Properties properties) {
|
||||
this(new org.apache.kafka.clients.producer.KafkaProducer<>(properties));
|
||||
}
|
||||
|
||||
/**
|
||||
* 构造
|
||||
*
|
||||
* @param producer Kafka Producer
|
||||
*/
|
||||
public KafkaProducer(final org.apache.kafka.clients.producer.Producer<String, byte[]> producer) {
|
||||
this.producer = producer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(final Message message) {
|
||||
this.producer.send(new ProducerRecord<>(message.topic(), message.content()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
IoUtil.nullSafeClose(this.producer);
|
||||
}
|
||||
}
|
@ -0,0 +1,93 @@
|
||||
/*
|
||||
* Copyright (c) 2025 Hutool Team and hutool.cn
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.dromara.hutool.extra.mq.engine.rabbitmq;
|
||||
|
||||
import com.rabbitmq.client.Channel;
|
||||
import com.rabbitmq.client.DeliverCallback;
|
||||
import org.dromara.hutool.core.io.IoUtil;
|
||||
import org.dromara.hutool.extra.mq.Consumer;
|
||||
import org.dromara.hutool.extra.mq.MQException;
|
||||
import org.dromara.hutool.extra.mq.Message;
|
||||
import org.dromara.hutool.extra.mq.MessageHandler;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* RabbitMQ消费者
|
||||
*
|
||||
* @author Looly
|
||||
* @since 6.0.0
|
||||
*/
|
||||
public class RabbitMQConsumer implements Consumer {
|
||||
|
||||
private final Channel channel;
|
||||
private String topic;
|
||||
|
||||
/**
|
||||
* 构造
|
||||
*
|
||||
* @param channel Channel
|
||||
*/
|
||||
public RabbitMQConsumer(final Channel channel) {
|
||||
this.channel = channel;
|
||||
}
|
||||
|
||||
/**
|
||||
* 设置队列(主题)
|
||||
*
|
||||
* @param topic 队列名
|
||||
* @return this
|
||||
*/
|
||||
public RabbitMQConsumer setTopic(final String topic) {
|
||||
this.topic = topic;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void subscribe(final MessageHandler messageHandler) {
|
||||
try {
|
||||
this.channel.queueDeclare(this.topic, false, false, false, null);
|
||||
} catch (final IOException e) {
|
||||
throw new MQException(e);
|
||||
}
|
||||
|
||||
final DeliverCallback deliverCallback = (consumerTag, delivery) -> {
|
||||
messageHandler.handle(new Message() {
|
||||
@Override
|
||||
public String topic() {
|
||||
return consumerTag;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] content() {
|
||||
return delivery.getBody();
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
try {
|
||||
this.channel.basicConsume(this.topic, true, deliverCallback, consumerTag -> { });
|
||||
} catch (final IOException e) {
|
||||
throw new MQException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
IoUtil.closeQuietly(this.channel);
|
||||
}
|
||||
}
|
@ -0,0 +1,82 @@
|
||||
/*
|
||||
* Copyright (c) 2025 Hutool Team and hutool.cn
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.dromara.hutool.extra.mq.engine.rabbitmq;
|
||||
|
||||
import com.rabbitmq.client.Channel;
|
||||
import com.rabbitmq.client.Connection;
|
||||
import com.rabbitmq.client.ConnectionFactory;
|
||||
import org.dromara.hutool.core.io.IoUtil;
|
||||
import org.dromara.hutool.extra.mq.Consumer;
|
||||
import org.dromara.hutool.extra.mq.MQException;
|
||||
import org.dromara.hutool.extra.mq.Producer;
|
||||
import org.dromara.hutool.extra.mq.engine.MQEngine;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
/**
|
||||
* RabbitMQ引擎
|
||||
*
|
||||
* @author Looly
|
||||
* @since 6.0.0
|
||||
*/
|
||||
public class RabbitMQEngine implements MQEngine, Closeable {
|
||||
|
||||
private final Connection connection;
|
||||
|
||||
/**
|
||||
* 构造
|
||||
*
|
||||
* @param factory 连接工厂
|
||||
*/
|
||||
public RabbitMQEngine(final ConnectionFactory factory) {
|
||||
try {
|
||||
this.connection = factory.newConnection();
|
||||
} catch (final IOException | TimeoutException e) {
|
||||
throw new MQException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Producer getProducer() {
|
||||
return new RabbitMQProducer(createChannel());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Consumer getConsumer() {
|
||||
return new RabbitMQConsumer(createChannel());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
IoUtil.nullSafeClose(this.connection);
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建Channel
|
||||
*
|
||||
* @return Channel
|
||||
*/
|
||||
private Channel createChannel() {
|
||||
try {
|
||||
return this.connection.createChannel();
|
||||
} catch (final IOException e) {
|
||||
throw new MQException(e);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,60 @@
|
||||
/*
|
||||
* Copyright (c) 2025 Hutool Team and hutool.cn
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.dromara.hutool.extra.mq.engine.rabbitmq;
|
||||
|
||||
import com.rabbitmq.client.Channel;
|
||||
import org.dromara.hutool.core.io.IoUtil;
|
||||
import org.dromara.hutool.core.text.StrUtil;
|
||||
import org.dromara.hutool.extra.mq.MQException;
|
||||
import org.dromara.hutool.extra.mq.Message;
|
||||
import org.dromara.hutool.extra.mq.Producer;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* RabbitMQ消息生产者
|
||||
*
|
||||
* @author Looly
|
||||
* @since 6.0.0
|
||||
*/
|
||||
public class RabbitMQProducer implements Producer {
|
||||
|
||||
private final Channel channel;
|
||||
|
||||
/**
|
||||
* 构造
|
||||
*
|
||||
* @param channel Channel
|
||||
*/
|
||||
public RabbitMQProducer(final Channel channel) {
|
||||
this.channel = channel;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(final Message message) {
|
||||
try {
|
||||
this.channel.basicPublish(StrUtil.EMPTY, message.topic(), null, message.content());
|
||||
} catch (final IOException e) {
|
||||
throw new MQException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
IoUtil.closeQuietly(this.channel);
|
||||
}
|
||||
}
|
@ -19,9 +19,9 @@
|
||||
* 通过定义统一接口,统一消息中间件的调用,实现消息队列的解耦。
|
||||
* 组件包括:
|
||||
* <ul>
|
||||
* <li>Producer: 消息生产者,业务的发起方,负责生产消息</li>
|
||||
* <li>Consumer: 消息消费者,业务的处理方</li>
|
||||
* <li>Message: 消息体,根据不同通信协议定义的固定格式进行编码的数据包</li>
|
||||
* <li>{@link org.dromara.hutool.extra.mq.Producer}: 消息生产者,业务的发起方,负责生产消息</li>
|
||||
* <li>{@link org.dromara.hutool.extra.mq.Consumer}: 消息消费者,业务的处理方</li>
|
||||
* <li>{@link org.dromara.hutool.extra.mq.Message}: 消息体,根据不同通信协议定义的固定格式进行编码的数据包</li>
|
||||
* </ul>
|
||||
*
|
||||
* @author Looly
|
||||
|
@ -19,7 +19,7 @@ package org.dromara.hutool.extra.pinyin;
|
||||
import org.dromara.hutool.core.exception.HutoolException;
|
||||
|
||||
/**
|
||||
* 模板异常
|
||||
* 拼音异常
|
||||
*
|
||||
* @author Looly
|
||||
*/
|
||||
|
Loading…
x
Reference in New Issue
Block a user