NioEventLoop创建
我们在启动服务端时,会创建两个Group,一个bossGroup,一个workerGroup,两个对应的都是NioEventLoopGroup,一个传的参数1,一个默认0,我们进入NioEventLoopGroup
负责创建NioEventLoopGroup底层的线程
负责创建NioEventLoopGroup维护的selector
进入父类,这个在不传构造参数时,默认穿进来与一个0,如果是0,给一个DEFAULT_EVENT_LOOP_THREADS,默认是2倍的cpu核数,在进入父类
以下就是如何创建NioEventLoopGroup,首先创建线程创建器,然后通过for循环创建newChild,最后创建线程选择器,接下来仔细分析这三个过程。
ThreadPerTaskExecutor
1.每次执行任务时都会创建一个线程实体
1 | protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { |
ThreadPerTaskExecutor
每次把执行任务放入一个线程
2.NioEventLoop线程命名规则nioEventLoop-1-xx
我们进入newDefaultThreadFactory
通过getClass获取到当前的一个类,这个类其实就是NioEventLoop,在进去
这里的poolType就是我们的NioEventLoop,这个toPoolName就是把NioEventLoop转换成线程池的一个名字以下是具体方法
首先先获取类名,接着如果首字母是大写且第一个字母是小写的话,就会把这个name的第一个转换成小写,然后把后面的字符拼接,最后返回,我们回到上图,进入DefaultThreadFactory方法
首先前面做了参数校验,最后prefix前缀=小写的NioEventLoop+‘—‘+poolId(自增)+’—‘,我们的线程执行器ThreadPerTaskExecutor的execute的newThread方法和DefaultThreadFactory的newThread是对应的
这里把Runnable传进来,然后名字就是prefix前缀+nextId(自增),就是小写的NioEventLoop+‘—‘+poolId(自增)+’—‘+nextId(自增)。我们就如newThread方法
这里new了一个FastThreadLocalThread,这里Netty最底层的Thread不是原生的线程,而是封装的一个Thread,FastThreadLocalThread也是继承jdk底层的Thread,它的作用就是把ThreadLocal
newChild()
我们进入newChild方法,进入NioEventLoop的newChild方法
在进入NioEventLoop的构造函数
这里会调用父类的构造函数,下面调用openSelector,这里可以看到一个selector和一个NioEventLoop绑定,openSelector也是创建一个openSelector,然后我们跟一下父类
继续进入父类
1.保存线程执行器ThreadPerTaskExcetor
这里就把线程执行器保存起来,taskQueue是用在Web线程在执行Netty任务的时候,如果判断不是NioEventLoop对应的线程里,而直接塞到任务的一个队列里,然后又NioEventLoop一个线程去执行,我们进入newTaskQueue,是在NioEventLoop里的newTaskQueue
2.创建一个MpsQueue
newMpsQueue的方式来创建queue来保存一个异步的队列,创建一个selector去轮询上面的连接
3.创建一个selector
newChooser()
创建线程选择器,newChooser对应的是MutilthreadEventExecutorGroup的next方法,通过chooser.next来返回一个NioEventLoop,实现chooser的原理如下。
第一个连接进来的时候,选择第一个NioEvcentLoop进行绑定,依此类推,当到N+1时,再循环开始,从0开始绑定。Netty将这样的过程做了优化
我们进入MutilthreadEventExecutorGroup的newChooser方法,
判断是否是2的幂,如果是2的幂就创建PowerOfTowEventExecutorChooser否则就创建GenericEventExecutorChooser
PowerOfTowEventExecutorChooser
index每次自增后直接和NioEventLoop的长度-1相与,这样可以达到循环取数组下标的目的,这里和HashMap的2次幂类似。
具体过程如下:
1 | idx 111010 |
GenericEventExecutorChooser
对index自增和NioEventLoop的长度取模,然后总体取绝对值,这样可以达到从0开始到最后,再循环的目的
NioEventLoop启动
服务端启动绑定端口
进入AbstractBootstrap里的doBind0方法,调用channel的eventLoop,eventLoop实际上是Netty启动过程中通过register方法绑定上去的,然后通过调用NioEventLoop的execute方法。进入SingleThreadEventExecuted的execute方法
这个inEventLoop是判断当前线程是否是NioEventLoop的线程,进入inEventLoop方法
然后再进去
这里就是判断线程是否相等,这里还没有创建,不等,返回false,然后再进入execute方法
这里inEventLoop返回false,所以进入else流程,启动一个线程startThread
首先判断当前线程是否是未启动的,如果未启动,则通过一个compareAndSett方法启动。
execute创建一个,这个Runnable的run方法就是创建一个NioEventLoop底层的Thread,首先把当前线程保存,接下来调用NioEventLoop的run方法,总结就是,主线程再启动过程中,先调用bind方法,最终会把实际绑定的一个流程绑定task,然后调用服务端Channel的方法去具体执行,ThreadPerTaskExcetor来创建一个线程,对应的就是NioEventLoop创建的一个线程,创建这个线程的具体逻辑就是首先NioEventLoop会把当前创建成的一个线程进行保存,最后调用run方法,进行NioEventLoop的启动
新连接接入
NioEventLoop执行逻辑
进入到NioEventLoop的run方法
select方法就是轮询当前事件,由于一个NioEventLoop对应一个selector,这个select就是轮询注册到selector上的事件,ioRatio是控制processSelectedKeys和runAllTasks函数的执行时间,processSelectedKeys主要处理io相关的逻辑,runAllTasks是处理web线程到taskQueue的任务,ioRatio默认情况下是50,就是处理io事件和运行任务是1:1。
select方法执行逻辑
1.deadline以及任务穿插逻辑处理
上面的wakenUp是为了标识当前select操作是否是唤醒状态,当每次进行select操作时候,都会把wakenUp设置false,未唤醒状态,我们进入select方法.用nanoTime计算下当前时间,当前时间加上一个定时任务队列:定时任务队列是按照截止时间顺序排列的队列
2.阻塞式select
如果上面截止时间未到,并且当前任务序列为空的话,就进行一个阻塞式的select操作,timeoutMillis就是每次阻塞最大的时间,默认情况下是1,
3.避免jdk空轮询的BUG
每次进行到这个地方,就进行了一次阻塞式select操作,这里会记一下当前时间,如果selectCnt空轮询次数大于SELECTOR_AUTO_REBUILD_THRESHOLD(512),接下来Netty会调用rebuildSelector方法,来避免下一次空轮询发生,进入rebuildSelector方法
通过openSelector重新创建一个selector,然后将旧的key取消,创建到新的selector上去,并且和事件以及Netty封装的Channel绑定上去,
processSelectedKey执行逻辑
1.selected keySet优化
在创建NioEventLoop时会调用一个openSelector方法去创建底层的一一对应的selector,也就是io事件的运行器,
openSelector首先会调用jdk的一个api,provider去创建一个selector,接下来判断
我们进入SelectedSelectionKeySet
SelectedSelectionKeySet继承AbstractSet
Netty对selected keySet优化就是用数组替换selector,hashSet做到add方法时间复杂度O(1),
2.processSelectedKeysOptimized
进入NioEventLoop的run方法,进去processSelectedKeys方法
这里的selectedKeys就是经过优化的selectedKeys,flip方法把数组返回过去
返回一个数组,接下来进入processSelectedKeysOptimized
每次通过for循环遍历这个SelectionKey[]数组,拿到selectedKeys ,拿到后把数组里的引用设置null,接下来拿到key的attachment,就是一个channel,然后调用processSelectedKey方法
首先拿到channel的unsafe,这个unsafe是和channel绑定的一个对象,
Netty默认情况下会通过反射将Select底层的一个HashSet转化成一个数组方式进行优化,然后在处理每一个key的时候,都会拿到对应的一个attachment,这个attachment就是再向select注册NioEventChannel事件的时候绑定的经过Netty封装后的channel。
runAllTask执行逻辑
我们进入SingleThreadEventExecutor的execute方法,addTask方法
进入offerTask方法
向taskQueue添加task,这是普通任务的创建和任务的添加,除了这普通任务队列,还有定时任务队列
我们进入AbstractScheduledEventExecutor的schedule方法
就是把你的任务包装成ScheduledFutureTask,然后调用schedule方法,我们进入schedule方法
看这个schedule方法是inEventLoop发起的schedule,还是在外部线程发起的schedule,如果是外部线程发起的schedule,就把添加操作变成线程安全的操作,如果是在inEventLoop发起的那么直接进行添加,为什么这样做,因为scheduledTaskQueue是个普通的PriorityQueue,
task的分类和添加
任务的聚合
三个问题
默认情况下,Netty服务端起多少线程?何时启动?
不传参数时,默认两倍CPU核数的线程,在调用execute方法的时候判断是否在本线程,如果在本线程说明线程已启动,如果是在外部线程调用execute方法,首先调用startThread方法,这个方法首先会判断当前线程是否有启动,如果没有启动,则启动这个线程。
Netty是如何解决jdk空轮询BUG的?
通过计数的方式去判断,如果当前阻塞的select操作,实际上并没有花这么长时间,那么旧可能触发空轮询,默认条件下,这种情况达到512次,就重建一个select,把之前select所有的key重新移交到新的select的key上,通过这种方式来巧妙的避免了jdk空轮询BUG。
Netty如何保证异步串行无锁化
在所有Web线程去调用EventLoop或Channel方法时,通过inEventLoop方法来判断得出是否是外部线程,在这种情况下就把所有外部操作封装成task,塞到MpsQueue中,然后再NioEventLoop执行逻辑的第三个过程,这一task会被挨个执行。