1.NioEventLoopGroup实例化过程
下面来分析对NioEventLoopGroup类进行实例化的过程中发生了什么。
NioEventLoopGroup 类层次结构
先给出类图:
我们查看上面接口的结构图:
ExecutorService模拟Java线程池接口,ScheduledExecutorService模拟定时器线程池接口。
对于NioEventLoopGroup核心的类继承关系就是:
NioEventLoopGroup –》MultithreadEventLoopGroup –》MultithreadEventExecutorGroup
面从这三个类出发分析NioEventLoopGroup实例化过程。
首先盗用一下网上的一张示意图
下面大致解释一下这个实例化过程做了什么
//对于不指定线程数参数的构造器,默认设置0 (但是在后面的构造器中会判断,如果设置为0 就会初始化为2*CPU) public NioEventLoopGroup() { this(0); } /** * Create a new instance using the specified number of threads, {@link ThreadFactory} and the * {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}. */ //然后调用:这里设置了NioEventLoopGroup线程池中每个线程执行器默认是null(这里设置为null,在后面的构造器中会判断,如果为null就实例化一个线程执行器) public NioEventLoopGroup(int nThreads) { this(nThreads, (Executor) null); } //再调用:这里就存在于JDK的NIO的交互了,这里设置了线程池的SelectorProvider, 通过SelectorProvider.provider() 返回。 public NioEventLoopGroup(int nThreads, Executor executor) { this(nThreads, executor, SelectorProvider.provider()); } //然后调用:在这个重载的构造器中又传入了默认的选择策略工厂DefaultSelectStrategyFactory; public NioEventLoopGroup( int nThreads, Executor executor, final SelectorProvider selectorProvider) { this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE); } //这里就是调用父类MultithreadEventLoopGroup的构造器了, 这里还添加了线程的拒绝执行策略。 public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,final SelectStrategyFactory selectStrategyFactory) { super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()); }
在MultithreadEventLoopGroup构造器调用:
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); }
接下来就是调用的基类MultithreadEventExecutorGroup的构造器:
//这个构造器里面多传入了一个参数 DefaultEventExecutorChooserFactory.INSTANCE , // 通过这个EventLoop选择器工厂可以实例化GenericEventExecutorChooser或者PowerOfTwoEventExecutorChooser //这个类, 这个类是EventLoopGroup线程池里面的EventLoop的选择器, //调用GenericEventExecutorChooser.next() 方法可以从线程池中选择出一个合适的EventLoop线程。 protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) { this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args); }
然后就是重载调用MultithreadEventExecutorGroup类的构造器:
构造器调用到了这里其实也就是不断向上传递调用的终点了,由于构造器代码比较长,我就删除一些校验和不重要的代码,只保留核心代码:
/** * 最终的创建实例构造器 * * @param nThreads 该实例将使用的线程数 * @param executor 将要使用的executor, 默认为null * @param chooserFactory 将要使用的EventExecutorChooserFactory * @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call */ protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { /** 1.初始化线程池 */ //参数校验nThread合法性, if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); } //executor校验非空, 如果为空就创建ThreadPerTaskExecutor, 该类实现了 Executor接口 //这个executor 是用来执行线程池中的所有的线程,也就是所有的NioEventLoop,其实从 //NioEventLoop构造器中也可以知道,NioEventLoop构造器中都传入了executor这个参数。 if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } //这里的children数组, 其实就是线程池的核心实现,线程池中就是通过指定的线程数组来实现线程池; //数组中每个元素其实就是一个EventLoop,EventLoop是EventExecutor的子接口。 children = new EventExecutor[nThreads]; //for循环实例化children数组,NioEventLoop对象 for (int i = 0; i < nThreads; i ++) { boolean success = false; try { //newChild(executor, args) 函数在NioEventLoopGroup类中实现了, // 实质就是就是存入了一个 NIOEventLoop类实例 children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { //如果构造失败, 就清理资源 if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } } // 2.实例化线程工厂执行器选择器: 根据children获取选择器 chooser = chooserFactory.newChooser(children); // 3.为每个EventLoop线程添加 线程终止监听器 final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } // 4. 将children 添加到对应的set集合中去重, 表示只可读。 Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children);//(EventLoop是EventExecutor的子接口) readonlyChildren = Collections.unmodifiableSet(childrenSet); }
总结一下上面的初始化步骤(3)中的一些重点:
1)NIOEventLoopGroup的线程池实现其实就是一个NIOEventLoop数组,一个NIOEventLoop可以理解成就是一个线程。
2)所有的NIOEventLoop线程是使用相同的 executor、SelectorProvider、SelectStrategyFactory、RejectedExecutionHandler以及是属于某一个NIOEventLoopGroup的。 这一点从 newChild(executor, args); 方法就可以看出:newChild()的实现是在NIOEventLoopGroup中实现的。
@Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); }
3)当有IO事件来时,需要从线程池中选择一个线程出来执行,这时候的NioEventLoop选择策略是由GenericEventExecutorChooser实现的, 并调用该类的next() 方法获取到下一个 NioEventLoop.
到了这里线程池的初始化就已经结束了, 基本这部分就只涉及 netty线程池的内容,不涉及到channel 与 channelPipeline和ChannelHandler等内容。下面的内容就是分析 NioEventLoop的构造器实现了。
2. NioEventLoop类层次结构
NioEventLoop 的类层次结构图还是比较复杂的, 不过我们只需要关注几个重要的点即可. 首先 NioEventLoop 的继承链如下:
NioEventLoop -> SingleThreadEventLoop -> SingleThreadEventExecutor -> AbstractScheduledEventExecutor
在 AbstractScheduledEventExecutor 中, Netty 实现了 NioEventLoop 的 schedule 功能, 即我们可以通过调用一个 NioEventLoop 实例的 schedule()方法来运行一些定时任务. 而在 SingleThreadEventLoop 中, 又实现了任务队列的功能, 通过它, 我们可以调用一个 NioEventLoop 实例的 execute() 方法来向任务队列中添加一个 task, 并由 NioEventLoop 进行调度执行.
通常来说, NioEventLoop 肩负着两种任务,:
1)第一个是作为 IO 线程, 执行与 Channel 相关的 IO 操作, 包括 调用 select 等待就绪的 IO 事件、读写数据与数据的处理等;
2)而第二个任务是作为任务队列, 执行 taskQueue 中的任务, 例如用户调用 eventLoop.schedule 提交的定时任务也是这个线程执行的.
NioEventLoop 的实例化过程
对于NioEventLoop的实例化,基本就是在NioEventLoopGroup.newChild() 中调用的,下面先给出源码:
@Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); }
从上面函数里面 new NioEventLoop()出发分析实例化过程:
(1)最先调用NioEventLoop 里面的构造函数:
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) { super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } if (strategy == null) { throw new NullPointerException("selectStrategy"); } provider = selectorProvider; final SelectorTuple selectorTuple = openSelector(); selector = selectorTuple.selector; unwrappedSelector = selectorTuple.unwrappedSelector; selectStrategy = strategy; }
需要注意的是:构造器里面传入了 NioEventLoopGroup、Executor、SelectorProvider、SelectStrategyFactory、RejectedExecutionHandler。从这里可以看出,一个NioEventLoop属于某一个NioEventLoopGroup, 且处于同一个NioEventLoopGroup下的所有NioEventLoop 公用Executor、SelectorProvider、SelectStrategyFactory和RejectedExecutionHandler。
还有一点需要注意的是,这里的SelectorProvider构造参数传入的是通过在NioEventLoopGroup里面的构造器里面的 SelectorProvider.provider();方式获取的, 而这个方法返回的是一个单例SelectorProvider, 所以所有的NioEventLoop公用同一个单例SelectorProvider。
(2)核心的东西说完了,就是调用父类SingleThreadEventLoop的构造器:
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedExecutionHandler) { //父类构造器 super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler); tailTasks = newTaskQueue(maxPendingTasks); }
这里除了调用父类SingleThreadEventExecutor的构造器以外, 就是实例化了 tailTasks 这个变量;
对于tailTasks在SingleThreadEventLoop属性的定义如下:
private final Queue<Runnable> tailTasks;// 尾部任务队列
队列的数量maxPendingTasks参数默认是SingleThreadEventLoop.DEFAULT_MAX_PENDING_TASK,其实就是Integer.MAX_VALUE; 对于new的这个队列, 其实就是一个LinkedBlockingQueue 无界队列。
(3)再看调用的父类SingleThreadEventExecutor的构造器:
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) { super(parent);// 设置EventLoop所属于的EventLoopGroup this.addTaskWakesUp = addTaskWakesUp; this.maxPendingTasks = Math.max(16, maxPendingTasks);//默认是Integer.MAX_VALUE this.executor = ObjectUtil.checkNotNull(executor, "executor"); taskQueue = newTaskQueue(this.maxPendingTasks);//创建EventLoop的任务队列, 默认是 LinkedBlockingQueue rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler"); }
自此,NioEventLoop的实例化过程已经分析完毕。
前面已经分析完了EventLoopGroup和EventLoop,那么有一个问题,我们知道一个EventLoop实际上是对应于一个线程,那么这个EventLoop是什么时候启动的呢?