Netty4.x 源码实战系列(四):Pipeline全剖析

浏览: 165 发布日期: 2018-01-01 分类: java

在上一篇《Netty4.x 源码实战系列(三):NioServerSocketChannel全剖析》中,我们详细分析了NioServerSocketChannel的初始化过程,并得出了如下结论:

在netty中,每一个channel都有一个pipeline对象,并且其内部本质上就是一个双向链表

本篇我们将深入Pipeline源码内部,对其一探究竟,给大家一个全方位解析。

Pipeline初始化过程分析

在上一篇中,我们得知channel中的pipeline其实就是DefaultChannelPipeline的实例,首先我们先看看DefaultChannelPipeline的类继承结构图:

根据类继承结构图,我们看到DefaultChannelPipeline实现了 ChannelInboundInvoker及ChannelOutboundInvoker两个接口。
顾名思义,一个是处理通道的inbound事件调用器,另一个是处理通道的outbound事件调用器。

inbound: 本质上就是执行I/O线程将从外部read到的数据 传递给 业务线程的一个过程。
outbound: 本质上就是业务线程 将数据 传递给I/O线程, 直至发送给外部的一个过程。

如下图所示:

我们再回到DefaultChannelPipeline这个类,看看其构造方法:

protected DefaultChannelPipeline(Channel channel) {
    // 通道绑定channel对象
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);
    
    // 初始化头、尾上下文
    tail = new TailContext(this);
    head = new HeadContext(this);

    // 头尾相互链接,形成双向链表
    head.next = tail;
    tail.prev = head;
}

此构造方法主要做了三件事:
1、绑定了当前NioServerSocketChannel实例
2、初始化pipeline双向链表的头、尾节点

关于NioServerSocketChannel,我在前一篇中已经做过详细描述,现在我们着重看看head及tail这两个属性。

从上面的构造方法得知,head是HeadContext的实例,tail是TailContext的实例,HeadContext与TailContext都是DefaultChannelPipeline的内部类,它们的类继承结构图如下:

HeadContext类继承结构图

TailContext类继承结构图

从类继承图我们可以看出:
1、HeadContext与TailContext都是通道的handler(中文一般叫做处理器)
2、HeadContext既可以用于outbound过程的handler,也可以用于inbound过程的handler (关于inboun和outbound上面已经作了解释)
3、TailContext只可以用于inbound过程的handler
4、HeadContext 与 TailContext 同时也是一个处理器上下文对象

下面我将以HeadContext为例,看看它初始化过程中到底作了哪些工作

head = new HeadContext(this);

在DefaultChannelPipeline的构造方法中,我们看到head结点初始化代码如上面所示,对应构造器代码如下:

HeadContext(DefaultChannelPipeline pipeline) {
    super(pipeline, null, HEAD_NAME, false, true);
    unsafe = pipeline.channel().unsafe();
    setAddComplete();
}

在其内部,它会继续调用父类AbstractChannelHandlerContext的构造器

AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                                  boolean inbound, boolean outbound) {
        this.name = ObjectUtil.checkNotNull(name, "name");
        this.pipeline = pipeline;
        this.executor = executor;
        this.inbound = inbound;
        this.outbound = outbound;
        
        ordered = executor == null || executor instanceof OrderedEventExecutor;
    }

此构造方法,只是设置了当前context对象对应的Pipeline以及此context是作用于outbound。
AbstractChannelHandlerContext类还有另外两个额外属性,他们是实现双向链表的关键:

volatile AbstractChannelHandlerContext next;  // 指定下一个结点
volatile AbstractChannelHandlerContext prev;  // 指定前一个结点

HeadContext 同时还绑定了unsafe对象,我们再回顾一下unsafe对象。

我们从上一篇已经得知 unsafe其实就是对java nio 通道底层调用进行的封装,就相当于一个代理类对象。

而DefaultChannelPipeline初始化时,已经绑定了channel,且由于是服务端,所以此channel是NioServerSocketChannel

protected DefaultChannelPipeline(Channel channel) {
    // 通道绑定channel对象
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    
    ... //非相关代码已省略
}

所以

unsafe = pipeline.channel().unsafe();

就是

unsafe = new NioMessageUnsafe();

关于pipeline的tail结点初始化过程跟head差不多,这里就不作赘述了。

阶段性总结:
1、每个channel初始化时,都会创建一个与之对应的pipeline;
2、此pipeline内部就是一个双向链表;
3、双向链表的头结点是处理outbound过程的handler,尾节点是处理inbound过程的handler;
4、双向链表的结点同时还是handler上下文对象;

Pipeline在服务端bind过程中的应用

通过《Netty4.x 源码实战系列(二):服务端bind流程详解》 一文,我们知道,服务端channel在初始化过程中,会调用addLast方法,并传递了一个ChannelInitializer对象

@Override
void init(Channel channel) throws Exception {
    
    // 非相关代码已省略
    ChannelPipeline p = channel.pipeline();
    
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

本节我们将详细分析一下addLast的过程 与 ChannelInitializer。

ChannelInitializer
我们先看一下ChannelInitializer的类继承结构图

通过类继承图,我们得知ChannelInitializer的匿名对象其实就是一个处理inbound过程的处理器,与pipeline中的tail一样,目前稍有不同的就是ChannelInitializer的匿名对象并不是一个context对象。

关于ChannelInitializer匿名对象的initChannel方法实现的内容,本篇先不作详述,当讲到EventLoop时,我们再来回顾一下。

pipeline.addLast方法

addLast具体实现如下:

@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
    return addLast(null, handlers);
}

通过此方法参数,我们也可以得出,init(Channel channel)方法中的addLast其实就是想Pipeline中添加一个处理器。
addLast内部继续调用另一个重载方法:

@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
    if (handlers == null) {
        throw new NullPointerException("handlers");
    }

    for (ChannelHandler h: handlers) {
        if (h == null) {
            break;
        }
        addLast(executor, null, h);
    }

    return this;
}

最终调用的是下面的重载方法(已省略非相关代码):

@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler);

        newCtx = newContext(group, filterName(name, handler), handler);

        addLast0(newCtx);

        
    }
    
    return this;
}

newContext方法的作用就是对传入的handler进行包装,最后返回一个绑定了handler的context对象:

private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
    return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}

新的对象是DefaultChannelHandlerContext类的实例。

接着我们再看看addLast0方法

private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;
}

经过addLast0,新包装的context已经添加至pipeline中了,此时的pipeline结果变化过程如下:

从addLast0代码片段得知, 每个新添加的结点,都是从tail结点之前插入

本篇总结:
经过本篇的代码研究,对于Pipeline得出以下结论:
1、channel初始化时,会同时创建一个与之对应的pipeline;
2、此pipeline本质上是一个handler处理器双向链表, 用于将处理inbound及outbound过程的handler都串联起来;
3、在netty中,对于I/O处理分为两种流向,对于获取外部数据资源进行处理的,都是对应inbound,比如read等,而对于向外部发送数据资源的,都对于outbound,比如connetct及write等。

关于pipeline中的handler调用过程,后面的章节我们会做详细分析。

返回顶部