Netty

Table of Contents

1 architecture

netty-components.png

2 concepts

2.1 Channel

可以认为是一个connection,这个connection上面有socket fd可以所以进行读写。我们只能够控制是否可读,而不能够控制是否可写,因为事实上socket永远是可写的(只要write buffer有空余)。所以要注意,一旦如果在非IO线程发起写操作的话,那么在IO线程随时都会有writeComplete等事件发生,这点对于顺序的控制至关重要。

2.2 ChannelFactory

创建Channel并且对Channel进行管理包括事件检测以及回调处理。涉及到管理那么就需要有线程池,这个线程池需要外部进行指定。

2.3 ChannelHandler

在这个connection上面的处理逻辑对象。总的来说有下面两种类型的handler

  • ChannelUpstreamHandler 从net layer到app layer的数据流动处理逻辑
  • ChannelDownstreamHandler 从app layer到net layer的数据流动处理逻辑

然后在这两种类型的handler上面派生很多其他的handler,比如

  • SimpleChannelHandler 可以同时处理upstream和downstream
  • Encoder/Decoder 协议的编码和解码

2.4 ChannelHandlerContext

这个ChannelHandler对应的context. 实际上是这个context是包含了handler. context内部有prev,next字段能够将pipeline里面的context组成链表。

实际上pipeline是将context组织成为链表结构,如果需要senndUpStream或者是sendDownStream的话,得到对应的context,从中取出handler,然后决定是否需要处理。

2.5 ChannelPipeline

1 Channel + N ChannelHandler,连接和这个连接上所有的处理逻辑对象。

所有的Handler组成为了链表,不管是upstream还是downstream都会流过所有的handler.netty框架会判断这个handler是否需要处理某个事件。

2.6 ChannelPipelineFactory

创建ChannelPipeline,主要是说明需要在这个Channel上面绑定哪些ChannelHandler.

2.7 ChannelEvent

连接上所有触发的事件,可以是但是不仅限于下面几种类型

  • ChannelStateEvent 连接状态的变化事件
  • MessageEvent 消息可读(可以是ChannelBuffer读取字节流,也可以是已经成帧的消息)
  • WriteCompletionEvent 数据写完 #note: 这里并不是指数据已经完全写完,只是指部分数据写出。实验写出大数据量的时候这个事件被触发了多次。
  • ExceptionEvent 异常事件,可能是IO线程问题也可能是ChannelHandler处理异常

#note: 对于某一个Channel来说,它的事件发生检测以及回调处理都是一个IO线程里面完成的

2.8 ChannelBuffer

这个和Channel倒是没有太大关系,比较偏向buffer的管理,主要针对网络通信这种read/write交互场景进行优化。

2.9 ChannelGroup

线程安全的Channel集合,如果Channel关闭的话那么会自动从group里面移除,能够对Channel进行批量操作。

3 notes

3.1 boss/worker executor

NioServerSocketChannelFactory构造函数需要指定boss/worker两个executor,有必要解释一下这两个对象的含义。

首先了解一下netty的网络模型

  • boss创建accept fd之后阻塞调用accept. 一旦accept一个connection之后,将这个connection fd交给worker.
    • #note: 分摊到哪个worker上的算法应该是round robin
    • #note: 早期netty版本只是支持一个boss线程做accept,但是一个现成做accept限制了性能,所以在后续版本支持多个现成做accept
    • 允许绑定到多个端口,因为在boss线程里面也是使用epoll来做IO复用的。
  • worker将这个connection fd加入到自己的epoll/selector里面,检测可读可写事件的发生
  • boss/worker都是runnable对象,需要外部提供线程池来执行

worker的数量如果没有指定的话,with 2 * the number of available processors in the machine. The number of available processors is obtained by Runtime.availableProcessors() 也可以用这个构造函数可以指定 public NioServerSocketChannelFactory(Executor bossExecutor, Executor workerExecutor, int workerCount)

所以如果传入的worker executor使用newFixedThreadPool并且线程数目比较小的话,那么就会出现这个问题 "netty的固定个数的worker线程阻塞大量的并发连接" http://www.oschina.net/question/241182_40955 现象就是如果你的CPU core为4,那么就会创建8个worker对象,而如果线程数目使用5,那么浏览器创建第6个连接之后没有办法正常都写,原因就是因为boss创建第6个连接的时候,给第6个worker处理,而这个worker没有线程池可以run起来。

这个问题解决办法也非常简单,要不就使用newCachedThreadPool(),要不就自己指定worker IO thread数目,但是 必须确保创建的线程数目>=2*CPU core.

3.2 accept connection exception

Exception in thread "pool-2-thread-1" java.lang.InternalError
        at sun.misc.URLClassPath$JarLoader.getResource(URLClassPath.java:755)
        at sun.misc.URLClassPath.getResource(URLClassPath.java:169)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:194)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
        at sun.misc.Launcher$ExtClassLoader.findClass(Launcher.java:229)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:295)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
        at java.util.ResourceBundle$RBClassLoader.loadClass(ResourceBundle.java:435)
        at java.util.ResourceBundle$Control.newBundle(ResourceBundle.java:2289)
        at java.util.ResourceBundle.loadBundle(ResourceBundle.java:1364)
        at java.util.ResourceBundle.findBundle(ResourceBundle.java:1328)
        at java.util.ResourceBundle.findBundle(ResourceBundle.java:1282)
        at java.util.ResourceBundle.getBundleImpl(ResourceBundle.java:1224)
        at java.util.ResourceBundle.getBundle(ResourceBundle.java:705)
        at java.util.logging.Level.getLocalizedName(Level.java:223)
        at java.util.logging.SimpleFormatter.format(SimpleFormatter.java:64)
        at java.util.logging.StreamHandler.publish(StreamHandler.java:177)
        at java.util.logging.ConsoleHandler.publish(ConsoleHandler.java:88)
        at java.util.logging.Logger.log(Logger.java:478)
        at java.util.logging.Logger.doLog(Logger.java:500)
        at java.util.logging.Logger.logp(Logger.java:700)
        at org.jboss.netty.logging.JdkLogger.warn(JdkLogger.java:80)
        at org.jboss.netty.logging.InternalLoggerFactory$1.warn(InternalLoggerFactory.java:128)
        at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:316)
        at org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42)
        at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
        at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)
Caused by: java.util.zip.ZipException: error in opening zip file
        at java.util.zip.ZipFile.open(Native Method)
        at java.util.zip.ZipFile.<init>(ZipFile.java:127)
        at java.util.jar.JarFile.<init>(JarFile.java:135)
        at java.util.jar.JarFile.<init>(JarFile.java:72)
        at sun.misc.URLClassPath$JarLoader.getJarFile(URLClassPath.java:646)
        at sun.misc.URLClassPath$JarLoader.access$600(URLClassPath.java:540)
        at sun.misc.URLClassPath$JarLoader$1.run(URLClassPath.java:607)
        at java.security.AccessController.doPrivileged(Native Method)
        at sun.misc.URLClassPath$JarLoader.ensureOpen(URLClassPath.java:599)
        at sun.misc.URLClassPath$JarLoader.getResource(URLClassPath.java:753)
        ... 32 more

从这个excetion backtrace上看,似乎是log找不到出现的问题。

Bug 745866 – Possible netty logging config problem

这个同学在压力测试下面也出现了这个问题,主要原因还是netty没有办法accept connection. 这个可能还是和我们的资源配置有关,有可能是某个内核参数。

Got this error - Syahreza Pahlevi Ginting

这个同学的建议还是说和file-max/file limits有关。

#note: 之后我调整了file limits之后便没有遇到这个问题了,所以可能确实和文件句柄数目限制有关

3.3 control timeout

网络上并没有太多如何关于netty读写超时信息的控制。下面是一篇相对来说比较有启发性的回答:

但是这种方式在现实中意义却不大。分析ReadTimeoutHandler代码实现会发现, 我们没有办法将超时计算reset, 也没有办法将超时计算停止。而且一旦完成一次timeout计算之后,又会和触发下一轮的timeout计算。 对于WriteTimeoutHandler也是如此。

事实上我们是可以通过使用ReadTimeoutHandler/WriteTimeoutHandler来完成读写超时控制的,只不过不能够像在SO回答的那样写在PipelineFactory里面,而必须动态创建,而Pipeline和ChannelHandlerContext的设计为这种方法提供了可能。

以ReadTimeoutHandler为例

  • 在发起读之前,我们可以通过channel.setReadable(false)来关闭读取
  • 如果需要发起读的话,假设我们处理逻辑的ChannelHandlerContext为ctx
    • 首先在ctx之前创建一个ReadTimeoutHandler ctx.getPipeline().addBefore(ctx.getName(),"rto", new ReadTimeoutHandler(timer, 10));
    • 然后允许channel读数据 channel.setReadable(true)
  • 如果10s之内没有数据的话,那么会触发一个ReadTimeoutException, 这样我们可以做后续处理。
  • #note: 这个ReadTimeoutException是timer内部线程触发的,但是无须担心多线程问题,因为timer会将这个Exception事件交给IO线程来触发
private void fireReadTimedOut(final ChannelHandlerContext ctx) throws Exception {
            ctx.getPipeline().execute(new Runnable() {

                public void run() {
                    try {
                        readTimedOut(ctx);
                    } catch (Throwable t) {
                        fireExceptionCaught(ctx, t);
                    }
                }
            });
        }

@Override
    public ChannelFuture execute(ChannelPipeline pipeline, final Runnable task) {
        Channel ch = pipeline.getChannel();
        if (ch instanceof AbstractNioChannel<?>) {
            AbstractNioChannel<?> channel = (AbstractNioChannel<?>) ch;
            ChannelRunnableWrapper wrapper = new ChannelRunnableWrapper(pipeline.getChannel(), task);
            channel.worker.executeInIoThread(wrapper);
            return wrapper;
        }
        return super.execute(pipeline, task);
    }
  • 如果在10s内有数据被处理的话,那么就会调用messageReceived回调,在回调里面我们可以删除这个handler ctx.getPipeline().remove("rto"); 这样便不会触发ReadTimeoutException

对WriteTimeoutHandler同理,因为我们不能够setWritable,所以必须在write之前就安装好handler

  • ctx.getPipeline().addBefore(ctx.getName(),"wto",new WriteTimeoutHandler(timer,10));
  • ctx.getChannel.write()
  • 如果在10s内没有写完的话,那么就会触发一个WriteTimeoutException
  • 如果在10s内写完的话,那么就会触发writeComplete回调,在回调里面我们可以删除这个handler ctx.getPipeline().remove("wto");
  • #note: writeComplete只要部分数据写成功的时候就会触发,所以一次write可能会触发多次writeComplete事件,所以这里remove需要注意只能够remove一次

#note: 其实writeTimeout这个事件其实大部分时候是不需要的:对于server而言write之后就不care了,而对于client而言write之后直接使用read来触发readTimeout更加合适。

4 all-about-code

4.1 HashedWheelTimer

定时器的实现,相关其接口如下

public interface Timer {
    Timeout newTimeout(TimerTask task, long delay, TimeUnit unit); // 发起定时任务
    Set<Timeout> stop(); // 返回所有因为stop取消的定时任务
}

public interface TimerTask {
    void run(Timeout timeout) throws Exception; // 超时触发或者取消
}

public interface Timeout {
    Timer getTimer();
    TimerTask getTask();
    boolean isExpired(); // 是否超时
    boolean isCancelled(); // 是否取消
    void cancel(); // 发起取消操作
}

数据结构如下: netty-hashed-wheel-timer.png

  • 整个数据结构是一个ring
  • wheelSize是ring大小
  • wheelCursor是当前在ring上的index
  • 每个unit分配的时间单元称为tickDuration
  • 整个ring分配的时间单元成为roundDuration = tickDuration * wheelSize
  • 每个unit上对应一个Set<HashedWheelTimeout>结构,表示在这个unit上面需要检查超时的Timeout
public class HashedWheelTimer implements Timer {
    private static final AtomicInteger id = new AtomicInteger(); // 用来为实例分配编号
    private static final SharedResourceMisuseDetector misuseDetector =
        new SharedResourceMisuseDetector(HashedWheelTimer.class); // 用来限制创建实例

    private final Worker worker = new Worker(); // 后台线程
    final Thread workerThread;
    final AtomicInteger workerState = new AtomicInteger(); // 0 - init, 1 - started, 2 - shut down

    private final long roundDuration;
    final long tickDuration;
    final Set<HashedWheelTimeout>[] wheel;
    final ReusableIterator<HashedWheelTimeout>[] iterators; // wheel里面Set对应的iterator.
    final int mask; // wheelSize = (1 << n). mask = (1 << n)-1 这样好做%操作
    final ReadWriteLock lock = new ReentrantReadWriteLock(); // 涉及到多线程安全问题
    volatile int wheelCursor;
}

整个逻辑大致是这样的:

  • 每次请求的Timeout会根据delay,当前时间,转换成为 a)round(需要检查多少轮) b)据当前wheelCursor的偏移offset(放置在ring什么位置上)
  • #note: 也就是将delay这个时间概念,转换成为两个状态变量。timer内部通过判断这两个状态变量来判断超时与否
  • 后台线程每隔tickDuration会检查下一个wheelCursor上的Timeout请求,判断那些存在超时,如果超时的话那么触发TimerTask这个操作。

初始化

public HashedWheelTimer(
        ThreadFactory threadFactory,
        long tickDuration, TimeUnit unit, int ticksPerWheel) {
    // Normalize ticksPerWheel to power of two and initialize the wheel.
    wheel = createWheel(ticksPerWheel); // 创建wheel
    iterators = createIterators(wheel); // 创建iterators
    mask = wheel.length - 1;

    // Convert tickDuration to milliseconds.
    this.tickDuration = tickDuration = unit.toMillis(tickDuration);
    roundDuration = tickDuration * wheel.length;

    workerThread = threadFactory.newThread(new ThreadRenamingRunnable(
                    worker, "Hashed wheel timer #" + id.incrementAndGet())); // 构造线程,但是注意没有启动

    // Misuse check
    misuseDetector.increase(); // 检测创建实例数量
}

private static Set<HashedWheelTimeout>[] createWheel(int ticksPerWheel) {
        ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
    Set<HashedWheelTimeout>[] wheel = new Set[ticksPerWheel];
    for (int i = 0; i < wheel.length; i ++) {
        wheel[i] = new MapBackedSet<HashedWheelTimeout>(
                new ConcurrentIdentityHashMap<HashedWheelTimeout, Boolean>(16, 0.95f, 4));
    }
    return wheel;
}

private static ReusableIterator<HashedWheelTimeout>[] createIterators(Set<HashedWheelTimeout>[] wheel) {
    ReusableIterator<HashedWheelTimeout>[] iterators = new ReusableIterator[wheel.length];
    for (int i = 0; i < wheel.length; i ++) {
        iterators[i] = (ReusableIterator<HashedWheelTimeout>) wheel[i].iterator();
    }
    return iterators;
}

private static int normalizeTicksPerWheel(int ticksPerWheel) {
    int normalizedTicksPerWheel = 1;
    while (normalizedTicksPerWheel < ticksPerWheel) {
        normalizedTicksPerWheel <<= 1;
    }
    return normalizedTicksPerWheel;
}

提交Timeout

public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
    final long currentTime = System.currentTimeMillis();
    start(); // 启动工作线程

    delay = unit.toMillis(delay);
    HashedWheelTimeout timeout = new HashedWheelTimeout(task, currentTime + delay); // 构造Timeout对象,比较trival.
    scheduleTimeout(timeout, delay); // 将Timeout对象放置到wheel内部
    return timeout;
}

public void start() {
    switch (workerState.get()) {
    case 0:
        if (workerState.compareAndSet(0, 1)) { // 确保只是启动一次
            workerThread.start();
        }
        break;
    case 1:
        break;
    case 2:
        throw new IllegalStateException("cannot be started once stopped");
    default:
        throw new Error();
    }
}

void scheduleTimeout(HashedWheelTimeout timeout, long delay) {
    // delay must be equal to or greater than tickDuration so that the
    // worker thread never misses the timeout.
    if (delay < tickDuration) { // 如果delay时间过短的话那么修正到tickDuration.
        delay = tickDuration;
    }

    // Prepare the required parameters to schedule the timeout object.
    final long lastRoundDelay = delay % roundDuration;
    final long lastTickDelay = delay % tickDuration;
    final long relativeIndex =
        lastRoundDelay / tickDuration + (lastTickDelay != 0? 1 : 0); // 计算相距当前的wheelCursor偏移

    final long remainingRounds =
        delay / roundDuration - (delay % roundDuration == 0? 1 : 0); // 计算需要轮转多少次才会触发超时

    // Add the timeout to the wheel.
    lock.readLock().lock();
    try {
        int stopIndex = (int) (wheelCursor + relativeIndex & mask);
        timeout.stopIndex = stopIndex;
        timeout.remainingRounds = remainingRounds;

        wheel[stopIndex].add(timeout);
    } finally {
        lock.readLock().unlock();
    }
}

后台线程 (#note: 注意里面的deadline并不是每次调用currentTimeMillis, 而是在startTime上面不断叠加的,然后在sleep过程中进行修正。可能会出现略微时间偏差)

@org.jboss.netty.util.HashedWheelTimer.Worker

public void run() {
    List<HashedWheelTimeout> expiredTimeouts =
        new ArrayList<HashedWheelTimeout>();

    startTime = System.currentTimeMillis();
    tick = 1; // 初始tick = 1

    while (workerState.get() == 1) { // 当前处于工作状态
        final long deadline = waitForNextTick(); // 等待到下一个tick
        if (deadline > 0) { // 判断返回值,如果>0表示deadline, 否则认为无效
            fetchExpiredTimeouts(expiredTimeouts, deadline); // 判断那些Timeout需要触发,保存到expiredTimouts
            notifyExpiredTimeouts(expiredTimeouts); // 触发expiredTimeouts里面的Timeout
        }
    }
}

private long waitForNextTick() { // 这个过程非常好理解,就是等待一段时间
    long deadline = startTime + tickDuration * tick;

    for (;;) {
        final long currentTime = System.currentTimeMillis();
        long sleepTime = tickDuration * tick - (currentTime - startTime);

        // Check if we run on windows, as if thats the case we will need
        // to round the sleepTime as workaround for a bug that only affect
        // the JVM if it runs on windows.
        //
        // See https://github.com/netty/netty/issues/356
        if (DetectionUtil.isWindows()) {
            sleepTime = sleepTime / 10 * 10;
        }

        if (sleepTime <= 0) {
            break;
        }
        try {
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            if (workerState.get() != 1) { // 如果不是工作状态就返回-1
                return -1;
            }
        }
    }

    // Increase the tick.
    tick ++;
    return deadline;
}

 private void fetchExpiredTimeouts(
        List<HashedWheelTimeout> expiredTimeouts, long deadline) {

    // Find the expired timeouts and decrease the round counter
    // if necessary.  Note that we don't send the notification
    // immediately to make sure the listeners are called without
    // an exclusive lock.
    lock.writeLock().lock();
    try {
        int newWheelCursor = wheelCursor = wheelCursor + 1 & mask;
        ReusableIterator<HashedWheelTimeout> i = iterators[newWheelCursor];
        fetchExpiredTimeouts(expiredTimeouts, i, deadline); // 检查当前unit下面的iterators是否存在超时
    } finally {
        lock.writeLock().unlock();
    }
}

private void fetchExpiredTimeouts(
        List<HashedWheelTimeout> expiredTimeouts,
        ReusableIterator<HashedWheelTimeout> i, long deadline) {

    List<HashedWheelTimeout> slipped = null;
    i.rewind();
    while (i.hasNext()) {
        HashedWheelTimeout timeout = i.next();
        if (timeout.remainingRounds <= 0) { /
            i.remove();
            if (timeout.deadline <= deadline) { // 判断超时之后需要检查deadline.
                expiredTimeouts.add(timeout);
            } else {
                // Handle the case where the timeout is put into a wrong
                // place, usually one tick earlier.  For now, just add
                // it to a temporary list - we will reschedule it in a
                // separate loop.
                if (slipped == null) { // 有可能存在一些计时偏差情况,单独处理这种情况
                    slipped = new ArrayList<HashedWheelTimeout>();
                }
                slipped.add(timeout);
            }
        } else {
            timeout.remainingRounds --;
        }
    }

    // Reschedule the slipped timeouts.
    if (slipped != null) { // 将存在偏差的Timeout重新设置timeout.
        for (HashedWheelTimeout timeout: slipped) {
            scheduleTimeout(timeout, timeout.deadline - deadline);
        }
    }
}

private void notifyExpiredTimeouts(
        List<HashedWheelTimeout> expiredTimeouts) { // 触发超时事件
    // Notify the expired timeouts.
    for (int i = expiredTimeouts.size() - 1; i >= 0; i --) {
        expiredTimeouts.get(i).expire();
    }

    // Clean up the temporary list.
    expiredTimeouts.clear();
}
comments powered by Disqus