[TOC]

The reactor thread in Netty

Starting the reactor thread

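  1. The run method of NioEventLoop is the body of the reactor thread. The thread is started the first time a task is added from a thread other than the reactor thread itself.
  2. The start is triggered in the execute method of SingleThreadEventExecutor, the parent class of NioEventLoop: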

      public void execute(Runnable task) {
         if (task == null) {
             throw new NullPointerException("task");
         }
    
         boolean inEventLoop = inEventLoop();
         if (inEventLoop) {
             addTask(task);
         } else {
             startThread();
             addTask(task);
             if (isShutdown() && removeTask(task)) {
                 reject();
             }
         }
    
         if (!addTaskWakesUp && wakesUpForTask(task)) {
             wakeup(inEventLoop);
         }
     }
    

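    When an external thread adds a task to the task queue, execute calls startThread(). Netty checks whether the reactor thread has already been started; if it has not, Netty starts the thread and then adds the task to the task queue.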

    private void startThread() {
         if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
             if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                 schedule(new ScheduledFutureTask<Void>(
                         this, Executors.<Void>callable(new PurgeTask(), null),
                         ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL));
                 thread.start();
             }
         }
     }
    
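The lazy-start pattern above can be reproduced outside Netty. The following is a minimal sketch, not Netty's implementation: the class LazyStartExecutor, its startCount counter, and the plain BlockingQueue loop are assumptions made for illustration. It only shows how a compare-and-set on a state field ensures that the thread is started exactly once, no matter how many tasks external threads submit.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified executor; it is NOT Netty's SingleThreadEventExecutor.
final class LazyStartExecutor {
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;

    private final AtomicInteger state = new AtomicInteger(ST_NOT_STARTED);
    private final AtomicInteger startCount = new AtomicInteger(); // illustration only
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final Thread thread = new Thread(this::runLoop, "lazy-reactor");

    LazyStartExecutor() {
        thread.setDaemon(true); // the loop never ends, so do not keep the JVM alive
    }

    void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        // Only callers outside the loop thread can find the thread not yet started.
        if (Thread.currentThread() != thread) {
            startThread();
        }
        taskQueue.add(task);
    }

    private void startThread() {
        // The cheap read filters the common case; the CAS lets exactly one caller win.
        if (state.get() == ST_NOT_STARTED
                && state.compareAndSet(ST_NOT_STARTED, ST_STARTED)) {
            startCount.incrementAndGet();
            thread.start();
        }
    }

    private void runLoop() {
        try {
            for (;;) {
                taskQueue.take().run(); // block until a task arrives, then run it
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    int startCount() {
        return startCount.get();
    }

    public static void main(String[] args) throws InterruptedException {
        LazyStartExecutor executor = new LazyStartExecutor();
        CountDownLatch done = new CountDownLatch(3);
        for (int i = 0; i < 3; i++) {
            executor.execute(done::countDown);
        }
        boolean finished = done.await(1, TimeUnit.SECONDS);
        System.out.println("tasks finished: " + finished
                + ", thread starts: " + executor.startCount());
    }
}
```

The sketch leaves out the purge task scheduling and the shutdown check that the real startThread() and execute perform.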

Running the reactor thread

  1. The run method of NioEventLoop:

    protected void run() {
        for (;;) {
            boolean oldWakenUp = wakenUp.getAndSet(false);
            try {
                if (hasTasks()) {
                    selectNow();
                } else {
                    select(oldWakenUp);

                    // 'wakenUp.compareAndSet(false, true)' is always evaluated
                    // before calling 'selector.wakeup()' to reduce the wake-up
                    // overhead. (Selector.wakeup() is an expensive operation.)
                    //
                    // However, there is a race condition in this approach.
                    // The race condition is triggered when 'wakenUp' is set to
                    // true too early.
                    //
                    // 'wakenUp' is set to true too early if:
                    // 1) Selector is waken up between 'wakenUp.set(false)' and
                    //    'selector.select(...)'. (BAD)
                    // 2) Selector is waken up between 'selector.select(...)' and
                    //    'if (wakenUp.get()) { ... }'. (OK)
                    //
                    // In the first case, 'wakenUp' is set to true and the
                    // following 'selector.select(...)' will wake up immediately.
                    // Until 'wakenUp' is set to false again in the next round,
                    // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                    // any attempt to wake up the Selector will fail, too, causing
                    // the following 'selector.select(...)' call to block
                    // unnecessarily.
                    //
                    // To fix this problem, we wake up the selector again if wakenUp
                    // is true immediately after selector.select(...).
                    // It is inefficient in that it wakes up the selector for both
                    // the first case (BAD - wake-up required) and the second case
                    // (OK - no wake-up required).

                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    processSelectedKeys();
                    runAllTasks();
                } else {
                    final long ioStartTime = System.nanoTime();

                    processSelectedKeys();

                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }

                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        break;
                    }
                }
            } catch (Throwable t) {
                logger.warn("Unexpected exception in the selector loop.", t);

                // Prevent possible consecutive immediate failures that lead to
                // excessive CPU consumption.
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Ignore.
                }
            }
        }
    }
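  2. Each iteration of the reactor thread consists of roughly three steps
    • First, poll for IO events on all channels registered with the selector that belongs to this reactor thread
        boolean oldWakenUp = wakenUp.getAndSet(false);
        select(oldWakenUp);
        if (wakenUp.get()) {
            selector.wakeup();
        }
      
    • Process the channels on which network IO events occurred
      processSelectedKeys();
      
    • Run the tasks in the task queue; when ioRatio is not 100, the run is limited to a time budget derived from the time spent on IO
      runAllTasks(ioTime * (100 - ioRatio) / ioRatio);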
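
The polling step relies on the wakenUp flag so that external threads do not call the expensive Selector.wakeup() more often than needed. The sketch below illustrates that guard with the standard java.nio Selector. The class GuardedWakeup, its method names, and the selectorWakeups counter are hypothetical and exist only for this example; it does not reproduce the race-condition handling described in the comment inside run.

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper that isolates the CAS-guarded wake-up idea.
final class GuardedWakeup {
    private final Selector selector;
    private final AtomicBoolean wakenUp = new AtomicBoolean();
    private final AtomicInteger selectorWakeups = new AtomicInteger(); // illustration only

    GuardedWakeup(Selector selector) {
        this.selector = selector;
    }

    // Called by external threads after they add a task.
    // Only the caller that flips the flag from false to true pays for Selector.wakeup().
    boolean wakeup() {
        if (wakenUp.compareAndSet(false, true)) {
            selector.wakeup();
            selectorWakeups.incrementAndGet();
            return true;
        }
        return false;
    }

    // Called by the loop thread at the start of each iteration, before select.
    // Returns whether a wake-up was requested since the previous iteration.
    boolean resetForNextSelect() {
        return wakenUp.getAndSet(false);
    }

    int selectorWakeups() {
        return selectorWakeups.get();
    }

    public static void main(String[] args) throws IOException {
        try (Selector selector = Selector.open()) {
            GuardedWakeup guard = new GuardedWakeup(selector);
            guard.wakeup();
            guard.wakeup();
            guard.wakeup();
            System.out.println("selector wake-ups after 3 requests: "
                    + guard.selectorWakeups());
            guard.resetForNextSelect();
            guard.wakeup();
            System.out.println("selector wake-ups after reset and 1 more request: "
                    + guard.selectorWakeups());
        }
    }
}
```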
      
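The argument passed to runAllTasks in the task queue step is a time budget in nanoseconds. A small worked sketch of that arithmetic follows. The class IoRatioBudget and the method taskBudgetNanos are hypothetical names, and the sample IO time of 8 ms is an assumed value; only the formula itself comes from the run method shown earlier.

```java
// Hypothetical helper that isolates the budget formula used in run().
final class IoRatioBudget {

    // ioRatio is the percentage of loop time intended for IO.
    // The case ioRatio == 100 is excluded here because run() handles it
    // separately by calling runAllTasks() without a budget.
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            throw new IllegalArgumentException("ioRatio must be in 1..99: " + ioRatio);
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTimeNanos = 8_000_000L; // assume processSelectedKeys() took 8 ms
        for (int ioRatio : new int[] {20, 50, 80}) {
            System.out.println("ioRatio=" + ioRatio + " -> task budget "
                    + taskBudgetNanos(ioTimeNanos, ioRatio) + " ns");
        }
    }
}
```

With an ioRatio of 50 the tasks get as much time as the IO took; a higher ioRatio shrinks the task budget and a lower one enlarges it.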
