1. Introduction

In this article, we're going to take a look at Netty — an asynchronous event-driven network application framework.

The main purpose of Netty is building high-performance protocol servers based on NIO (or possibly NIO.2) with separation and loose coupling of the network and business logic components. Such a server might implement a widely known protocol, such as HTTP, or your own specific protocol.

2. Core Concepts

Netty is a non-blocking framework. This leads to high throughput compared to blocking IO. Understanding non-blocking IO is crucial to understanding Netty's core components and their relationships.

2.1. Channel

Channel is the basic abstraction of Java NIO. It represents an open connection which is capable of IO operations such as reading and writing.

2.2. Future

Every IO operation on a Channel in Netty is non-blocking.

This means that every operation returns immediately after the call. The standard Java library has a Future interface, but it's not convenient for Netty's purposes: we can only ask the Future whether the operation has completed, or block the current thread until it is done.

That's why Netty has its own ChannelFuture interface. We can pass a callback to ChannelFuture which will be called upon operation completion.
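
For instance, attaching a completion callback might look like this (a minimal sketch; the channel and message variables are assumed to already exist):

    // assumed: an open Channel named "channel" and a message object to send
    ChannelFuture future = channel.writeAndFlush(message);
    future.addListener((ChannelFutureListener) f -> {
        if (f.isSuccess()) {
            System.out.println("Write completed successfully");
        } else {
            f.cause().printStackTrace();
        }
    });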

2.3. Events and Handlers

Netty uses an event-driven application paradigm, so the pipeline of the data processing is a chain of events going through handlers. Events and handlers can be related to the inbound and outbound data flow. Inbound events can be the following:

  • Channel activation and deactivation
  • Read operation events
  • Exception events
  • User events

Outbound events are simpler and, generally, are related to opening/closing a connection and writing/flushing data.

Netty applications consist of a number of networking and application logic events and their handlers. The base interfaces for the channel event handlers are ChannelHandler and its sub-interfaces ChannelInboundHandler and ChannelOutboundHandler.

Netty provides a huge hierarchy of implementations of ChannelHandler. It is worth noting the adapters which are just empty implementations, e.g. ChannelInboundHandlerAdapter and ChannelOutboundHandlerAdapter. We could extend these adapters when we need to process only a subset of all events.

Also, there are many implementations of specific protocols such as HTTP, e.g. HttpRequestDecoder, HttpResponseEncoder, HttpObjectAggregator. It would be good to get acquainted with them in Netty's Javadoc.
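
For instance, a minimal HTTP server pipeline could combine these handlers roughly as follows. This is only a sketch: MyHttpRequestHandler stands in for a hypothetical application handler, and the 65536-byte limit passed to HttpObjectAggregator is an arbitrary choice:

    ch.pipeline().addLast(
      new HttpRequestDecoder(),        // bytes -> HTTP request objects
      new HttpObjectAggregator(65536), // aggregates chunks into a FullHttpRequest
      new HttpResponseEncoder(),       // HTTP response objects -> bytes
      new MyHttpRequestHandler());     // hypothetical application logic handler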

2.4. Encoders and Decoders

As we work with a network protocol, we need to perform data serialization and deserialization. For this purpose, Netty introduces special extensions of ChannelInboundHandler called decoders, which are capable of decoding incoming data. The base class of most decoders is ByteToMessageDecoder.

For encoding outgoing data, Netty has extensions of ChannelOutboundHandler called encoders. MessageToByteEncoder is the base for most encoder implementations. With encoders and decoders, we can convert messages from byte sequences to Java objects and vice versa.

3. A Simple Server Application

Let's create a project representing a simple protocol server which receives a request, performs a calculation and sends a response.

3.1. Maven Dependencies

First of all, we need to provide the Netty dependency in our pom.xml:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.10.Final</version>
</dependency>

We can find the latest version over on Maven Central.

3.2. Data Model

The request data class would have the following structure:

    public class RequestData {
        private int intValue;
        private String stringValue;
        
        // standard getters and setters
    }

Let's assume that the server receives the request and returns the intValue multiplied by 2. The response would have a single int value:

    public class ResponseData {
        private int intValue;
    
        // standard getters and setters
    }
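
Since we'll print these objects to the console later on, let's also assume simple toString overrides on both classes (not shown in the snippets above) so the output is readable:

    // an assumed toString for ResponseData; RequestData gets an analogous one
    @Override
    public String toString() {
        return "ResponseData{intValue=" + intValue + '}';
    }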

3.3. Decoder

Now we need to create encoders and decoders for our protocol messages.

It should be noted that Netty works with the socket receive buffer, which is represented not as a queue of messages but just as a stream of bytes. This means that our inbound handler can be called before the full message has been received by the server.

We must make sure that we have received the full message before processing, and there are many ways to do that.

First of all, we can create a temporary ByteBuf and append to it all inbound bytes until we get the required number of bytes:

    public class SimpleProcessingHandler 
      extends ChannelInboundHandlerAdapter {
        private ByteBuf tmp;
    
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) {
            System.out.println("Handler added");
            tmp = ctx.alloc().buffer(4);
        }
    
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) {
            System.out.println("Handler removed");
            tmp.release();
            tmp = null;
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            ByteBuf m = (ByteBuf) msg;
            tmp.writeBytes(m);
            m.release();
            if (tmp.readableBytes() >= 4) {
                // request processing
                RequestData requestData = new RequestData();
                requestData.setIntValue(tmp.readInt());
                ResponseData responseData = new ResponseData();
                responseData.setIntValue(requestData.getIntValue() * 2);
                ChannelFuture future = ctx.writeAndFlush(responseData);
                future.addListener(ChannelFutureListener.CLOSE);
            }
        }
    }

The example shown above looks a bit weird but helps us to understand how Netty works. Every method of our handler is called when its corresponding event occurs. So we initialize the buffer when the handler is added, fill it with data on receiving new bytes and start processing it when we get enough data.

We deliberately did not use a stringValue — decoding in such a manner would be unnecessarily complex. That's why Netty provides useful decoder classes which are implementations of ChannelInboundHandler: ByteToMessageDecoder and ReplayingDecoder.

As we noted above, we can create a channel processing pipeline with Netty, so we can put our decoder as the first handler and let the processing logic handler come after it.

The decoder for RequestData is shown next:

    public class RequestDecoder extends ReplayingDecoder<RequestData> {
    
        private final Charset charset = StandardCharsets.UTF_8;
    
        @Override
        protected void decode(ChannelHandlerContext ctx, 
          ByteBuf in, List<Object> out) throws Exception {
     
            RequestData data = new RequestData();
            data.setIntValue(in.readInt());
            int strLen = in.readInt();
            data.setStringValue(
              in.readCharSequence(strLen, charset).toString());
            out.add(data);
        }
    }

The idea of this decoder is pretty simple. It uses a special implementation of ByteBuf which throws a signal when there is not enough data in the buffer for the read operation.

When the signal is caught, the buffer is rewound to the beginning and the decoder waits for a new portion of data. Decoding stops when the out list is not empty after a decode execution.
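
For comparison, a decoder built on plain ByteToMessageDecoder has to do this bookkeeping itself. Here is a sketch under the same wire format assumptions (an int, then the string length, then the string bytes); the class name ManualRequestDecoder is ours:

    public class ManualRequestDecoder extends ByteToMessageDecoder {
    
        private final Charset charset = StandardCharsets.UTF_8;
    
        @Override
        protected void decode(ChannelHandlerContext ctx, 
          ByteBuf in, List<Object> out) {
            // wait until both int fields (4 + 4 bytes) have arrived
            if (in.readableBytes() < 8) {
                return;
            }
            in.markReaderIndex();
            int intValue = in.readInt();
            int strLen = in.readInt();
            // not all string bytes are here yet: rewind and wait for more data
            if (in.readableBytes() < strLen) {
                in.resetReaderIndex();
                return;
            }
            RequestData data = new RequestData();
            data.setIntValue(intValue);
            data.setStringValue(in.readCharSequence(strLen, charset).toString());
            out.add(data);
        }
    }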

3.4. Encoder

Besides decoding the RequestData, we need to encode the ResponseData message. This operation is simpler because we have the full message data when the write operation occurs.

We can write data to the Channel in our main handler, or we can separate the logic and create a handler extending MessageToByteEncoder, which will intercept write operations for ResponseData messages:

    public class ResponseDataEncoder 
      extends MessageToByteEncoder<ResponseData> {
    
        @Override
        protected void encode(ChannelHandlerContext ctx, 
          ResponseData msg, ByteBuf out) throws Exception {
            out.writeInt(msg.getIntValue());
        }
    }

3.5. Request Processing

Since we carried out the decoding and encoding in separate handlers we need to change our ProcessingHandler:

    public class ProcessingHandler extends ChannelInboundHandlerAdapter {
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) 
          throws Exception {
     
            RequestData requestData = (RequestData) msg;
            ResponseData responseData = new ResponseData();
            responseData.setIntValue(requestData.getIntValue() * 2);
            ChannelFuture future = ctx.writeAndFlush(responseData);
            future.addListener(ChannelFutureListener.CLOSE);
            System.out.println(requestData);
        }
    }
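
In a real application we would also want to deal with failures. A minimal exceptionCaught override for this handler (not part of the original example) might look like this:

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // report the problem and close the connection instead of leaking it
        cause.printStackTrace();
        ctx.close();
    }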

3.6. Starting the Server

Now let's put it all together and run our server:

    public class NettyServer {
    
        private int port;
    
        // constructor
    
        public static void main(String[] args) throws Exception {
     
            int port = args.length > 0
              ? Integer.parseInt(args[0])
              : 8080;
     
            new NettyServer(port).run();
        }
    
        public void run() throws Exception {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                  .channel(NioServerSocketChannel.class)
                  .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) 
                      throws Exception {
                        ch.pipeline().addLast(new RequestDecoder(), 
                          new ResponseDataEncoder(), 
                          new ProcessingHandler());
                    }
                }).option(ChannelOption.SO_BACKLOG, 128)
                  .childOption(ChannelOption.SO_KEEPALIVE, true);
    
                ChannelFuture f = b.bind(port).sync();
                f.channel().closeFuture().sync();
            } finally {
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            }
        }
    }

The details of the classes used in the above server bootstrap example can be found in their Javadoc. The most interesting part is this line:

ch.pipeline().addLast(
  new RequestDecoder(), 
  new ResponseDataEncoder(), 
  new ProcessingHandler());

Here we define the inbound and outbound handlers that will process requests and produce output in the correct order. Inbound handlers are invoked in the order they were added, so RequestDecoder runs before ProcessingHandler, while the outbound ResponseDataEncoder only comes into play when the processing handler writes the response.

4. Client Application

The client should perform reverse encoding and decoding, so we need to have a RequestDataEncoder and ResponseDataDecoder:

    public class RequestDataEncoder 
      extends MessageToByteEncoder<RequestData> {
    
        private final Charset charset = StandardCharsets.UTF_8;
    
        @Override
        protected void encode(ChannelHandlerContext ctx, 
          RequestData msg, ByteBuf out) throws Exception {
     
            out.writeInt(msg.getIntValue());
            // write the byte length (not the char count) so the decoder's
            // readCharSequence call reads the right number of bytes,
            // even for non-ASCII strings
            byte[] stringBytes = msg.getStringValue().getBytes(charset);
            out.writeInt(stringBytes.length);
            out.writeBytes(stringBytes);
        }
    }
    
    public class ResponseDataDecoder 
      extends ReplayingDecoder<ResponseData> {
    
        @Override
        protected void decode(ChannelHandlerContext ctx, 
          ByteBuf in, List<Object> out) throws Exception {
     
            ResponseData data = new ResponseData();
            data.setIntValue(in.readInt());
            out.add(data);
        }
    }

Also, we need to define a ClientHandler which will send the request and receive the response from the server:

    public class ClientHandler extends ChannelInboundHandlerAdapter {
     
        @Override
        public void channelActive(ChannelHandlerContext ctx) 
          throws Exception {
     
            RequestData msg = new RequestData();
            msg.setIntValue(123);
            msg.setStringValue(
              "all work and no play makes jack a dull boy");
            ctx.writeAndFlush(msg);
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) 
          throws Exception {
            System.out.println((ResponseData)msg);
            ctx.close();
        }
    }

Now let's bootstrap the client:

    public class NettyClient {
        public static void main(String[] args) throws Exception {
     
            String host = "localhost";
            int port = 8080;
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            try {
                Bootstrap b = new Bootstrap();
                b.group(workerGroup);
                b.channel(NioSocketChannel.class);
                b.option(ChannelOption.SO_KEEPALIVE, true);
                b.handler(new ChannelInitializer<SocketChannel>() {
     
                    @Override
                    public void initChannel(SocketChannel ch) 
                      throws Exception {
                        ch.pipeline().addLast(new RequestDataEncoder(), 
                          new ResponseDataDecoder(), new ClientHandler());
                    }
                });
    
                ChannelFuture f = b.connect(host, port).sync();
    
                f.channel().closeFuture().sync();
            } finally {
                workerGroup.shutdownGracefully();
            }
        }
    }

As we can see, there are many details in common with the server bootstrapping.

Now we can run the client's main method and take a look at the console output. As expected, we got ResponseData with intValue equal to 246.

5. Conclusion

In this article, we had a quick introduction to Netty. We showed its core components, such as Channel and ChannelHandler. We also made a simple non-blocking protocol server and a client for it.

As always, all code samples are available over on GitHub.