From 73659a089452945a78ec5531cfb50ccfa6b837bf Mon Sep 17 00:00:00 2001 From: Looly Date: Tue, 28 Jan 2025 18:09:59 +0800 Subject: [PATCH] add rocketmq --- hutool-extra/pom.xml | 6 + .../extra/mq/engine/kafka/KafkaConsumer.java | 10 ++ .../mq/engine/rocketmq/RocketMQConsumer.java | 106 ++++++++++++++++++ .../mq/engine/rocketmq/RocketMQEngine.java | 71 ++++++++++++ .../mq/engine/rocketmq/RocketMQProducer.java | 62 ++++++++++ .../mq/engine/rocketmq/package-info.java | 22 ++++ 6 files changed, 277 insertions(+) create mode 100644 hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rocketmq/RocketMQConsumer.java create mode 100644 hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rocketmq/RocketMQEngine.java create mode 100644 hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rocketmq/RocketMQProducer.java create mode 100644 hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rocketmq/package-info.java diff --git a/hutool-extra/pom.xml b/hutool-extra/pom.xml index 5dd55c992..dc292ca7e 100755 --- a/hutool-extra/pom.xml +++ b/hutool-extra/pom.xml @@ -570,5 +570,11 @@ 5.24.0 provided + + org.apache.rocketmq + rocketmq-client + 5.3.1 + provided + diff --git a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/kafka/KafkaConsumer.java b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/kafka/KafkaConsumer.java index 45408fee2..083b95f9b 100644 --- a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/kafka/KafkaConsumer.java +++ b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/kafka/KafkaConsumer.java @@ -90,10 +90,20 @@ public class KafkaConsumer implements Consumer { IoUtil.nullSafeClose(this.consumer); } + /** + * 消费者记录包装为消息 + * + * @author looly + */ private static class ConsumerRecordMessage implements Message { private final ConsumerRecord record; + /** + * 构造 + * + * @param record {@link ConsumerRecord} + */ private ConsumerRecordMessage(final ConsumerRecord record) { this.record = record; } diff --git a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rocketmq/RocketMQConsumer.java b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rocketmq/RocketMQConsumer.java new file mode 100644 index 000000000..559c298b9 --- /dev/null +++ b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rocketmq/RocketMQConsumer.java @@ -0,0 +1,106 @@ +/* + * Copyright (c) 2025 Hutool Team and hutool.cn + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.hutool.extra.mq.engine.rocketmq; + +import org.apache.rocketmq.client.consumer.MQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageExt; +import org.dromara.hutool.extra.mq.Consumer; +import org.dromara.hutool.extra.mq.MQException; +import org.dromara.hutool.extra.mq.Message; +import org.dromara.hutool.extra.mq.MessageHandler; + +import java.io.IOException; + +/** + * RocketMQ 消费者 + * + * @author Looly + * @since 6.0.0 + */ +public class RocketMQConsumer implements Consumer { + + private final MQPushConsumer consumer; + + /** + * 构造 + * + * @param consumer RocketMQ PushConsumer + */ + public RocketMQConsumer(final MQPushConsumer consumer) { + this.consumer = consumer; + } + + /** + * 设置消费的Topic + * + * @param topic Topic + * @return this + */ + public RocketMQConsumer setTopic(final String topic) { + try { + this.consumer.subscribe(topic, "*"); + } catch (final MQClientException e) { + throw new MQException(e); + } + return this; + } + + @Override + public void subscribe(final MessageHandler messageHandler) { + this.consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { + for (final MessageExt msg : msgs) { + messageHandler.handle(new RocketMQMessage(msg)); + } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + }); + } + + @Override + public void close() throws IOException { + if (null != this.consumer) { + this.consumer.shutdown(); + } + } + + /** + * RocketMQ消息包装 + * + * @author Looly + * @since 6.0.0 + */ + private static class RocketMQMessage implements Message { + private final MessageExt messageExt; + + private RocketMQMessage(final MessageExt messageExt) { + this.messageExt = messageExt; + } + + + @Override + public String topic() { + return messageExt.getTopic(); + } + + @Override + public byte[] content() { + return messageExt.getBody(); + } + } +} diff --git a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rocketmq/RocketMQEngine.java b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rocketmq/RocketMQEngine.java new file mode 100644 index 000000000..1cb324a07 --- /dev/null +++ b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rocketmq/RocketMQEngine.java @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2025 Hutool Team and hutool.cn + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.hutool.extra.mq.engine.rocketmq; + +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.dromara.hutool.core.lang.Assert; +import org.dromara.hutool.extra.mq.Consumer; +import org.dromara.hutool.extra.mq.MQConfig; +import org.dromara.hutool.extra.mq.MQException; +import org.dromara.hutool.extra.mq.Producer; +import org.dromara.hutool.extra.mq.engine.MQEngine; + +/** + * RocketMQ引擎 + * + * @author Looly + * @since 6.0.0 + */ +public class RocketMQEngine implements MQEngine { + + private MQConfig config; + + /** + * 默认构造 + */ + public RocketMQEngine() { + // SPI方式加载时检查库是否引入 + Assert.notNull( org.apache.rocketmq.common.message.Message.class); + } + + @Override + public RocketMQEngine init(final MQConfig config) { + this.config = config; + return this; + } + + @Override + public Producer getProducer() { + final DefaultMQProducer defaultMQProducer = new DefaultMQProducer(); + defaultMQProducer.setNamesrvAddr(config.getBrokerUrl()); + try { + defaultMQProducer.start(); + } catch (final MQClientException e) { + throw new MQException(e); + } + return new RocketMQProducer(defaultMQProducer); + } + + @Override + public Consumer getConsumer() { + final DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(); + defaultMQPushConsumer.setNamesrvAddr(config.getBrokerUrl()); + return new RocketMQConsumer(defaultMQPushConsumer); + } +} diff --git a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rocketmq/RocketMQProducer.java b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rocketmq/RocketMQProducer.java new file mode 100644 index 000000000..decd947c8 --- /dev/null +++ b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rocketmq/RocketMQProducer.java @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2025 Hutool Team and hutool.cn + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.hutool.extra.mq.engine.rocketmq; + +import org.apache.rocketmq.client.producer.MQProducer; +import org.dromara.hutool.extra.mq.MQException; +import org.dromara.hutool.extra.mq.Message; +import org.dromara.hutool.extra.mq.Producer; + +import java.io.IOException; + +/** + * RocketMQ Producer + * + * @author Looly + * @since 6.0.0 + */ +public class RocketMQProducer implements Producer { + + private final MQProducer producer; + + /** + * 构造 + * + * @param producer RocketMQ Producer + */ + public RocketMQProducer(final MQProducer producer) { + this.producer = producer; + } + + @Override + public void send(final Message message) { + final org.apache.rocketmq.common.message.Message rocketMessage = + new org.apache.rocketmq.common.message.Message(message.topic(), message.content()); + try { + this.producer.send(rocketMessage); + } catch (final Exception e) { + throw new MQException(e); + } + } + + @Override + public void close() throws IOException { + if (null != this.producer) { + this.producer.shutdown(); + } + } +} diff --git a/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rocketmq/package-info.java b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rocketmq/package-info.java new file mode 100644 index 000000000..004cc5991 --- /dev/null +++ b/hutool-extra/src/main/java/org/dromara/hutool/extra/mq/engine/rocketmq/package-info.java @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2025 Hutool Team and hutool.cn + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * RocketMQ引擎 + * + * @author Looly + */ +package org.dromara.hutool.extra.mq.engine.rocketmq;