Netty之EventLoop自定义任务
简介
EventLoop可以执行用户自定义任务
结构
EventLoop内部有任务队列存放用户自定义任务
taskQueue
:自定义任务队列
EventLoopTaskQueueFactory
接口定义为
public interface EventLoopTaskQueueFactory
{Queue<Runnable> newTaskQueue(int maxCapacity);
}
创建
以NioEventLoop
为例,NioEventLoopGroup
中newChild
中根据args的参数个数来设置任务队列创建工厂,默认是null
protected EventLoop newChild(Executor executor, Object... args) throws Exception
{SelectorProvider selectorProvider = (SelectorProvider) args[0];SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1];RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2];EventLoopTaskQueueFactory taskQueueFactory = null;EventLoopTaskQueueFactory tailTaskQueueFactory = null;int argsLength = args.length;if (argsLength > 3) {taskQueueFactory = (EventLoopTaskQueueFactory) args[3];}if (argsLength > 4) {tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4];}return new NioEventLoop(this, executor, selectorProvider,selectStrategyFactory.newSelectStrategy(),rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);}
创建任务队列代码为,默认是多生产者单消费者队列
private static Queue<Runnable> newTaskQueue(EventLoopTaskQueueFactory queueFactory)
{if (queueFactory == null) {return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);}return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
}private static Queue<Runnable> newTaskQueue0(int maxPendingTasks)
{// This event loop never calls takeTask()return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue(): PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}
运行
任务队列的运行时间与ioRatio
有关,其表示io运行时间比例,范围为[1,100]
- ioRatio小于100时,如果io运行时间为t,则任务的运行时间为 t ∗ ( 100 − i o R a t i o ) i o R a t i o \frac{t * (100 - ioRatio)}{ioRatio} ioRatiot∗(100−ioRatio)
- ioRatio等于100时,io运行完后,会执行队列中所有的任务
执行任务是通过调用runAllTasks
,先从scheduledTaskQueue
队列中取出任务添加到taskQueue
,每执行64个任务后,判断任务的执行时间是否超时
- 超时就停止从任务队列中取任务,退出循环
- 队列为空时,退出
任务执行完后,调用afterRunningAllTasks
protected boolean runAllTasks(long timeoutNanos)
{fetchFromScheduledTaskQueue();Runnable task = pollTask();if (task == null) {afterRunningAllTasks();return false;}final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;long runTasks = 0;long lastExecutionTime;for (;;) {safeExecute(task)runTasks ++;if ((runTasks & 0x3F) == 0) {lastExecutionTime = ScheduledFutureTask.nanoTime();if (lastExecutionTime >= deadline) {break;}}task = pollTask();if (task == null) {lastExecutionTime = ScheduledFutureTask.nanoTime();break;}}afterRunningAllTasks();this.lastExecutionTime = lastExecutionTime;return true;
}protected void afterRunningAllTasks() { }