- Netty: Home
- The Netty Project 3.x User Guide
- All documentation pages
1. architecture
2. concepts
2.1. Channel
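A Channel can be thought of as a connection. The connection has a socket fd that can be read from and written to. We can only control whether a channel is readable, not whether it is writable, because in practice a socket is always writable (as long as its write buffer has free space). Keep in mind that once a write is issued from a non-IO thread, events such as writeComplete can fire on the IO thread at any moment. This matters a great deal when you need to control the order of operations.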
2.2. ChannelFactory
2.3. ChannelHandler
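- ChannelUpstreamHandler: processing logic for data flowing from the network layer to the application layer
- ChannelDownstreamHandler: processing logic for data flowing from the application layer to the network layer
- SimpleChannelHandler: handles both upstream and downstream
- Encoder/Decoder: encode and decode the protocol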
2.4. ChannelHandlerContext
The context that belongs to a ChannelHandler. In fact the context wraps the handler. Each context has prev and next fields, which link the contexts of a pipeline into a linked list.
2.5. ChannelPipeline
1 Channel + N ChannelHandlers: a connection together with all the processing-logic objects attached to it.
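The prev/next wiring can be made concrete with a toy model in plain Java. The class and method names below are made up and are not Netty's classes. Contexts form a doubly linked list. An upstream event walks the list from head to tail, and a downstream event walks it from tail to head. Contexts whose handler does not handle that direction are skipped, which is my reading of how Netty 3 routes events.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (hypothetical names, not Netty's classes): each context wraps one
// handler and links to its neighbours through prev/next.
class ToyPipeline {
    static final class Context {
        final String name;
        final boolean handlesUpstream;   // net layer -> app layer
        final boolean handlesDownstream; // app layer -> net layer
        Context prev, next;

        Context(String name, boolean handlesUpstream, boolean handlesDownstream) {
            this.name = name;
            this.handlesUpstream = handlesUpstream;
            this.handlesDownstream = handlesDownstream;
        }
    }

    private Context head, tail;

    // Append at the tail, like ChannelPipeline.addLast(name, handler).
    void addLast(String name, boolean upstream, boolean downstream) {
        Context ctx = new Context(name, upstream, downstream);
        if (tail == null) {
            head = tail = ctx;
        } else {
            tail.next = ctx;
            ctx.prev = tail;
            tail = ctx;
        }
    }

    // Contexts an upstream event (e.g. a received message) passes through.
    List<String> upstreamPath() {
        List<String> path = new ArrayList<>();
        for (Context c = head; c != null; c = c.next) {
            if (c.handlesUpstream) path.add(c.name);
        }
        return path;
    }

    // Contexts a downstream event (e.g. a write) passes through.
    List<String> downstreamPath() {
        List<String> path = new ArrayList<>();
        for (Context c = tail; c != null; c = c.prev) {
            if (c.handlesDownstream) path.add(c.name);
        }
        return path;
    }

    public static void main(String[] args) {
        ToyPipeline p = new ToyPipeline();
        p.addLast("decoder", true, false); // upstream only
        p.addLast("encoder", false, true); // downstream only
        p.addLast("handler", true, true);  // both, like a SimpleChannelHandler
        System.out.println(p.upstreamPath());   // [decoder, handler]
        System.out.println(p.downstreamPath()); // [handler, encoder]
    }
}
```

This shows why a decoder is usually added before the business handler and an encoder is also added before it. Inbound bytes reach the decoder first. Outbound messages leave the business handler and only then reach the encoder.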
2.6. ChannelPipelineFactory
2.7. ChannelEvent
- ChannelStateEvent: an event for a change in connection state
- MessageEvent: a message is ready to be read (either a ChannelBuffer of raw bytes or an already-framed message)
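- WriteCompletionEvent: despite the name, this does not mean all the data has been written, only that part of it has. In an experiment that wrote a large amount of data, this event fired several times.
- ExceptionEvent: an exception event, caused either by a problem in the IO thread or by an exception thrown in a ChannelHandler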
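The partial completions can be reproduced below Netty with plain java.nio. A non-blocking write() copies only what fits into the kernel socket buffers, so one large write is accepted in pieces. An NIO worker that writes what the socket takes and reports each chunk would therefore fire several completion events for one logical write, which presumably explains the observation above. PartialWriteDemo is a hypothetical name. The sketch assumes the loopback interface is usable and that 64 MiB exceeds the combined socket buffers, which holds for typical defaults.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;

// Plain java.nio, no Netty: a non-blocking write() only takes what fits
// into the kernel buffers, so one big write completes in pieces.
class PartialWriteDemo {
    // Writes `size` bytes to a loopback peer that never reads and returns
    // the byte count accepted by each write() call that made progress.
    static List<Integer> partialWrites(int size) throws IOException {
        List<Integer> accepted = new ArrayList<>();
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // any free port
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel peer = server.accept()) {      // peer never reads
                client.configureBlocking(false);
                ByteBuffer data = ByteBuffer.allocateDirect(size);
                int n;
                // Stop once the buffers are full and write() returns 0; an NIO
                // event loop would instead wait for OP_WRITE and resume later.
                while (data.hasRemaining() && (n = client.write(data)) > 0) {
                    accepted.add(n);
                }
            }
        }
        return accepted;
    }

    public static void main(String[] args) throws IOException {
        int size = 64 << 20; // 64 MiB, far more than typical socket buffers
        List<Integer> accepted = partialWrites(size);
        long total = 0;
        for (int n : accepted) total += n;
        System.out.println(accepted.size() + " write() calls accepted "
                + total + " of " + size + " bytes");
    }
}
```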
2.8. ChannelBuffer
2.9. ChannelGroup
3. notes
3.1. boss/worker executor
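- The boss creates the listening (accept) fd and then calls accept in a blocking manner. Once a connection is accepted, the connection fd is handed over to a worker.
- The algorithm for choosing the worker appears to be round robin.
- Early Netty versions supported only a single boss thread doing accept. One accepting thread limited performance, so later versions support multiple accept threads.
- Binding to multiple ports is allowed, because the boss thread also uses epoll for IO multiplexing.
- The worker adds the connection fd to its own epoll/selector and watches for readable and writable events.
- Both boss and worker are Runnable objects, so an externally supplied thread pool (Executor) is needed to run them.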
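The round-robin assignment guessed at above can be sketched with an AtomicInteger counter. The counter stays correct even when several boss threads accept concurrently. RoundRobinDispatcher is a hypothetical name. This illustrates the idea and is not a copy of Netty's code.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of a boss handing accepted connections to workers
// in round-robin order.
class RoundRobinDispatcher<W> {
    private final W[] workers;
    private final AtomicInteger index = new AtomicInteger();

    RoundRobinDispatcher(W[] workers) {
        if (workers.length == 0) throw new IllegalArgumentException("no workers");
        this.workers = workers;
    }

    // Safe to call from several boss threads at once. Math.floorMod keeps the
    // index non-negative even after the counter overflows.
    W next() {
        return workers[Math.floorMod(index.getAndIncrement(), workers.length)];
    }

    public static void main(String[] args) {
        RoundRobinDispatcher<String> d =
                new RoundRobinDispatcher<>(new String[] {"worker-0", "worker-1", "worker-2"});
        for (int conn = 1; conn <= 5; conn++) {
            // worker-0, worker-1, worker-2, worker-0, worker-1
            System.out.println("connection " + conn + " -> " + d.next());
        }
    }
}
```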
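If the number of workers is not specified, Netty creates 2 * the number of available processors in the machine. The number of available processors is obtained from Runtime.getRuntime().availableProcessors(). The worker count can also be set explicitly with the constructor public NioServerSocketChannelFactory(Executor bossExecutor, Executor workerExecutor, int workerCount).
So if the worker executor you pass in is a newFixedThreadPool with a small number of threads, you hit the problem described in "a fixed number of Netty worker threads blocks a large number of concurrent connections". For example, on a machine with 4 CPU cores Netty creates 8 worker objects. If the pool has only 5 threads, then once the browser opens a 6th connection, that connection can no longer be read or written normally. The reason is that the boss hands the 6th connection to the 6th worker, and that worker has no pool thread to run on.
The fix is simple. Either pass Executors.newCachedThreadPool(), or set the worker count yourself with the constructor above. Either way, the fixed pool must have at least as many threads as there are workers, which means >= 2 * CPU cores when the default worker count is used.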
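The starvation can be reproduced with plain java.util.concurrent, without Netty. The sketch below models each NIO worker as a Runnable that never returns while the server is up, which is how a selector loop behaves; here each one blocks on a latch instead. It submits 2 * 4 = 8 of them to a fixed pool of 5 threads, matching the numbers in the example above. The class name WorkerStarvationDemo is hypothetical, and the 4-core figure is taken from the example rather than detected.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Each "worker" is a never-ending loop while the server runs; here it just
// blocks on a latch that stands in for the selector loop.
class WorkerStarvationDemo {
    // Submits `workers` worker loops to a fixed pool of `threads` threads and
    // returns how many loops are stuck waiting in the pool's queue.
    static int starvedWorkers(int workers, int threads) {
        ThreadPoolExecutor pool =
                (ThreadPoolExecutor) Executors.newFixedThreadPool(threads);
        CountDownLatch serverStopped = new CountDownLatch(1);
        try {
            for (int i = 0; i < workers; i++) {
                pool.execute(() -> {
                    try {
                        serverStopped.await(); // "runs" until the server stops
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // Loops beyond the thread count sit in the queue; any connection
            // assigned to those workers is never served.
            return pool.getQueue().size();
        } finally {
            serverStopped.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int cores = 4;               // assumed, as in the example above
        int workerCount = 2 * cores; // default worker count described above
        System.out.println(starvedWorkers(workerCount, 5));           // 3
        System.out.println(starvedWorkers(workerCount, workerCount)); // 0
    }
}
```

Executors.newCachedThreadPool() creates a thread for every submitted task that finds no idle thread, so no worker loop ever waits in a queue. That is why either fix above works.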
3.2. accept connection exception
```
Exception in thread "pool-2-thread-1" java.lang.InternalError
    at sun.misc.URLClassPath$JarLoader.getResource(...)
    at sun.misc.URLClassPath.getResource(...)
    at ...
    at ...(Native Method)
    at ...
    at sun.misc.Launcher$ExtClassLoader.findClass(...)
    at java.lang.ClassLoader.loadClass(...)
    at java.lang.ClassLoader.loadClass(...)
    at sun.misc.Launcher$AppClassLoader.loadClass(...)
    at java.lang.ClassLoader.loadClass(...)
    at java.util.ResourceBundle$RBClassLoader.loadClass(...)
    at java.util.ResourceBundle$Control.newBundle(...)
    at java.util.ResourceBundle.loadBundle(...)
    at java.util.ResourceBundle.findBundle(...)
    at java.util.ResourceBundle.findBundle(...)
    at java.util.ResourceBundle.getBundleImpl(...)
    at java.util.ResourceBundle.getBundle(...)
    at java.util.logging.Level.getLocalizedName(...)
    at java.util.logging.SimpleFormatter.format(...)
    at java.util.logging.StreamHandler.publish(...)
    at java.util.logging.ConsoleHandler.publish(...)
    at java.util.logging.Logger.log(...)
    at java.util.logging.Logger.doLog(...)
    at java.util.logging.Logger.logp(...)
    at org.jboss.netty.logging.JdkLogger.warn(...)
    at org.jboss.netty.logging.InternalLoggerFactory$1.warn(...)
    at ...
    at ...
    at ...
    at org.jboss.netty.util.internal.DeadLockProofWorker$...
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(...)
    at java.util.concurrent.ThreadPoolExecutor$...
    at ...
Caused by: ...: error in opening zip file
    at ...(Native Method)
    at ...<init>(...)
    at java.util.jar.JarFile.<init>(...)
    at java.util.jar.JarFile.<init>(...)
    at sun.misc.URLClassPath$JarLoader.getJarFile(...)
    at sun.misc.URLClassPath$JarLoader.access$600(...)
    at sun.misc.URLClassPath$JarLoader$...
    at ...(Native Method)
    at sun.misc.URLClassPath$JarLoader.ensureOpen(...)
    at sun.misc.URLClassPath$JarLoader.getResource(...)
    ... 32 more
```
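At first sight this backtrace looks like a logging problem. Netty's JdkLogger.warn() goes through java.util.logging, which tries to load a resource bundle, and loading it fails because a jar (zip) file cannot be opened.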
Bug 745866 – Possible netty logging config problem
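This user also ran into the problem under stress testing, and the root cause was again that Netty could not accept connections. It may be related to our resource configuration, possibly some kernel parameter.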
Got this error - Syahreza Pahlevi Ginting
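This user suggests that it is related to file-max / file limits.
After I raised the file limits I no longer ran into the problem, so it probably is related to the limit on the number of file descriptors.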
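If the file-descriptor limit is the culprit, it helps to watch how close the process gets to that limit. The sketch below reads both numbers through com.sun.management.UnixOperatingSystemMXBean. That interface is specific to HotSpot/OpenJDK on Unix-like systems, so the method returns null elsewhere. FdUsage is a hypothetical name. Each accepted connection consumes a descriptor, and so does opening a jar file. It is therefore plausible, though the trace does not prove it, that one exhausted limit explains both the failed accept and the "error in opening zip file".

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

// Reports how many file descriptors this JVM has open versus its limit.
// Relies on com.sun.management (HotSpot/OpenJDK, Unix-like systems only).
class FdUsage {
    // Returns {open, max}, or null if the platform bean is not available.
    static long[] fdUsage() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof com.sun.management.UnixOperatingSystemMXBean) {
            com.sun.management.UnixOperatingSystemMXBean unix =
                    (com.sun.management.UnixOperatingSystemMXBean) os;
            return new long[] {
                unix.getOpenFileDescriptorCount(),
                unix.getMaxFileDescriptorCount()
            };
        }
        return null;
    }

    public static void main(String[] args) {
        long[] usage = fdUsage();
        if (usage == null) {
            System.out.println("fd counts not available on this JVM/OS");
        } else {
            System.out.printf("open fds: %d / limit: %d%n", usage[0], usage[1]);
        }
    }
}
```

Logging these two numbers periodically during a stress test shows whether the open count climbs toward the limit before accepts start failing.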
3.3. control timeout
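- Setting socket timeout on netty channel - Stack Overflow
In practice, however, this approach (putting a ReadTimeoutHandler in the pipeline) is of limited use. Reading the ReadTimeoutHandler implementation shows that we can neither reset the timeout countdown nor stop it, and as soon as one timeout check completes, the next round of checking is scheduled. The same is true of WriteTimeoutHandler. Instead, we can add a timeout handler only around the specific read or write we care about and remove it afterwards: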
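- Before issuing a read, we can turn reading off with channel.setReadable(false).
- When we do want to read, assume the ChannelHandlerContext of our processing logic is ctx:
- First insert a ReadTimeoutHandler in front of ctx: ctx.getPipeline().addBefore(ctx.getName(), "rto", new ReadTimeoutHandler(timer, 10));
- Then let the channel read again: channel.setReadable(true)
- If no data arrives within 10 s, a ReadTimeoutException is raised, and we can handle it from there.
- The ReadTimeoutException is triggered from the timer's internal thread, but there is no need to worry about threading: the handler hands the exception event over to the IO thread, as the code below shows.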
```java
// ReadTimeoutHandler: the timer thread does not call readTimedOut() directly;
// it submits it through the pipeline.
private void fireReadTimedOut(final ChannelHandlerContext ctx) throws Exception {
    ctx.getPipeline().execute(new Runnable() {
        public void run() {
            try {
                readTimedOut(ctx);
            } catch (Throwable t) {
                fireExceptionCaught(ctx, t);
            }
        }
    });
}

// NIO channel sink: for an NIO channel, execute() queues the task on the
// channel's worker, so it runs on the IO thread.
@Override
public ChannelFuture execute(ChannelPipeline pipeline, final Runnable task) {
    Channel ch = pipeline.getChannel();
    if (ch instanceof AbstractNioChannel<?>) {
        AbstractNioChannel<?> channel = (AbstractNioChannel<?>) ch;
        ChannelRunnableWrapper wrapper = new ChannelRunnableWrapper(pipeline.getChannel(), task);
        channel.worker.executeInIoThread(wrapper);
        return wrapper;
    }
    return super.execute(pipeline, task);
}
```
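- If data arrives and is processed within 10 s, the messageReceived callback is invoked. In that callback we can remove the handler with ctx.getPipeline().remove("rto"), so no ReadTimeoutException is raised.
- For writes, likewise insert a WriteTimeoutHandler in front of ctx: ctx.getPipeline().addBefore(ctx.getName(), "wto", new WriteTimeoutHandler(timer, 10));
- Then write: ctx.getChannel().write(message)
- If the write does not complete within 10 s, a WriteTimeoutException is raised.
- If it completes within 10 s, the writeComplete callback fires, and in that callback we can remove the handler with ctx.getPipeline().remove("wto");
- writeComplete fires as soon as part of the data has been written, so a single write can produce several writeComplete events. Make sure the handler is removed only once: calling remove("wto") again, when the handler is no longer in the pipeline, throws NoSuchElementException.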
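Both timeout recipes reduce to one rule: whichever happens first, completion or timeout, wins, and the cleanup runs exactly once no matter how many completion events follow. The sketch below models that rule with a ScheduledThreadPoolExecutor and an AtomicBoolean, not with Netty. OneShotTimeout and its methods are hypothetical. cleanup() only counts calls at the point where real code would call ctx.getPipeline().remove("wto").

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model: the first of "progress" or "timeout" wins, and the
// cleanup step (ctx.getPipeline().remove("wto") in real code) runs once.
class OneShotTimeout {
    private final AtomicBoolean done = new AtomicBoolean();
    private final AtomicInteger cleanups = new AtomicInteger();
    private volatile boolean timedOut;
    private final ScheduledFuture<?> timer;

    OneShotTimeout(ScheduledExecutorService scheduler, long delayMs) {
        timer = scheduler.schedule(this::onTimeout, delayMs, TimeUnit.MILLISECONDS);
    }

    // Called on every writeComplete-like event; only the first one counts.
    void onProgress() {
        if (done.compareAndSet(false, true)) {
            timer.cancel(false); // the timeout can no longer fire
            cleanup();
        }
    }

    private void onTimeout() {
        if (done.compareAndSet(false, true)) {
            timedOut = true;     // real code would raise WriteTimeoutException
            cleanup();
        }
    }

    private void cleanup() {
        cleanups.incrementAndGet();
    }

    boolean timedOut() { return timedOut; }
    int cleanupCount() { return cleanups.get(); }

    public static void main(String[] args) throws InterruptedException {
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
        scheduler.setRemoveOnCancelPolicy(true); // drop cancelled timers at once

        OneShotTimeout quick = new OneShotTimeout(scheduler, 10_000);
        quick.onProgress(); // first partial write: cancels the timer
        quick.onProgress(); // later partial writes: ignored

        OneShotTimeout stalled = new OneShotTimeout(scheduler, 10); // no progress

        scheduler.shutdown(); // already-scheduled delayed tasks still run
        scheduler.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(quick.timedOut() + " " + quick.cleanupCount());     // false 1
        System.out.println(stalled.timedOut() + " " + stalled.cleanupCount()); // true 1
    }
}
```

The atomic flag is used because in this model the timer thread and the caller can race. In the Netty recipe above, both the callback and the timeout event end up on the IO thread, so a plain boolean field on the handler would probably suffice there.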
3.4. HashedWheelTimer
```java
public interface Timer {
    Timeout newTimeout(TimerTask task, long delay, TimeUnit unit); // schedule a timed task
    Set<Timeout> stop(); // stop the timer; returns the tasks cancelled by stop()
}

public interface TimerTask {
    void run(Timeout timeout) throws Exception; // called when the timeout expires (cancel() does not call it)
}

public interface Timeout {
    Timer getTimer();
    TimerTask getTask();
    boolean isExpired();   // has it expired?
    boolean isCancelled(); // has it been cancelled?
    void cancel();         // request cancellation
}
```
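- The whole data structure is a ring.
- wheelSize is the size of the ring (its number of slots).
- wheelCursor is the current index on the ring.
- The time span covered by one slot is called tickDuration.
- The time span covered by the whole ring is called roundDuration = tickDuration * wheelSize.
- Each slot holds a Set<HashedWheelTimeout>: the Timeouts that must be checked for expiry when the cursor reaches that slot.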
```java
public class HashedWheelTimer implements Timer {
    private static final AtomicInteger id = new AtomicInteger(); // numbers the instances (used in the worker thread name)
    private static final SharedResourceMisuseDetector misuseDetector =
            new SharedResourceMisuseDetector(HashedWheelTimer.class); // warns when too many instances are created
    private final Worker worker = new Worker(); // background task
    final Thread workerThread;
    final AtomicInteger workerState = new AtomicInteger(); // 0 - init, 1 - started, 2 - shut down
    private final long roundDuration;
    final long tickDuration;
    final Set<HashedWheelTimeout>[] wheel;
    final ReusableIterator<HashedWheelTimeout>[] iterators; // one iterator per Set in wheel
    final int mask; // wheelSize = (1 << n), mask = (1 << n) - 1, so "& mask" replaces "% wheelSize"
    final ReadWriteLock lock = new ReentrantReadWriteLock(); // guards wheel: scheduling takes the read lock, the worker takes the write lock
    volatile int wheelCursor;
}
```
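- From its delay and the current time, each requested Timeout is converted into a) a round count (how many full rotations to wait) and b) an offset from the current wheelCursor (where on the ring it is placed).
- In other words, the time quantity delay is turned into two state variables, and the timer decides whether a Timeout has expired by examining these two variables.
- Every tickDuration, the background thread examines the Timeouts in the next wheelCursor slot, determines which have expired, and runs the TimerTask of each expired one.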
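The bookkeeping in the three points above can be tried out in a small single-threaded model. ToyWheel and its methods are hypothetical. Time advances only when tick() is called, and there is no locking or deadline re-check. The delay is given directly in whole ticks; in the real timer that would be ceil(delay / tickDuration), at least 1. Placement uses slot = (wheelCursor + ticks) & mask and rounds = (ticks - 1) / wheelSize. By my arithmetic, this yields the same stopIndex and remainingRounds as the scheduleTimeout() code below.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Toy single-threaded hashed wheel (hypothetical, not Netty's class). It
// models only the slot/round bookkeeping: no lock, no deadline re-check.
class ToyWheel {
    private static final class Entry {
        final String name;
        long remainingRounds;

        Entry(String name, long remainingRounds) {
            this.name = name;
            this.remainingRounds = remainingRounds;
        }
    }

    private final List<List<Entry>> wheel = new ArrayList<>();
    private final int mask;
    private int wheelCursor;

    // wheelSize must be a power of two so that "& mask" acts as "% wheelSize".
    ToyWheel(int wheelSize) {
        if (Integer.bitCount(wheelSize) != 1) {
            throw new IllegalArgumentException("wheelSize must be a power of two");
        }
        for (int i = 0; i < wheelSize; i++) {
            wheel.add(new ArrayList<>());
        }
        mask = wheelSize - 1;
    }

    // Schedule `name` to fire `ticks` ticks from now.
    void schedule(String name, long ticks) {
        if (ticks < 1) ticks = 1; // like rounding delay up to one tickDuration
        int slot = (int) ((wheelCursor + ticks) & mask); // offset on the ring
        long rounds = (ticks - 1) / wheel.size();        // full rotations to skip
        wheel.get(slot).add(new Entry(name, rounds));
    }

    // Advance the cursor one slot and return the names that expire there.
    List<String> tick() {
        wheelCursor = (wheelCursor + 1) & mask;
        List<String> expired = new ArrayList<>();
        Iterator<Entry> it = wheel.get(wheelCursor).iterator();
        while (it.hasNext()) {
            Entry e = it.next();
            if (e.remainingRounds <= 0) {
                it.remove();
                expired.add(e.name);
            } else {
                e.remainingRounds--; // not yet: wait for another rotation
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        ToyWheel w = new ToyWheel(4);
        w.schedule("a", 3);
        w.schedule("b", 9);
        w.schedule("c", 4);
        for (int t = 1; t <= 10; t++) {
            List<String> fired = w.tick();
            if (!fired.isEmpty()) {
                System.out.println("tick " + t + ": " + fired); // tick 3: [a], tick 4: [c], tick 9: [b]
            }
        }
    }
}
```

With a 4-slot wheel, the 9-tick timeout lands in slot 1 with 2 remaining rounds. The cursor passes slot 1 at ticks 1 and 5, decrementing the counter each time, and fires the timeout at tick 9.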
```java
public HashedWheelTimer(
        ThreadFactory threadFactory,
        long tickDuration, TimeUnit unit, int ticksPerWheel) {

    // Normalize ticksPerWheel to power of two and initialize the wheel.
    wheel = createWheel(ticksPerWheel); // create the wheel
    iterators = createIterators(wheel); // create one iterator per slot
    mask = wheel.length - 1;

    // Convert tickDuration to milliseconds.
    this.tickDuration = tickDuration = unit.toMillis(tickDuration);
    roundDuration = tickDuration * wheel.length;

    // Create the worker thread; note that it is NOT started here.
    workerThread = threadFactory.newThread(new ThreadRenamingRunnable(
            worker, "Hashed wheel timer #" + id.incrementAndGet()));

    // Misuse check: count how many instances have been created.
    misuseDetector.increase();
}

private static Set<HashedWheelTimeout>[] createWheel(int ticksPerWheel) {
    ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
    Set<HashedWheelTimeout>[] wheel = new Set[ticksPerWheel];
    for (int i = 0; i < wheel.length; i ++) {
        wheel[i] = new MapBackedSet<HashedWheelTimeout>(
                new ConcurrentIdentityHashMap<HashedWheelTimeout, Boolean>(16, 0.95f, 4));
    }
    return wheel;
}

private static ReusableIterator<HashedWheelTimeout>[] createIterators(Set<HashedWheelTimeout>[] wheel) {
    ReusableIterator<HashedWheelTimeout>[] iterators = new ReusableIterator[wheel.length];
    for (int i = 0; i < wheel.length; i ++) {
        iterators[i] = (ReusableIterator<HashedWheelTimeout>) wheel[i].iterator();
    }
    return iterators;
}

// Round ticksPerWheel up to the next power of two.
private static int normalizeTicksPerWheel(int ticksPerWheel) {
    int normalizedTicksPerWheel = 1;
    while (normalizedTicksPerWheel < ticksPerWheel) {
        normalizedTicksPerWheel <<= 1;
    }
    return normalizedTicksPerWheel;
}
```
```java
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
    final long currentTime = System.currentTimeMillis();
    start(); // start the worker thread (lazily, on first use)
    delay = unit.toMillis(delay);
    HashedWheelTimeout timeout = new HashedWheelTimeout(task, currentTime + delay); // build the Timeout; trivial
    scheduleTimeout(timeout, delay); // place the Timeout on the wheel
    return timeout;
}

public void start() {
    switch (workerState.get()) {
    case 0:
        if (workerState.compareAndSet(0, 1)) { // make sure the thread is started only once
            workerThread.start();
        }
        break;
    case 1:
        break;
    case 2:
        throw new IllegalStateException("cannot be started once stopped");
    default:
        throw new Error();
    }
}

void scheduleTimeout(HashedWheelTimeout timeout, long delay) {
    // delay must be equal to or greater than tickDuration so that the
    // worker thread never misses the timeout.
    if (delay < tickDuration) { // round a too-short delay up to one tick
        delay = tickDuration;
    }

    // Prepare the required parameters to schedule the timeout object.
    final long lastRoundDelay = delay % roundDuration;
    final long lastTickDelay = delay % tickDuration;
    // offset from the current wheelCursor, rounded up to a whole tick
    final long relativeIndex =
            lastRoundDelay / tickDuration + (lastTickDelay != 0 ? 1 : 0);
    // how many full rotations must pass before the timeout can fire
    final long remainingRounds =
            delay / roundDuration - (delay % roundDuration == 0 ? 1 : 0);

    // Add the timeout to the wheel.
    lock.readLock().lock();
    try {
        int stopIndex = (int) (wheelCursor + relativeIndex & mask);
        timeout.stopIndex = stopIndex;
        timeout.remainingRounds = remainingRounds;
        wheel[stopIndex].add(timeout);
    } finally {
        lock.readLock().unlock();
    }
}
```
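Plugging concrete numbers into scheduleTimeout()'s arithmetic shows what the two state variables look like. This includes the "- 1" correction for a delay that is an exact multiple of roundDuration. The 100 ms tick and the 500-slot request (rounded up to 512) are arbitrary choices for the example, not claimed defaults. WheelPlacement is a hypothetical name.

```java
// Reproduces the placement arithmetic of scheduleTimeout() for concrete
// numbers. The inputs are illustrative, not Netty defaults.
class WheelPlacement {
    // Same rounding as normalizeTicksPerWheel(): next power of two.
    static int normalizeTicksPerWheel(int ticksPerWheel) {
        int n = 1;
        while (n < ticksPerWheel) n <<= 1;
        return n;
    }

    // Returns {relativeIndex, remainingRounds} for the given delay (ms).
    static long[] place(long delay, long tickDuration, int wheelSize) {
        long roundDuration = tickDuration * wheelSize;
        if (delay < tickDuration) delay = tickDuration;
        long lastRoundDelay = delay % roundDuration;
        long lastTickDelay = delay % tickDuration;
        long relativeIndex = lastRoundDelay / tickDuration + (lastTickDelay != 0 ? 1 : 0);
        long remainingRounds = delay / roundDuration - (delay % roundDuration == 0 ? 1 : 0);
        return new long[] {relativeIndex, remainingRounds};
    }

    public static void main(String[] args) {
        int wheelSize = normalizeTicksPerWheel(500); // rounded up to 512
        long tick = 100;                             // ms, so one round = 51,200 ms
        for (long delay : new long[] {10, 250, 10_000, 51_200, 60_000}) {
            long[] p = place(delay, tick, wheelSize);
            System.out.println(delay + " ms -> offset " + p[0] + ", rounds " + p[1]);
        }
        // 10 ms -> offset 1, rounds 0
        // 250 ms -> offset 3, rounds 0
        // 10000 ms -> offset 100, rounds 0
        // 51200 ms -> offset 0, rounds 0
        // 60000 ms -> offset 88, rounds 1
    }
}
```

The 51,200 ms case lands in the slot under the cursor with zero rounds. The worker advances the cursor before checking a slot, so that slot is reached again only after one full rotation, exactly 51,200 ms later. The 250 ms case is rounded up to 3 ticks, so timeouts tend to fire up to one tickDuration late rather than early.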
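This is the background thread that checks for and fires timeouts. Note that the deadline is not read from currentTimeMillis on every tick. It is accumulated as startTime + tickDuration * tick, and the sleep time is corrected against the clock. Slight timing deviations can therefore occur.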
```java
public void run() {
    List<HashedWheelTimeout> expiredTimeouts = new ArrayList<HashedWheelTimeout>();

    startTime = System.currentTimeMillis();
    tick = 1; // the first tick is 1

    while (workerState.get() == 1) { // while the timer is running
        final long deadline = waitForNextTick(); // wait for the next tick
        if (deadline > 0) { // > 0 is a valid deadline; -1 means shutting down
            fetchExpiredTimeouts(expiredTimeouts, deadline); // collect expired Timeouts into expiredTimeouts
            notifyExpiredTimeouts(expiredTimeouts); // fire them
        }
    }
}

private long waitForNextTick() { // simply sleeps until the next tick
    long deadline = startTime + tickDuration * tick;

    for (;;) {
        final long currentTime = System.currentTimeMillis();
        long sleepTime = tickDuration * tick - (currentTime - startTime);

        // Check if we run on windows, as if thats the case we will need
        // to round the sleepTime as workaround for a bug that only affect
        // the JVM if it runs on windows.
        if (DetectionUtil.isWindows()) {
            sleepTime = sleepTime / 10 * 10;
        }

        if (sleepTime <= 0) {
            break;
        }
        try {
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            if (workerState.get() != 1) { // return -1 if no longer running
                return -1;
            }
        }
    }

    // Increase the tick.
    tick ++;
    return deadline;
}

private void fetchExpiredTimeouts(
        List<HashedWheelTimeout> expiredTimeouts, long deadline) {

    // Find the expired timeouts and decrease the round counter
    // if necessary. Note that we don't send the notification
    // immediately to make sure the listeners are called without
    // an exclusive lock.
    lock.writeLock().lock();
    try {
        int newWheelCursor = wheelCursor = wheelCursor + 1 & mask;
        ReusableIterator<HashedWheelTimeout> i = iterators[newWheelCursor];
        fetchExpiredTimeouts(expiredTimeouts, i, deadline); // check the Timeouts in this slot
    } finally {
        lock.writeLock().unlock();
    }
}

private void fetchExpiredTimeouts(
        List<HashedWheelTimeout> expiredTimeouts,
        ReusableIterator<HashedWheelTimeout> i, long deadline) {

    List<HashedWheelTimeout> slipped = null;
    i.rewind();
    while (i.hasNext()) {
        HashedWheelTimeout timeout = i.next();
        if (timeout.remainingRounds <= 0) { // due in this round
            i.remove();
            if (timeout.deadline <= deadline) { // also require that the deadline has passed
                expiredTimeouts.add(timeout);
            } else {
                // Handle the case where the timeout is put into a wrong
                // place, usually one tick earlier. For now, just add
                // it to a temporary list - we will reschedule it in a
                // separate loop.
                if (slipped == null) { // timing drift; handled separately
                    slipped = new ArrayList<HashedWheelTimeout>();
                }
                slipped.add(timeout);
            }
        } else {
            timeout.remainingRounds --;
        }
    }

    // Reschedule the slipped timeouts.
    if (slipped != null) { // re-place the drifted Timeouts on the wheel
        for (HashedWheelTimeout timeout: slipped) {
            scheduleTimeout(timeout, timeout.deadline - deadline);
        }
    }
}

private void notifyExpiredTimeouts(
        List<HashedWheelTimeout> expiredTimeouts) { // fire the expiry events
    // Notify the expired timeouts.
    for (int i = expiredTimeouts.size() - 1; i >= 0; i --) {
        expiredTimeouts.get(i).expire();
    }

    // Clean up the temporary list.
    expiredTimeouts.clear();
}
```