[关闭]
@Yano 2017-08-19T15:22:44.000000Z 字数 9221 阅读 2308

Netty 之入门应用

Netty


说明

系列文章:http://www.jianshu.com/p/594441fb9c9e

本文完全参考自《Netty权威指南(第2版)》,李林峰著。

Netty 环境搭建

例程使用Maven构建工程,在pom文件中,加入Netty的依赖。

<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.14.Final</version>
</dependency>

服务端程序

  1. public class TimeServer {
  2. public void bind(int port) throws Exception {
  3. // 配置服务端的NIO线程
  4. EventLoopGroup bossGroup = new NioEventLoopGroup();
  5. EventLoopGroup workerGroup = new NioEventLoopGroup();
  6. try {
  7. ServerBootstrap bootstrap = new ServerBootstrap();
  8. bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
  9. .option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChildChannelHandler());
  10. // 绑定端口,同步等待成功
  11. ChannelFuture f = bootstrap.bind(port).sync();
  12. // 等待服务端监听端口关闭
  13. f.channel().closeFuture().sync();
  14. } finally {
  15. // 优雅退出,释放线程池资源
  16. bossGroup.shutdownGracefully();
  17. workerGroup.shutdownGracefully();
  18. }
  19. }
  20. private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
  21. @Override
  22. protected void initChannel(SocketChannel arg0) throws Exception {
  23. arg0.pipeline().addLast(new TimeServerHandler());
  24. }
  25. }
  26. public static void main(String[] args) throws Exception {
  27. int port = 8080;
  28. new TimeServer().bind(port);
  29. }
  30. }
  1. public class TimeServerHandler extends ChannelInboundHandlerAdapter {
  2. @Override
  3. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  4. ByteBuf buf = (ByteBuf) msg;
  5. byte[] req = new byte[buf.readableBytes()];
  6. buf.readBytes(req);
  7. String body = new String(req, Charsets.UTF_8);
  8. System.out.println("The time server receive order : " + body);
  9. String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date().toString() : "BAD ORDER";
  10. ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
  11. ctx.write(resp);
  12. }
  13. @Override
  14. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  15. ctx.flush();
  16. }
  17. @Override
  18. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  19. ctx.close();
  20. }
  21. }

流程:

客户端代码

  1. public class TimeClient {
  2. public void connect(int port, String host) throws Exception {
  3. EventLoopGroup group = new NioEventLoopGroup();
  4. try {
  5. Bootstrap bootstrap = new Bootstrap();
  6. bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
  7. .handler(new ChannelInitializer<SocketChannel>() {
  8. @Override
  9. protected void initChannel(SocketChannel ch) throws Exception {
  10. ch.pipeline().addLast(new TimeClientHandler());
  11. }
  12. });
  13. // 发起异步连接操作
  14. ChannelFuture future = bootstrap.connect(host, port).sync();
  15. // 等待客户端链路关闭
  16. future.channel().closeFuture().sync();
  17. } finally {
  18. // 优雅退出,释放NIO线程组
  19. group.shutdownGracefully();
  20. }
  21. }
  22. public static void main(String[] args) throws Exception {
  23. int port = 8080;
  24. new TimeClient().connect(port, "127.0.0.1");
  25. }
  26. }
  1. public class TimeClientHandler extends ChannelInboundHandlerAdapter {
  2. private final ByteBuf message;
  3. public TimeClientHandler() {
  4. byte[] req = "QUERY TIME ORDER".getBytes();
  5. message = Unpooled.buffer(req.length);
  6. message.writeBytes(req);
  7. }
  8. @Override
  9. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  10. ctx.writeAndFlush(message);
  11. }
  12. @Override
  13. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  14. ByteBuf buf = (ByteBuf) msg;
  15. byte[] req = new byte[buf.readableBytes()];
  16. buf.readBytes(req);
  17. String body = new String(req, Charsets.UTF_8);
  18. System.out.println("Now is : " + body);
  19. }
  20. @Override
  21. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  22. ctx.close();
  23. }
  24. }

调试和运行

启动后,客户端输出:

Now is : Sat Aug 19 13:41:54 CST 2017

服务端输出:

The time server receive order : QUERY TIME ORDER

问题:TCP粘包/拆包

是什么

TCP是个“流”协议,所谓流,就是没有界限的一串数据。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分。所以在业务上,一个完整的包可能会被TCP拆成多个包发送,也有可能把多个小的包封装成一个大包发送。

产生原因

解决策略

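因为底层的TCP无法理解上层的业务数据,所以只能通过上层的应用协议栈来解决这个问题,主要的解决方案有:

  1. 消息定长,例如每个报文固定为200字节,不够则用空格补位;
  2. 在包尾增加回车换行符等特殊分隔符进行分割;
  3. 将消息分为消息头和消息体,消息头中包含表示消息总长度的字段;
  4. 使用更复杂的应用层协议。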

代码模拟

服务端:

  1. public class TimeServerHandler extends ChannelInboundHandlerAdapter {
  2. private int counter;
  3. @Override
  4. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  5. ByteBuf buf = (ByteBuf) msg;
  6. byte[] req = new byte[buf.readableBytes()];
  7. buf.readBytes(req);
  8. String body = new String(req, Charsets.UTF_8).substring(0,
  9. req.length - System.getProperty("line.separator").length());
  10. System.out.println("count = " + ++counter);
  11. System.out.println("The time server receive order : " + body);
  12. String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date().toString() : "BAD ORDER";
  13. ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
  14. ctx.write(resp);
  15. }
  16. @Override
  17. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  18. ctx.flush();
  19. }
  20. @Override
  21. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  22. ctx.close();
  23. }
  24. }

客户端:

  1. public class TimeClientHandler extends ChannelInboundHandlerAdapter {
  2. private byte[] req;
  3. private int counter;
  4. public TimeClientHandler() {
  5. req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
  6. }
  7. @Override
  8. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  9. ByteBuf message = null;
  10. for (int i = 0; i < 100; i++) {
  11. message = Unpooled.buffer(req.length);
  12. message.writeBytes(req);
  13. ctx.writeAndFlush(message);
  14. }
  15. }
  16. @Override
  17. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  18. ByteBuf buf = (ByteBuf) msg;
  19. byte[] req = new byte[buf.readableBytes()];
  20. buf.readBytes(req);
  21. String body = new String(req, Charsets.UTF_8);
  22. System.out.println("Now is : " + body + ", the counter is " + ++counter);
  23. }
  24. @Override
  25. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  26. cause.printStackTrace();
  27. ctx.close();
  28. }
  29. }

利用LineBasedFrameDecoder解决TCP粘包问题

TimeServer和TimeClient中,都增加下面的代码:

  1. arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
  2. arg0.pipeline().addLast(new StringDecoder());

TimeServerHandler和TimeClientHandler中,都将msg直接强制转换为String(解码工作已由StringDecoder完成)。

完整代码

服务端:

  1. public class TimeServer {
  2. public void bind(int port) throws Exception {
  3. // 配置服务端的NIO线程
  4. EventLoopGroup bossGroup = new NioEventLoopGroup();
  5. EventLoopGroup workerGroup = new NioEventLoopGroup();
  6. try {
  7. ServerBootstrap bootstrap = new ServerBootstrap();
  8. bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
  9. .option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChildChannelHandler());
  10. // 绑定端口,同步等待成功
  11. ChannelFuture f = bootstrap.bind(port).sync();
  12. // 等待服务端监听端口关闭
  13. f.channel().closeFuture().sync();
  14. } finally {
  15. // 优雅退出,释放线程池资源
  16. bossGroup.shutdownGracefully();
  17. workerGroup.shutdownGracefully();
  18. }
  19. }
  20. private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
  21. @Override
  22. protected void initChannel(SocketChannel arg0) throws Exception {
  23. arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
  24. arg0.pipeline().addLast(new StringDecoder());
  25. arg0.pipeline().addLast(new TimeServerHandler());
  26. }
  27. }
  28. public static void main(String[] args) throws Exception {
  29. int port = 8080;
  30. new TimeServer().bind(port);
  31. }
  32. }
  1. public class TimeServerHandler extends ChannelInboundHandlerAdapter {
  2. private int counter;
  3. @Override
  4. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  5. String body = (String) msg;
  6. System.out.println("count = " + ++counter);
  7. System.out.println("The time server receive order : " + body);
  8. String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date().toString() : "BAD ORDER";
  9. ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
  10. ctx.write(resp);
  11. }
  12. @Override
  13. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  14. ctx.flush();
  15. }
  16. @Override
  17. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  18. ctx.close();
  19. }
  20. }

客户端:

  1. public class TimeClient {
  2. public void connect(int port, String host) {
  3. EventLoopGroup group = new NioEventLoopGroup();
  4. try {
  5. Bootstrap bootstrap = new Bootstrap();
  6. bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
  7. .handler(new ChannelInitializer<SocketChannel>() {
  8. @Override
  9. protected void initChannel(SocketChannel ch) throws Exception {
  10. ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
  11. ch.pipeline().addLast(new StringDecoder());
  12. ch.pipeline().addLast(new TimeClientHandler());
  13. }
  14. });
  15. // 发起异步连接操作
  16. ChannelFuture future = bootstrap.connect(host, port).sync();
  17. // 等待客户端链路关闭
  18. future.channel().closeFuture().sync();
  19. } catch (Exception e) {
  20. e.printStackTrace();
  21. } finally {
  22. // 优雅退出,释放NIO线程组
  23. group.shutdownGracefully();
  24. }
  25. }
  26. public static void main(String[] args) throws Exception {
  27. int port = 8080;
  28. new TimeClient().connect(port, "127.0.0.1");
  29. }
  30. }
  1. public class TimeClientHandler extends ChannelInboundHandlerAdapter {
  2. private byte[] req;
  3. private int counter;
  4. public TimeClientHandler() {
  5. req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
  6. }
  7. @Override
  8. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  9. ByteBuf message = null;
  10. for (int i = 0; i < 100; i++) {
  11. message = Unpooled.buffer(req.length);
  12. message.writeBytes(req);
  13. ctx.writeAndFlush(message);
  14. }
  15. }
  16. @Override
  17. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  18. String body = (String) msg;
  19. System.out.println("Now is : " + body + ", the counter is " + ++counter);
  20. }
  21. @Override
  22. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  23. cause.printStackTrace();
  24. ctx.close();
  25. }
  26. }
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注