EventLoopGroup与EventLoop 源码分析


声明:本文转载自https://my.oschina.net/LucasZhu/blog/1647376,转载目的在于传递更多信息,仅供学习交流之用。如有侵权行为,请联系我,我会及时删除。

1.NioEventLoopGroup实例化过程

下面来分析对NioEventLoopGroup类进行实例化的过程中发生了什么。

NioEventLoopGroup 类层次结构

先给出类图:


 

我们查看上面接口的结构图:

ExecutorService模拟Java线程池接口,ScheduledExecutorService模拟定时器线程池接口。

对于NioEventLoopGroup核心的类继承关系就是:

NioEventLoopGroup –》MultithreadEventLoopGroup –》MultithreadEventExecutorGroup

面从这三个类出发分析NioEventLoopGroup实例化过程。

首先盗用一下网上的一张示意图

下面大致解释一下这个实例化过程做了什么

//对于不指定线程数参数的构造器,默认设置0 (但是在后面的构造器中会判断,如果设置为0 就会初始化为2*CPU)     public NioEventLoopGroup() {         this(0);     }      /**      * Create a new instance using the specified number of threads, {@link ThreadFactory} and the      * {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.      */ //然后调用:这里设置了NioEventLoopGroup线程池中每个线程执行器默认是null(这里设置为null,在后面的构造器中会判断,如果为null就实例化一个线程执行器)     public NioEventLoopGroup(int nThreads) {         this(nThreads, (Executor) null);     } //再调用:这里就存在于JDK的NIO的交互了,这里设置了线程池的SelectorProvider, 通过SelectorProvider.provider() 返回。     public NioEventLoopGroup(int nThreads, Executor executor) {         this(nThreads, executor, SelectorProvider.provider());     } //然后调用:在这个重载的构造器中又传入了默认的选择策略工厂DefaultSelectStrategyFactory;     public NioEventLoopGroup(             int nThreads, Executor executor, final SelectorProvider selectorProvider) {         this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);     } //这里就是调用父类MultithreadEventLoopGroup的构造器了, 这里还添加了线程的拒绝执行策略。     public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,final SelectStrategyFactory selectStrategyFactory) {         super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());     }

在MultithreadEventLoopGroup构造器调用:

    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {         super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);     }

接下来就是调用的基类MultithreadEventExecutorGroup的构造器:

//这个构造器里面多传入了一个参数 DefaultEventExecutorChooserFactory.INSTANCE , // 通过这个EventLoop选择器工厂可以实例化GenericEventExecutorChooser或者PowerOfTwoEventExecutorChooser //这个类, 这个类是EventLoopGroup线程池里面的EventLoop的选择器, //调用GenericEventExecutorChooser.next() 方法可以从线程池中选择出一个合适的EventLoop线程。      protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {     this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args); }

然后就是重载调用MultithreadEventExecutorGroup类的构造器:

构造器调用到了这里其实也就是不断向上传递调用的终点了,由于构造器代码比较长,我就删除一些校验和不重要的代码,只保留核心代码:

/**  * 最终的创建实例构造器  *  * @param nThreads          该实例将使用的线程数  * @param executor          将要使用的executor, 默认为null  * @param chooserFactory    将要使用的EventExecutorChooserFactory  * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call  */ protected MultithreadEventExecutorGroup(int nThreads, Executor executor,                                         EventExecutorChooserFactory chooserFactory, Object... args) {     /** 1.初始化线程池 */     //参数校验nThread合法性,     if (nThreads <= 0) {         throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));     } 	//executor校验非空, 如果为空就创建ThreadPerTaskExecutor, 该类实现了 Executor接口     //这个executor 是用来执行线程池中的所有的线程,也就是所有的NioEventLoop,其实从     //NioEventLoop构造器中也可以知道,NioEventLoop构造器中都传入了executor这个参数。     if (executor == null) {         executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());     } 	//这里的children数组, 其实就是线程池的核心实现,线程池中就是通过指定的线程数组来实现线程池;     //数组中每个元素其实就是一个EventLoop,EventLoop是EventExecutor的子接口。     children = new EventExecutor[nThreads];  	//for循环实例化children数组,NioEventLoop对象     for (int i = 0; i < nThreads; i ++) {         boolean success = false;         try {         	//newChild(executor, args) 函数在NioEventLoopGroup类中实现了,              // 实质就是就是存入了一个 NIOEventLoop类实例             children[i] = newChild(executor, args);             success = true;         } catch (Exception e) {             // TODO: Think about if this is a good exception type             throw new IllegalStateException("failed to create a child event loop", e);         } finally {          	//如果构造失败, 就清理资源             if (!success) {                 for (int j = 0; j < i; j ++) {                     children[j].shutdownGracefully();                 }                  for (int j = 0; j < i; j ++) {                     EventExecutor e = children[j];                     try {                         while (!e.isTerminated()) {                             e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);                         }                     } catch (InterruptedException interrupted) {                         // Let the caller handle the interruption.                         Thread.currentThread().interrupt();                         break;                     }                 }             }         }     } 	// 2.实例化线程工厂执行器选择器: 根据children获取选择器      chooser = chooserFactory.newChooser(children); 	// 3.为每个EventLoop线程添加 线程终止监听器     final FutureListener<Object> terminationListener = new FutureListener<Object>() {         @Override         public void operationComplete(Future<Object> future) throws Exception {             if (terminatedChildren.incrementAndGet() == children.length) {                 terminationFuture.setSuccess(null);             }         }     };      for (EventExecutor e: children) {         e.terminationFuture().addListener(terminationListener);     } 	// 4. 将children 添加到对应的set集合中去重, 表示只可读。     Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);     Collections.addAll(childrenSet, children);//(EventLoop是EventExecutor的子接口)     readonlyChildren = Collections.unmodifiableSet(childrenSet); }

总结一下上面的初始化步骤(3)中的一些重点:

1)NIOEventLoopGroup的线程池实现其实就是一个NIOEventLoop数组,一个NIOEventLoop可以理解成就是一个线程。

2)所有的NIOEventLoop线程是使用相同的 executor、SelectorProvider、SelectStrategyFactory、RejectedExecutionHandler以及是属于某一个NIOEventLoopGroup的。 这一点从 newChild(executor, args); 方法就可以看出:newChild()的实现是在NIOEventLoopGroup中实现的。

    @Override     protected EventLoop newChild(Executor executor, Object... args) throws Exception {         return new NioEventLoop(this, executor, (SelectorProvider) args[0],             ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);     }

3)当有IO事件来时,需要从线程池中选择一个线程出来执行,这时候的NioEventLoop选择策略是由GenericEventExecutorChooser实现的, 并调用该类的next() 方法获取到下一个 NioEventLoop.

到了这里线程池的初始化就已经结束了, 基本这部分就只涉及 netty线程池的内容,不涉及到channel 与 channelPipeline和ChannelHandler等内容。下面的内容就是分析 NioEventLoop的构造器实现了。

2. NioEventLoop类层次结构

NioEventLoop 的类层次结构图还是比较复杂的, 不过我们只需要关注几个重要的点即可. 首先 NioEventLoop 的继承链如下:

NioEventLoop -> SingleThreadEventLoop -> SingleThreadEventExecutor -> AbstractScheduledEventExecutor

在 AbstractScheduledEventExecutor 中, Netty 实现了 NioEventLoop 的 schedule 功能, 即我们可以通过调用一个 NioEventLoop 实例的 schedule()方法来运行一些定时任务. 而在 SingleThreadEventLoop 中, 又实现了任务队列的功能, 通过它, 我们可以调用一个 NioEventLoop 实例的 execute() 方法来向任务队列中添加一个 task, 并由 NioEventLoop 进行调度执行.

通常来说, NioEventLoop 肩负着两种任务,: 
1)第一个是作为 IO 线程, 执行与 Channel 相关的 IO 操作, 包括 调用 select 等待就绪的 IO 事件、读写数据与数据的处理等;

2)而第二个任务是作为任务队列, 执行 taskQueue 中的任务, 例如用户调用 eventLoop.schedule 提交的定时任务也是这个线程执行的.

NioEventLoop 的实例化过程

对于NioEventLoop的实例化,基本就是在NioEventLoopGroup.newChild() 中调用的,下面先给出源码:

@Override protected EventLoop newChild(Executor executor, Object... args) throws Exception {     return new NioEventLoop(this, executor, (SelectorProvider) args[0],         ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); }

从上面函数里面 new NioEventLoop()出发分析实例化过程:

(1)最先调用NioEventLoop 里面的构造函数:

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,                  SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {         super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);         if (selectorProvider == null) {             throw new NullPointerException("selectorProvider");         }         if (strategy == null) {             throw new NullPointerException("selectStrategy");         }         provider = selectorProvider;         final SelectorTuple selectorTuple = openSelector();         selector = selectorTuple.selector;         unwrappedSelector = selectorTuple.unwrappedSelector;         selectStrategy = strategy;     }

需要注意的是:构造器里面传入了 NioEventLoopGroup、Executor、SelectorProvider、SelectStrategyFactory、RejectedExecutionHandler。从这里可以看出,一个NioEventLoop属于某一个NioEventLoopGroup, 且处于同一个NioEventLoopGroup下的所有NioEventLoop 公用Executor、SelectorProvider、SelectStrategyFactory和RejectedExecutionHandler。

还有一点需要注意的是,这里的SelectorProvider构造参数传入的是通过在NioEventLoopGroup里面的构造器里面的 SelectorProvider.provider();方式获取的, 而这个方法返回的是一个单例SelectorProvider, 所以所有的NioEventLoop公用同一个单例SelectorProvider。

(2)核心的东西说完了,就是调用父类SingleThreadEventLoop的构造器:

    protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,                                     boolean addTaskWakesUp, int maxPendingTasks,                                     RejectedExecutionHandler rejectedExecutionHandler) {         //父类构造器         super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);         tailTasks = newTaskQueue(maxPendingTasks);     }

这里除了调用父类SingleThreadEventExecutor的构造器以外, 就是实例化了 tailTasks 这个变量; 
对于tailTasks在SingleThreadEventLoop属性的定义如下:

private final Queue<Runnable> tailTasks;// 尾部任务队列

队列的数量maxPendingTasks参数默认是SingleThreadEventLoop.DEFAULT_MAX_PENDING_TASK,其实就是Integer.MAX_VALUE; 对于new的这个队列, 其实就是一个LinkedBlockingQueue 无界队列。

(3)再看调用的父类SingleThreadEventExecutor的构造器:

    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,                                         boolean addTaskWakesUp, int maxPendingTasks,                                         RejectedExecutionHandler rejectedHandler) {         super(parent);// 设置EventLoop所属于的EventLoopGroup         this.addTaskWakesUp = addTaskWakesUp;         this.maxPendingTasks = Math.max(16, maxPendingTasks);//默认是Integer.MAX_VALUE         this.executor = ObjectUtil.checkNotNull(executor, "executor");         taskQueue = newTaskQueue(this.maxPendingTasks);//创建EventLoop的任务队列, 默认是 LinkedBlockingQueue         rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");     }

自此,NioEventLoop的实例化过程已经分析完毕。

前面已经分析完了EventLoopGroup和EventLoop,那么有一个问题,我们知道一个EventLoop实际上是对应于一个线程,那么这个EventLoop是什么时候启动的呢?

 

 

 

 

本文发表于2018年03月19日 22:38
(c)注:本文转载自https://my.oschina.net/LucasZhu/blog/1647376,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如有侵权行为,请联系我们,我们会及时删除.

阅读 1954 讨论 0 喜欢 0

抢先体验

扫码体验
趣味小程序
文字表情生成器

闪念胶囊

你要过得好哇,这样我才能恨你啊,你要是过得不好,我都不知道该恨你还是拥抱你啊。

直抵黄龙府,与诸君痛饮尔。

那时陪伴我的人啊,你们如今在何方。

不出意外的话,我们再也不会见了,祝你前程似锦。

这世界真好,吃野东西也要留出这条命来看看

快捷链接
网站地图
提交友链
Copyright © 2016 - 2021 Cion.
All Rights Reserved.
京ICP备2021004668号-1