问题1:TCP有粘包问题吗?
先说一下答案:TCP本身不存在粘包和半包,因为TCP本质上只是一个传输控制协议(TCP),它是一种面向连接的、可靠的、基于字节流的传输层通信协议,由 IETF RFC 793 定义。
所谓约定本质上就是一种约定,就像Java编程约定采用驼峰式命名一样。 协议的含义是让通信双方能够正常交换消息。 粘包、半包问题是如何产生的? 呢绒?
这是因为在TCP交互中,数据是以字节流的形式传输的,而“流”的传输是没有边界的。 因为没有边界,所以无法区分报文的归属,从而会造成粘包和半包问题(粘包和半包的定义详见上一篇文章)。 因此,TCP协议本身并不存在粘包和半包的问题,但如果在使用过程中不能有效确定流的边界,就会出现粘包和半包的问题。
问题2:分隔符是最优解吗?
坦白说,经过评论区大家的耐心“开导”,我也意识到用终结者作为最终解决方案是有一定局限性的。 例如,如果消息中间出现终止符,就会导致半包问题。 因此,如果是复杂的字符串,就必须对内容进行编码和解码,以保证终止符的正确性。
问题3:效率高吗?
这个问题的答案是否定的。 其实上面开头已经描述了应用场景:“传统编程”。 学习它的意义在于了解一些更早期、更底层的知识。 当然,作为补充,本文将提供更高效的信息。 通信方案——Netty通信。
说完了上面的问题,我们先添加上一篇文章中提到的将消息分为消息头和消息体的代码实现。
1.封装消息头和消息体
在开始写服务端和客户端之前,我们先写一个消息封装类,可以用来将消息封装成消息头和消息体,如下图所示:
消息体的长度存储在消息头中,从而确定消息的边界,解决粘包和半包问题。
1.消息封装类
消息的封装类提供了两种方法:一是将消息转换为消息头+消息体,二是读取消息头。 具体实现代码如下:
class SocketPacket {
// 消息头存储的长度(占 8 字节)
static final int HEAD_SIZE = 8;
public byte[] toBytes(String context) {
// 协议体 byte 数组
byte[] bodyByte = context.getBytes();
int bodyByteLength = bodyByte.length;
// 最终封装对象
byte[] result = new byte[HEAD_SIZE + bodyByteLength];
// 借助 NumberFormat 将 int 转换为 byte[]
NumberFormat numberFormat = NumberFormat.getNumberInstance();
numberFormat.setMinimumIntegerDigits(HEAD_SIZE);
numberFormat.setGroupingUsed(false);
// 协议头 byte 数组
byte[] headByte = numberFormat.format(bodyByteLength).getBytes();
// 封装协议头
System.arraycopy(headByte, 0, result, 0, HEAD_SIZE);
// 封装协议体
System.arraycopy(bodyByte, 0, result, HEAD_SIZE, bodyByteLength);
return result;
}
public int getHeader(InputStream inputStream) throws IOException {
int result = 0;
byte[] bytes = new byte[HEAD_SIZE];
inputStream.read(bytes, 0, HEAD_SIZE);
// 得到消息体的字节长度
result = Integer.valueOf(new String(bytes));
return result;
}
}
2.编写客户端
接下来,让我们定义客户端。 在客户端,我们添加一组要发送的消息,随机发送一条消息到服务器。 实现代码如下:
class MySocketClient {
public static void main(String[] args) throws IOException {
// 启动 Socket 并尝试连接服务器
Socket socket = new Socket("127.0.0.1", 9093);
// 发送消息合集(随机发送一条消息)
final String[] message = {"Hi,Java.", "Hi,SQL~", "关注公众号|Java中文社群."};
// 创建协议封装对象
SocketPacket socketPacket = new SocketPacket();
try (OutputStream outputStream = socket.getOutputStream()) {
// 给服务器端发送 10 次消息
for (int i = 0; i < 10; i++) {
// 随机发送一条消息
String msg = message[new Random().nextInt(message.length)];
// 将内容封装为:协议头+协议体
byte[] bytes = socketPacket.toBytes(msg);
// 发送消息
outputStream.write(bytes, 0, bytes.length);
outputStream.flush();
}
}
}
}
3. 编写服务器端
在服务器端,我们使用线程池来处理各个客户端的业务请求。 实现代码如下:
class MySocketServer {
public static void main(String[] args) throws IOException {
// 创建 Socket 服务器端
ServerSocket serverSocket = new ServerSocket(9093);
// 获取客户端连接
Socket clientSocket = serverSocket.accept();
// 使用线程池处理更多的客户端
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(100, 150, 100,
TimeUnit.SECONDS, new linkedBlockingQueue<>(1000));
threadPool.submit(() -> {
// 客户端消息处理
processMessage(clientSocket);
});
}
private static void processMessage(Socket clientSocket) {
// Socket 封装对象
SocketPacket socketPacket = new SocketPacket();
// 获取客户端发送的消息对象
try (InputStream inputStream = clientSocket.getInputStream()) {
while (true) {
// 获取消息头(也就是消息体的长度)
int bodyLength = socketPacket.getHeader(inputStream);
// 消息体 byte 数组
byte[] bodyByte = new byte[bodyLength];
// 每次实际读取字节数
int readCount = 0;
// 消息体赋值下标
int bodyIndex = 0;
// 循环接收消息头中定义的长度
while (bodyIndex <= (bodyLength - 1) &&
(readCount = inputStream.read(bodyByte, bodyIndex, bodyLength)) != -1) {
bodyIndex += readCount;
}
bodyIndex = 0;
// 成功接收到客户端的消息并打印
System.out.println("接收到客户端的信息:" + new String(bodyByte));
}
} catch (IOException ioException) {
System.out.println(ioException.getMessage());
}
}
}
上述程序的执行结果如下:
从以上结果可以看出,消息通信正常,客户端与服务端交互不存在粘包、半包问题。
2.使用Netty实现高效通信
以上内容针对传统编程,但要实现更高效的通信和连接对象的复用,请使用NIO(Non-IO,非阻塞IO)或AIO(IO,异步非阻塞IO)。
传统的编程是BIO(IO,同步阻塞IO),它与NIO、AIO的区别如下:
PS:AIO可以看作是NIO的升级版,也称为NIO 2。
传统通信流程:
NIO通信流程:
使用Netty代替传统的NIO编程
虽然NIO的设计思想很好,但是它的代码编写比较繁琐,比如NIO的使用和编写。 而且面对断线重连、丢包、粘包等复杂问题时,手动处理的成本非常高,所以我们通常使用Netty框架来替代传统的NIO。
什么是内蒂?
Netty是一个异步、事件驱动的网络应用框架,具有高性能和高可靠性。 它可以用来快速、方便地开发网络应用程序,大大简化了网络编程的复杂度。
Netty的主要优点如下:
框架设计优雅,底层模型可随意切换,适应不同网络协议需求; 提供多种标准协议、安全、编解码支持; 简化了NIO使用中的诸多不便; 社区非常活跃,很多开源框架都使用了Netty框架,比如Dubbo、Spark等。
Netty主要包括以下三个部分,如下图所示:
下面分别介绍这三部分的功能。
1.核心核心层
Core核心层是Netty最本质的内容,提供了底层网络通信的通用抽象和实现,包括可扩展的事件模型、通用通信API、对零拷贝的支持等。
2、协议支撑层
协议支持层基本涵盖了主流协议的编解码实现,如HTTP、SSL、压缩、大文件传输、文本、二进制等主流协议。 此外,Netty还支持自定义应用层协议。 Netty丰富的协议支持降低了用户的开发成本。 基于Netty,我们可以快速开发HTTP等服务。
3、传输服务层
传输服务层提供网络传输能力的定义和实现。 支持HTTP隧道、虚拟机管道等传输方式。 Netty对TCP、UDP等数据传输进行了抽象和封装,使用户可以更加专注于业务逻辑实现,而不必关心底层数据传输的细节。
Netty 使用
对Netty有了一个大概的了解后,我们将使用Netty来编写一个基本的通信服务器,它包括两端:服务器和客户端。 客户端负责发送消息,服务器端负责接收和打印消息。 具体实施步骤如下。
1.添加Netty框架
首先,我们需要添加对Netty框架的支持。 如果是Maven项目,添加以下配置:
io.netty
netty-all
4.1.56.Final
Netty 发行说明
Netty 3.x和4.x是主流稳定版本,最新的5.x已经是废弃的测试版本,所以建议使用Netty 4.x的最新稳定版本。
2.服务端实现代码
根据官方推荐,服务端代码分为以下三部分:
PS:字面意思是“通道”,是网络传播的载体。 提供网络I/O操作的基本API,如bind、read、write、flush等。Netty自己的实现是基于JDK NIO的。 与JDK NIO相比,Netty提供了更高层次的抽象,同时屏蔽了底层的复杂性并赋予其更强大的功能。 使用Netty的时候基本上不需要使用Netty。 然后您需要直接处理 Java 类。
服务器端的实现代码如下:
// 定义服务器的端口号
static final int PORT = 8007;
static class MyNettyServer {
public static void main(String[] args) {
// 创建一个线程组,用来负责接收客户端连接
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 创建另一个线程组,用来负责 I/O 的读写
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 创建一个 Server 实例(可理解为 Netty 的入门类)
ServerBootstrap b = new ServerBootstrap();
// 将两个线程池设置到 Server 实例
b.group(bossGroup, workerGroup)
// 设置 Netty 通道的类型为 NioServerSocket(非阻塞 I/O Socket 服务器)
.channel(NioServerSocketChannel.class)
// 设置建立连接之后的执行器(ServerInitializer 是我创建的一个自定义类)
.childHandler(new ServerInitializer());
// 绑定端口并且进行同步
ChannelFuture future = b.bind(PORT).sync();
// 对关闭通道进行监听
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 资源关闭
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
static class ServerInitializer extends ChannelInitializer {
// 字符串编码器和解码器
private static final StringDecoder DECODER = new StringDecoder();
private static final StringEncoder ENCODER = new StringEncoder();
// 服务器端连接之后的执行器(自定义的类)
private static final ServerHandler SERVER_HANDLER = new ServerHandler();
@Override
public void initChannel(SocketChannel ch) {
// 通道 Channel 设置
ChannelPipeline pipeline = ch.pipeline();
// 设置(字符串)编码器和解码器
pipeline.addLast(DECODER);
pipeline.addLast(ENCODER);
// 服务器端连接之后的执行器,接收到消息之后的业务处理
pipeline.addLast(SERVER_HANDLER);
}
}
static class ServerHandler extends SimpleChannelInboundHandler {
@Override
public void channelRead0(ChannelHandlerContext ctx, String request) {
if (!request.isEmpty()) {
System.out.println("接到客户端的消息:" + request);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
3.客户端实现代码
客户端的代码实现也分为以下三个部分:
客户端的实现代码如下:
static class MyNettyClient {
public static void main(String[] args) {
// 创建事件循环线程组(客户端的线程组只有一个)
EventLoopGroup group = new NioEventLoopGroup();
try {
// Netty 客户端启动对象
Bootstrap b = new Bootstrap();
// 设置启动参数
b.group(group)
// 设置通道类型
.channel(NioSocketChannel.class)
// 设置启动执行器(负责启动事件的业务执行,ClientInitializer 为自定义的类)
.handler(new ClientInitializer());
// 连接服务器端并同步通道
Channel ch = b.connect("127.0.0.1", 8007).sync().channel();
// 发送消息
ChannelFuture lastWriteFuture = null;
// 给服务器端发送 10 条消息
for (int i = 0; i < 10; i++) {
// 发送给服务器消息
lastWriteFuture = ch.writeAndFlush("Hi,Java.");
}
// 在关闭通道之前,同步刷新所有的消息
if (lastWriteFuture != null) {
lastWriteFuture.sync();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放资源
group.shutdownGracefully();
}
}
}
static class ClientInitializer extends ChannelInitializer {
// 字符串编码器和解码器
private static final StringDecoder DECODER = new StringDecoder();
private static final StringEncoder ENCODER = new StringEncoder();
// 客户端连接成功之后业务处理
private static final ClientHandler CLIENT_HANDLER = new ClientHandler();
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 设置(字符串)编码器和解码器
pipeline.addLast(DECODER);
pipeline.addLast(ENCODER);
// 客户端连接成功之后的业务处理
pipeline.addLast(CLIENT_HANDLER);
}
}
static class ClientHandler extends SimpleChannelInboundHandler {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
System.err.println("接到服务器的消息:" + msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
从上面的代码可以看出,我们的代码实现的功能是客户端向服务器发送10条消息。
写完上面的代码后,我们就可以启动服务端和客户端了。 启动后,它们的执行结果如下:
从上面的结果可以看出,虽然客户端和服务端已经进行了通信,但是在Netty的使用中仍然存在粘包的问题。 服务器一次接收 10 条消息,而不是一次只接收一条消息。 所以接下来我们要解决Netty中的粘包问题。
3.解决Netty粘包问题
在Netty中,针对粘包问题,常见的解决方案有3种:
设置固定大小的消息长度。 如果长度不足,则使用空字符来弥补。 它的缺点很明显,消耗网络流量,所以不建议使用; 使用分隔符来确定消息的边界,以避免粘包和半包。 产生; 消息分为消息头和消息体,消息头中存储当前整个消息的长度,只有读取到足够长度的消息后,才读取完整的消息。
接下来我们分别看看后两种推荐的方案。
1、使用隔板解决粘包问题
Netty提供了一个类来使用特殊符号作为消息的结束符,从而解决粘包和半包的问题。
其核心实现代码是在初始化()时通过设置来分隔消息,客户端和服务端都需要设置。 具体实现代码如下。
服务器端核心实现代码如下:
static class ServerInitializer extends ChannelInitializer {
// 字符串编码器和解码器
private static final StringDecoder DECODER = new StringDecoder();
private static final StringEncoder ENCODER = new StringEncoder();
// 服务器端连接之后的执行器(自定义的类)
private static final ServerHandler SERVER_HANDLER = new ServerHandler();
@Override
public void initChannel(SocketChannel ch) {
// 通道 Channel 设置
ChannelPipeline pipeline = ch.pipeline();
// 19 行:设置结尾分隔符【核心代码】(参数1:为消息的最大长度,可自定义;参数2:分隔符[此处以换行符为分隔符])
pipeline.addLast(new DelimiterbasedframeDecoder(1024, Delimiters.lineDelimiter()));
// 设置(字符串)编码器和解码器
pipeline.addLast(DECODER);
pipeline.addLast(ENCODER);
// 服务器端连接之后的执行器,接收到消息之后的业务处理
pipeline.addLast(SERVER_HANDLER);
}
}
核心代码是第19行,代码中已经注释了该方法的含义,这里不再赘述。
客户端核心实现代码如下:
static class ClientInitializer extends ChannelInitializer {
// 字符串编码器和解码器
private static final StringDecoder DECODER = new StringDecoder();
private static final StringEncoder ENCODER = new StringEncoder();
// 客户端连接成功之后业务处理
private static final ClientHandler CLIENT_HANDLER = new ClientHandler();
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 17 行:设置结尾分隔符【核心代码】(参数1:为消息的最大长度,可自定义;参数2:分隔符[此处以换行符为分隔符])
pipeline.addLast(new DelimiterbasedframeDecoder(1024, Delimiters.lineDelimiter()));
// 设置(字符串)编码器和解码器
pipeline.addLast(DECODER);
pipeline.addLast(ENCODER);
// 客户端连接成功之后的业务处理
pipeline.addLast(CLIENT_HANDLER);
}
}
完整的服务器端和客户端实现代码如下:
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterbasedframeDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class NettyExample {
// 定义服务器的端口号
static final int PORT = 8007;
static class MyNettyServer {
public static void main(String[] args) {
// 创建一个线程组,用来负责接收客户端连接
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 创建另一个线程组,用来负责 I/O 的读写
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 创建一个 Server 实例(可理解为 Netty 的入门类)
ServerBootstrap b = new ServerBootstrap();
// 将两个线程池设置到 Server 实例
b.group(bossGroup, workerGroup)
// 设置 Netty 通道的类型为 NioServerSocket(非阻塞 I/O Socket 服务器)
.channel(NioServerSocketChannel.class)
// 设置建立连接之后的执行器(ServerInitializer 是我创建的一个自定义类)
.childHandler(new ServerInitializer());
// 绑定端口并且进行同步
ChannelFuture future = b.bind(PORT).sync();
// 对关闭通道进行监听
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 资源关闭
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
static class ServerInitializer extends ChannelInitializer {
// 字符串编码器和解码器
private static final StringDecoder DECODER = new StringDecoder();
private static final StringEncoder ENCODER = new StringEncoder();
// 服务器端连接之后的执行器(自定义的类)
private static final ServerHandler SERVER_HANDLER = new ServerHandler();
@Override
public void initChannel(SocketChannel ch) {
// 通道 Channel 设置
ChannelPipeline pipeline = ch.pipeline();
// 设置结尾分隔符
pipeline.addLast(new DelimiterbasedframeDecoder(1024, Delimiters.lineDelimiter()));
// 设置(字符串)编码器和解码器
pipeline.addLast(DECODER);
pipeline.addLast(ENCODER);
// 服务器端连接之后的执行器,接收到消息之后的业务处理
pipeline.addLast(SERVER_HANDLER);
}
}
static class ServerHandler extends SimpleChannelInboundHandler {
@Override
public void channelRead0(ChannelHandlerContext ctx, String request) {
if (!request.isEmpty()) {
System.out.println("接到客户端的消息:" + request);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
static class MyNettyClient {
public static void main(String[] args) {
// 创建事件循环线程组(客户端的线程组只有一个)
EventLoopGroup group = new NioEventLoopGroup();
try {
// Netty 客户端启动对象
Bootstrap b = new Bootstrap();
// 设置启动参数
b.group(group)
// 设置通道类型
.channel(NioSocketChannel.class)
// 设置启动执行器(负责启动事件的业务执行,ClientInitializer 为自定义的类)
.handler(new ClientInitializer());
// 连接服务器端并同步通道
Channel ch = b.connect("127.0.0.1", PORT).sync().channel();
// 发送消息
ChannelFuture lastWriteFuture = null;
// 给服务器端发送 10 条消息
for (int i = 0; i < 10; i++) {
// 发送给服务器消息
lastWriteFuture = ch.writeAndFlush("Hi,Java.\n");
}
// 在关闭通道之前,同步刷新所有的消息
if (lastWriteFuture != null) {
lastWriteFuture.sync();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放资源
group.shutdownGracefully();
}
}
}
static class ClientInitializer extends ChannelInitializer {
// 字符串编码器和解码器
private static final StringDecoder DECODER = new StringDecoder();
private static final StringEncoder ENCODER = new StringEncoder();
// 客户端连接成功之后业务处理
private static final ClientHandler CLIENT_HANDLER = new ClientHandler();
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 设置结尾分隔符
pipeline.addLast(new DelimiterbasedframeDecoder(1024, Delimiters.lineDelimiter()));
// 设置(字符串)编码器和解码器
pipeline.addLast(DECODER);
pipeline.addLast(ENCODER);
// 客户端连接成功之后的业务处理
pipeline.addLast(CLIENT_HANDLER);
}
}
static class ClientHandler extends SimpleChannelInboundHandler {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
System.err.println("接到服务器的消息:" + msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
}
最终执行结果如下图所示:
从上面的结果可以看出,Netty可以正常使用,并且不再出现粘包和半包的问题。
2.对消息进行封装,解决粘包问题
该方案的核心是将消息分为消息头+消息体,并在消息头中保存消息体的长度来确定消息的边界,从而避免粘包和半包问题。 其实现流程如下图所示:
在Netty中,消息封装可以通过()和()两个类来实现。 与之前的解决方案类似,我们需要通过在服务器端和客户端分别设置()来解决粘包问题。
服务器端核心代码如下:
static class ServerInitializer extends ChannelInitializer {
// 字符串编码器和解码器
private static final StringDecoder DECODER = new StringDecoder();
private static final StringEncoder ENCODER = new StringEncoder();
// 服务器端连接之后的执行器(自定义的类)
private static final NettyExample.ServerHandler SERVER_HANDLER = new NettyExample.ServerHandler();
@Override
public void initChannel(SocketChannel ch) {
// 通道 Channel 设置
ChannelPipeline pipeline = ch.pipeline();
// 18 行:消息解码:读取消息头和消息体
pipeline.addLast(new LengthFieldbasedframeDecoder(1024, 0, 4, 0, 4));
// 20 行:消息编码:将消息封装为消息头和消息体,在消息前添加消息体的长度
pipeline.addLast(new LengthFieldPrepender(4));
// 设置(字符串)编码器和解码器
pipeline.addLast(DECODER);
pipeline.addLast(ENCODER);
// 服务器端连接之后的执行器,接收到消息之后的业务处理
pipeline.addLast(SERVER_HANDLER);
}
}
其中,核心代码分别为18行和20行,通过实现编码(将消息打包成消息头+消息体),并实现解码(从打包的消息中取出消息内容)。
参数说明如下:
(1024,0,4,0,4)表示:数据包最大长度为1024,长度字段占用前四个字节,读取数据时去掉前四个字节(即长度字段) 。
客户端核心实现代码如下:
static class ClientInitializer extends ChannelInitializer {
// 字符串编码器和解码器
private static final StringDecoder DECODER = new StringDecoder();
private static final StringEncoder ENCODER = new StringEncoder();
// 客户端连接成功之后业务处理
private static final NettyExample.ClientHandler CLIENT_HANDLER = new NettyExample.ClientHandler();
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 消息解码:读取消息头和消息体
pipeline.addLast(new LengthFieldbasedframeDecoder(1024, 0, 4, 0, 4));
// 消息编码:将消息封装为消息头和消息体,在响应字节数据前面添加消息体长度
pipeline.addLast(new LengthFieldPrepender(4));
// 设置(字符串)编码器和解码器
pipeline.addLast(DECODER);
pipeline.addLast(ENCODER);
// 客户端连接成功之后的业务处理
pipeline.addLast(CLIENT_HANDLER);
}
}
完整的服务器端和客户端实现代码如下:
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldbasedframeDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class NettyExample {
// 定义服务器的端口号
static final int PORT = 8007;
static class MyNettyServer {
public static void main(String[] args) {
// 创建一个线程组,用来负责接收客户端连接
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 创建另一个线程组,用来负责 I/O 的读写
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 创建一个 Server 实例(可理解为 Netty 的入门类)
ServerBootstrap b = new ServerBootstrap();
// 将两个线程池设置到 Server 实例
b.group(bossGroup, workerGroup)
// 设置 Netty 通道的类型为 NioServerSocket(非阻塞 I/O Socket 服务器)
.channel(NioServerSocketChannel.class)
// 设置建立连接之后的执行器(ServerInitializer 是我创建的一个自定义类)
.childHandler(new NettyExample.ServerInitializer());
// 绑定端口并且进行同步
ChannelFuture future = b.bind(PORT).sync();
// 对关闭通道进行监听
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 资源关闭
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
static class ServerInitializer extends ChannelInitializer {
// 字符串编码器和解码器
private static final StringDecoder DECODER = new StringDecoder();
private static final StringEncoder ENCODER = new StringEncoder();
// 服务器端连接之后的执行器(自定义的类)
private static final NettyExample.ServerHandler SERVER_HANDLER = new NettyExample.ServerHandler();
@Override
public void initChannel(SocketChannel ch) {
// 通道 Channel 设置
ChannelPipeline pipeline = ch.pipeline();
// 消息解码:读取消息头和消息体
pipeline.addLast(new LengthFieldbasedframeDecoder(1024, 0, 4, 0, 4));
// 消息编码:将消息封装为消息头和消息体,在响应字节数据前面添加消息体长度
pipeline.addLast(new LengthFieldPrepender(4));
// 设置(字符串)编码器和解码器
pipeline.addLast(DECODER);
pipeline.addLast(ENCODER);
// 服务器端连接之后的执行器,接收到消息之后的业务处理
pipeline.addLast(SERVER_HANDLER);
}
}
static class ServerHandler extends SimpleChannelInboundHandler {
@Override
public void channelRead0(ChannelHandlerContext ctx, String request) {
if (!request.isEmpty()) {
System.out.println("接到客户端的消息:" + request);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
static class MyNettyClient {
public static void main(String[] args) {
// 创建事件循环线程组(客户端的线程组只有一个)
EventLoopGroup group = new NioEventLoopGroup();
try {
// Netty 客户端启动对象
Bootstrap b = new Bootstrap();
// 设置启动参数
b.group(group)
// 设置通道类型
.channel(NioSocketChannel.class)
// 设置启动执行器(负责启动事件的业务执行,ClientInitializer 为自定义的类)
.handler(new NettyExample.ClientInitializer());
// 连接服务器端并同步通道
Channel ch = b.connect("127.0.0.1", PORT).sync().channel();
// 发送消息
ChannelFuture lastWriteFuture = null;
// 给服务器端发送 10 条消息
for (int i = 0; i < 10; i++) {
// 发送给服务器消息
lastWriteFuture = ch.writeAndFlush("Hi,Java.\n");
}
// 在关闭通道之前,同步刷新所有的消息
if (lastWriteFuture != null) {
lastWriteFuture.sync();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放资源
group.shutdownGracefully();
}
}
}
static class ClientInitializer extends ChannelInitializer {
// 字符串编码器和解码器
private static final StringDecoder DECODER = new StringDecoder();
private static final StringEncoder ENCODER = new StringEncoder();
// 客户端连接成功之后业务处理
private static final NettyExample.ClientHandler CLIENT_HANDLER = new NettyExample.ClientHandler();
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 消息解码:读取消息头和消息体
pipeline.addLast(new LengthFieldbasedframeDecoder(1024, 0, 4, 0, 4));
// 消息编码:将消息封装为消息头和消息体,在响应字节数据前面添加消息体长度
pipeline.addLast(new LengthFieldPrepender(4));
// 设置(字符串)编码器和解码器
pipeline.addLast(DECODER);
pipeline.addLast(ENCODER);
// 客户端连接成功之后的业务处理
pipeline.addLast(CLIENT_HANDLER);
}
}
static class ClientHandler extends SimpleChannelInboundHandler {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
System.err.println("接到服务器的消息:" + msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
}
上述程序的执行结果为:
四。 概括
本文提供了传统通信的具体代码实现,将消息分为消息头和消息体。 但传统的性能和复用性一般。 为了更高效地实现通信,我们可以使用Netty框架来代替传统的NIO编程,但是Netty在使用时仍然存在粘包的问题,所以我们提供了两种最常见的解决方案:通过分隔符或者封装消息的解决方案,最后一种方案应用更为广泛。
参考文献和致谢
《Netty核心原理解析与RPC实践》