add ChannelUtil

This commit is contained in:
Looly 2022-05-23 11:36:58 +08:00
parent de75ff8e63
commit 0cbbc69c4f
2 changed files with 74 additions and 34 deletions

View File

@ -0,0 +1,62 @@
package cn.hutool.socket;
import cn.hutool.core.io.IORuntimeException;
import cn.hutool.core.io.IoUtil;
import cn.hutool.core.thread.ThreadFactoryBuilder;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.ExecutionException;
/**
* Channel相关封装
*
* @author looly
* @since 5.8.2
*/
public class ChannelUtil {
/**
* 创建{@link AsynchronousChannelGroup}
*
* @param poolSize 线程池大小
* @return {@link AsynchronousChannelGroup}
*/
public static AsynchronousChannelGroup createFixedGroup(final int poolSize) {
try {
return AsynchronousChannelGroup.withFixedThreadPool(//
poolSize, // 默认线程池大小
ThreadFactoryBuilder.create().setNamePrefix("Huool-socket-").build()//
);
} catch (final IOException e) {
throw new IORuntimeException(e);
}
}
/**
* 连接到指定地址
*
* @param group {@link AsynchronousChannelGroup}
* @param address 地址信息包括地址和端口
* @return {@link AsynchronousSocketChannel}
*/
public static AsynchronousSocketChannel connect(final AsynchronousChannelGroup group, final InetSocketAddress address) {
final AsynchronousSocketChannel channel;
try {
channel = AsynchronousSocketChannel.open(group);
} catch (final IOException e) {
throw new IORuntimeException(e);
}
try {
channel.connect(address).get();
} catch (final InterruptedException | ExecutionException e) {
IoUtil.close(channel);
throw new SocketRuntimeException(e);
}
return channel;
}
}

View File

@ -1,19 +1,14 @@
package cn.hutool.socket.aio;
import cn.hutool.core.io.IORuntimeException;
import cn.hutool.core.io.IoUtil;
import cn.hutool.core.thread.ThreadFactoryBuilder;
import cn.hutool.socket.ChannelUtil;
import cn.hutool.socket.SocketConfig;
import cn.hutool.socket.SocketRuntimeException;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketOption;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.ExecutionException;
/**
* Aio Socket客户端
@ -21,14 +16,14 @@ import java.util.concurrent.ExecutionException;
* @author looly
* @since 4.5.0
*/
public class AioClient implements Closeable{
public class AioClient implements Closeable {
private final AioSession session;
/**
* 构造
*
* @param address 地址
* @param address 地址
* @param ioAction IO处理类
*/
public AioClient(final InetSocketAddress address, final IoAction<ByteBuffer> ioAction) {
@ -38,9 +33,9 @@ public class AioClient implements Closeable{
/**
* 构造
*
* @param address 地址
* @param address 地址
* @param ioAction IO处理类
* @param config 配置项
* @param config 配置项
*/
public AioClient(final InetSocketAddress address, final IoAction<ByteBuffer> ioAction, final SocketConfig config) {
this(createChannel(address, config.getThreadPoolSize()), ioAction, config);
@ -49,9 +44,9 @@ public class AioClient implements Closeable{
/**
* 构造
*
* @param channel {@link AsynchronousSocketChannel}
* @param channel {@link AsynchronousSocketChannel}
* @param ioAction IO处理类
* @param config 配置项
* @param config 配置项
*/
public AioClient(final AsynchronousSocketChannel channel, final IoAction<ByteBuffer> ioAction, final SocketConfig config) {
this.session = new AioSession(channel, ioAction, config);
@ -62,8 +57,8 @@ public class AioClient implements Closeable{
* 设置 Socket Option 选项<br>
* 选项见{@link java.net.StandardSocketOptions}
*
* @param <T> 选项泛型
* @param name {@link SocketOption} 枚举
* @param <T> 选项泛型
* @param name {@link SocketOption} 枚举
* @param value SocketOption参数
* @return this
* @throws IOException IO异常
@ -112,33 +107,16 @@ public class AioClient implements Closeable{
}
// ------------------------------------------------------------------------------------- Private method start
/**
* 初始化
*
* @param address 地址和端口
* @param address 地址和端口
* @param poolSize 线程池大小
* @return this
*/
private static AsynchronousSocketChannel createChannel(final InetSocketAddress address, final int poolSize) {
final AsynchronousSocketChannel channel;
try {
final AsynchronousChannelGroup group = AsynchronousChannelGroup.withFixedThreadPool(//
poolSize, // 默认线程池大小
ThreadFactoryBuilder.create().setNamePrefix("Huool-socket-").build()//
);
channel = AsynchronousSocketChannel.open(group);
} catch (final IOException e) {
throw new IORuntimeException(e);
}
try {
channel.connect(address).get();
} catch (final InterruptedException | ExecutionException e) {
IoUtil.close(channel);
throw new SocketRuntimeException(e);
}
return channel;
return ChannelUtil.connect(ChannelUtil.createFixedGroup(poolSize), address);
}
// ------------------------------------------------------------------------------------- Private method end
}