Netty入门(一)快速入门

Netty介绍

Netty是一个NIO client-server(客户端服务器)框架,使用Netty可以快速开发网络应用,例如服务器和客户端协议。Netty提供了一种新的方式来使开发网络应用程序,这种新的方式使得它很容易使用和有很强的扩展性。Netty的内部实现时很复杂的,但是Netty提供了简单易用的api从网络处理代码中解耦业务逻辑。Netty是完全基于NIO实现的,所以整个Netty都是异步的。

Netty框架的组成

一个Netty程序的工作图如下


1.客户端连接到服务器
2.建立连接后,发送或接收数据
3.服务器处理所有的客户端连接

Netty为什么传输快

  • Netty的传输快其实也是依赖了NIO的一个特性——零拷贝。我们知道,Java的内存有堆内存、栈内存和字符串常量池等等,其中堆内存是占用内存空间最大的一块,也是Java对象存放的地方,一般我们的数据如果需要从IO读取到堆内存,中间需要经过Socket缓冲区,也就是说一个数据会被拷贝两次才能到达他的的终点,如果数据量大,就会造成不必要的资源浪费。
  • Netty针对这种情况,使用了NIO中的另一大特性——零拷贝,当他需要接收数据的时候,他会在堆内存之外开辟一块内存,数据就直接从IO读到了那块内存中去,在netty里面通过ByteBuf可以直接对这些数据进行直接操作,从而加快了传输速度。

编写一个HelloWorld

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class Server {

public static void main(String[] args) throws Exception{
//1 创建2个线程,一个是负责接收客户端的连接。一个是负责进行数据传输的
EventLoopGroup pGroup = new NioEventLoopGroup();
EventLoopGroup cGroup = new NioEventLoopGroup();

//2 创建服务器辅助类
ServerBootstrap b = new ServerBootstrap();
b.group(pGroup, cGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_SNDBUF, 32*1024)
.option(ChannelOption.SO_RCVBUF, 32*1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
//设置特殊分隔符
ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());
sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
//设置字符串形式的解码
sc.pipeline().addLast(new StringDecoder());
sc.pipeline().addLast(new ServerHandler());
}
});

//4 绑定连接
ChannelFuture cf = b.bind(8765).sync();
//等待服务器监听端口关闭
cf.channel().closeFuture().sync();
pGroup.shutdownGracefully();
cGroup.shutdownGracefully();
}
}

服务端消息处理类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class ServerHandler extends ChannelHandlerAdapter{

/**
* 通道刚被激活时会调用次方法
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("server channel active... ");
}

/**
* 读取消息方法
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "utf-8");
System.out.println("Server :" + body );

String response = "进行返回给客户端的响应:" + body ;
ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));

}

/**
* 读取完毕后处理方法
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("读完了");
ctx.flush();
}


/***
* 这个方法会在发生异常时触发

*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception {
/**
* exceptionCaught() 事件处理方法是当出现 Throwable 对象才会被调用,即当 Netty 由于 IO
* 错误或者处理器在处理事件时抛出的异常时。在大部分情况下,捕获的异常应该被记录下来 并且把关联的 channel
* 给关闭掉。然而这个方法的处理方式会在遇到不同异常的情况下有不 同的实现,比如你可能想在关闭连接之前发送一个错误码的响应消息。
*/
// 出现异常就关闭
t.printStackTrace();
ctx.close();
}
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class Client {
public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(new ClientHandler()); //收到服务端发送过来的消息处理类
}
});

ChannelFuture cf1 = null;
try {
cf1 = b.connect("127.0.0.1", 8765).sync();
//发送消息
cf1.channel().writeAndFlush(Unpooled.copiedBuffer("777".getBytes()));
cf1.channel().writeAndFlush(Unpooled.copiedBuffer("666".getBytes()));
cf1.channel().writeAndFlush(Unpooled.copiedBuffer("888".getBytes()));

//关闭通道
cf1.channel().closeFuture().sync();
group.shutdownGracefully();

} catch (InterruptedException e) {
e.printStackTrace();
}

}

客户端消息处理类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
**
* 服务端处理通道.这里只是打印一下请求的内容,并不对请求进行任何的响应 DiscardServerHandler 继承自
* ChannelHandlerAdapter, 这个类实现了ChannelHandler接口, ChannelHandler提供了许多事件处理的接口方法,
* 然后你可以覆盖这些方法。 现在仅仅只需要继承ChannelHandlerAdapter类而不是你自己去实现接口方法。
*
*/
public class ClientHandler extends ChannelHandlerAdapter {
/**
* 通道刚被激活时会调用次方法
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Client channel active... ");
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}

/**这里我们覆盖了chanelRead()事件处理方法。 每当从客户端收到新的数据时, 这个方法会在收到消息时被调用,
* 读取消息处理方法
* @param ctx 通道处理的上下文信息
* @param msg 接收的消息
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "utf-8");
String response = "收到服务器端的返回信息:" + body;
System.out.println(response);
} finally {
// 抛弃收到的数据
ReferenceCountUtil.release(msg);
}
}
}

觉得本文不错的话,分享一下给小伙伴吧~