Netty
Table of Contents
- Netty: Home (Main.WebHome) https://netty.io/
- The Netty Project 3.x User Guide http://static.netty.io/3.5/guide/
- All documentation pages http://netty.io/wiki/all-documents.html
1. architecture
2. concepts
2.1. Channel
可以认为是一个connection,这个connection上面有socket fd可以所以进行读写。我们只能够控制是否可读,而不能够控制是否可写,因为事实上socket永远是可写的(只要write buffer有空余)。所以要注意,一旦如果在非IO线程发起写操作的话,那么在IO线程随时都会有writeComplete等事件发生,这点对于顺序的控制至关重要。
2.2. ChannelFactory
创建Channel并且对Channel进行管理包括事件检测以及回调处理。涉及到管理那么就需要有线程池,这个线程池需要外部进行指定。
2.3. ChannelHandler
在这个connection上面的处理逻辑对象。总的来说有下面两种类型的handler
- ChannelUpstreamHandler 从net layer到app layer的数据流动处理逻辑
- ChannelDownstreamHandler 从app layer到net layer的数据流动处理逻辑
然后在这两种类型的handler上面派生很多其他的handler,比如
- SimpleChannelHandler 可以同时处理upstream和downstream
- Encoder/Decoder 协议的编码和解码
2.4. ChannelHandlerContext
这个ChannelHandler对应的context. 实际上是这个context是包含了handler. context内部有prev,next字段能够将pipeline里面的context组成链表。
实际上pipeline是将context组织成为链表结构,如果需要senndUpStream或者是sendDownStream的话,得到对应的context,从中取出handler,然后决定是否需要处理。
2.5. ChannelPipeline
1 Channel + N ChannelHandler,连接和这个连接上所有的处理逻辑对象。
所有的Handler组成为了链表,不管是upstream还是downstream都会流过所有的handler.netty框架会判断这个handler是否需要处理某个事件。
2.6. ChannelPipelineFactory
创建ChannelPipeline,主要是说明需要在这个Channel上面绑定哪些ChannelHandler.
2.7. ChannelEvent
连接上所有触发的事件,可以是但是不仅限于下面几种类型
- ChannelStateEvent 连接状态的变化事件
- MessageEvent 消息可读(可以是ChannelBuffer读取字节流,也可以是已经成帧的消息)
- WriteCompletionEvent
数据写完这里并不是指数据已经完全写完,只是指部分数据写出。实验写出大数据量的时候这个事件被触发了多次。 - ExceptionEvent 异常事件,可能是IO线程问题也可能是ChannelHandler处理异常
对于某一个Channel来说,它的事件发生检测以及回调处理都是一个IO线程里面完成的
2.8. ChannelBuffer
这个和Channel倒是没有太大关系,比较偏向buffer的管理,主要针对网络通信这种read/write交互场景进行优化。
2.9. ChannelGroup
线程安全的Channel集合,如果Channel关闭的话那么会自动从group里面移除,能够对Channel进行批量操作。
3. notes
3.1. boss/worker executor
NioServerSocketChannelFactory构造函数需要指定boss/worker两个executor,有必要解释一下这两个对象的含义。
首先了解一下netty的网络模型
- boss创建accept fd之后阻塞调用accept. 一旦accept一个connection之后,将这个connection fd交给worker.
- 分摊到哪个worker上的算法应该是round robin
- 早期netty版本只是支持一个boss线程做accept,但是一个现成做accept限制了性能,所以在后续版本支持多个现成做accept
- 允许绑定到多个端口,因为在boss线程里面也是使用epoll来做IO复用的。
- worker将这个connection fd加入到自己的epoll/selector里面,检测可读可写事件的发生
- boss/worker都是runnable对象,需要外部提供线程池来执行
worker的数量如果没有指定的话,with 2 * the number of available processors in the machine. The number of available processors is obtained by Runtime.availableProcessors() 也可以用这个构造函数可以指定 public NioServerSocketChannelFactory(Executor bossExecutor, Executor workerExecutor, int workerCount)
所以如果传入的worker executor使用newFixedThreadPool并且线程数目比较小的话,那么就会出现这个问题 "netty的固定个数的worker线程阻塞大量的并发连接" http://www.oschina.net/question/241182_40955 现象就是如果你的CPU core为4,那么就会创建8个worker对象,而如果线程数目使用5,那么浏览器创建第6个连接之后没有办法正常都写,原因就是因为boss创建第6个连接的时候,给第6个worker处理,而这个worker没有线程池可以run起来。
这个问题解决办法也非常简单,要不就使用newCachedThreadPool(),要不就自己指定worker IO thread数目,但是 必须确保创建的线程数目>=2*CPU core.
3.2. accept connection exception
Exception in thread "pool-2-thread-1" java.lang.InternalError at sun.misc.URLClassPath$JarLoader.getResource(URLClassPath.java:755) at sun.misc.URLClassPath.getResource(URLClassPath.java:169) at java.net.URLClassLoader$1.run(URLClassLoader.java:194) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:190) at sun.misc.Launcher$ExtClassLoader.findClass(Launcher.java:229) at java.lang.ClassLoader.loadClass(ClassLoader.java:306) at java.lang.ClassLoader.loadClass(ClassLoader.java:295) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301) at java.lang.ClassLoader.loadClass(ClassLoader.java:247) at java.util.ResourceBundle$RBClassLoader.loadClass(ResourceBundle.java:435) at java.util.ResourceBundle$Control.newBundle(ResourceBundle.java:2289) at java.util.ResourceBundle.loadBundle(ResourceBundle.java:1364) at java.util.ResourceBundle.findBundle(ResourceBundle.java:1328) at java.util.ResourceBundle.findBundle(ResourceBundle.java:1282) at java.util.ResourceBundle.getBundleImpl(ResourceBundle.java:1224) at java.util.ResourceBundle.getBundle(ResourceBundle.java:705) at java.util.logging.Level.getLocalizedName(Level.java:223) at java.util.logging.SimpleFormatter.format(SimpleFormatter.java:64) at java.util.logging.StreamHandler.publish(StreamHandler.java:177) at java.util.logging.ConsoleHandler.publish(ConsoleHandler.java:88) at java.util.logging.Logger.log(Logger.java:478) at java.util.logging.Logger.doLog(Logger.java:500) at java.util.logging.Logger.logp(Logger.java:700) at org.jboss.netty.logging.JdkLogger.warn(JdkLogger.java:80) at org.jboss.netty.logging.InternalLoggerFactory$1.warn(InternalLoggerFactory.java:128) at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:316) at org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42) at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:662) Caused by: java.util.zip.ZipException: error in opening zip file at java.util.zip.ZipFile.open(Native Method) at java.util.zip.ZipFile.<init>(ZipFile.java:127) at java.util.jar.JarFile.<init>(JarFile.java:135) at java.util.jar.JarFile.<init>(JarFile.java:72) at sun.misc.URLClassPath$JarLoader.getJarFile(URLClassPath.java:646) at sun.misc.URLClassPath$JarLoader.access$600(URLClassPath.java:540) at sun.misc.URLClassPath$JarLoader$1.run(URLClassPath.java:607) at java.security.AccessController.doPrivileged(Native Method) at sun.misc.URLClassPath$JarLoader.ensureOpen(URLClassPath.java:599) at sun.misc.URLClassPath$JarLoader.getResource(URLClassPath.java:753) ... 32 more
从这个excetion backtrace上看,似乎是log找不到出现的问题。
Bug 745866 – Possible netty logging config problem
这个同学在压力测试下面也出现了这个问题,主要原因还是netty没有办法accept connection. 这个可能还是和我们的资源配置有关,有可能是某个内核参数。
Got this error - Syahreza Pahlevi Ginting
- http://www.mentby.com/syahreza-pahlevi-ginting/got-this-error.html
- http://gleamynode.net/articles/1557/
这个同学的建议还是说和file-max/file limits有关。
之后我调整了file limits之后便没有遇到这个问题了,所以可能确实和文件句柄数目限制有关
3.3. control timeout
网络上并没有太多如何关于netty读写超时信息的控制。下面是一篇相对来说比较有启发性的回答:
- Setting socket timeout on netty channel - Stack Overflow : http://stackoverflow.com/questions/3726696/setting-socket-timeout-on-netty-channel
但是这种方式在现实中意义却不大。分析ReadTimeoutHandler代码实现会发现, 我们没有办法将超时计算reset, 也没有办法将超时计算停止。而且一旦完成一次timeout计算之后,又会和触发下一轮的timeout计算。 对于WriteTimeoutHandler也是如此。
事实上我们是可以通过使用ReadTimeoutHandler/WriteTimeoutHandler来完成读写超时控制的,只不过不能够像在SO回答的那样写在PipelineFactory里面,而必须动态创建,而Pipeline和ChannelHandlerContext的设计为这种方法提供了可能。
以ReadTimeoutHandler为例
- 在发起读之前,我们可以通过channel.setReadable(false)来关闭读取
- 如果需要发起读的话,假设我们处理逻辑的ChannelHandlerContext为ctx
- 首先在ctx之前创建一个ReadTimeoutHandler ctx.getPipeline().addBefore(ctx.getName(),"rto", new ReadTimeoutHandler(timer, 10));
- 然后允许channel读数据 channel.setReadable(true)
- 如果10s之内没有数据的话,那么会触发一个ReadTimeoutException, 这样我们可以做后续处理。
- 这个ReadTimeoutException是timer内部线程触发的,但是无须担心多线程问题,因为timer会将这个Exception事件交给IO线程来触发
private void fireReadTimedOut(final ChannelHandlerContext ctx) throws Exception { ctx.getPipeline().execute(new Runnable() { public void run() { try { readTimedOut(ctx); } catch (Throwable t) { fireExceptionCaught(ctx, t); } } }); } @Override public ChannelFuture execute(ChannelPipeline pipeline, final Runnable task) { Channel ch = pipeline.getChannel(); if (ch instanceof AbstractNioChannel<?>) { AbstractNioChannel<?> channel = (AbstractNioChannel<?>) ch; ChannelRunnableWrapper wrapper = new ChannelRunnableWrapper(pipeline.getChannel(), task); channel.worker.executeInIoThread(wrapper); return wrapper; } return super.execute(pipeline, task); }
- 如果在10s内有数据被处理的话,那么就会调用messageReceived回调,在回调里面我们可以删除这个handler ctx.getPipeline().remove("rto"); 这样便不会触发ReadTimeoutException
对WriteTimeoutHandler同理,因为我们不能够setWritable,所以必须在write之前就安装好handler
- ctx.getPipeline().addBefore(ctx.getName(),"wto",new WriteTimeoutHandler(timer,10));
- ctx.getChannel.write()
- 如果在10s内没有写完的话,那么就会触发一个WriteTimeoutException
- 如果在10s内写完的话,那么就会触发writeComplete回调,在回调里面我们可以删除这个handler ctx.getPipeline().remove("wto");
- writeComplete只要部分数据写成功的时候就会触发,所以一次write可能会触发多次writeComplete事件,所以这里remove需要注意只能够remove一次
其实writeTimeout这个事件其实大部分时候是不需要的:对于server而言write之后就不care了,而对于client而言write之后直接使用read来触发readTimeout更加合适。
3.4. HashedWheelTimer
定时器的实现,相关其接口如下
public interface Timer { Timeout newTimeout(TimerTask task, long delay, TimeUnit unit); // 发起定时任务 Set<Timeout> stop(); // 返回所有因为stop取消的定时任务 } public interface TimerTask { void run(Timeout timeout) throws Exception; // 超时触发或者取消 } public interface Timeout { Timer getTimer(); TimerTask getTask(); boolean isExpired(); // 是否超时 boolean isCancelled(); // 是否取消 void cancel(); // 发起取消操作 }
数据结构如下:
- 整个数据结构是一个ring
- wheelSize是ring大小
- wheelCursor是当前在ring上的index
- 每个unit分配的时间单元称为tickDuration
- 整个ring分配的时间单元成为roundDuration = tickDuration * wheelSize
- 每个unit上对应一个Set<HashedWheelTimeout>结构,表示在这个unit上面需要检查超时的Timeout
使用Set来管理一个环上面的对象还是稍微有点不太好。虽然移除任务比较方便,但是每次都需要检查许多没有timeout的TimerTask.
public class HashedWheelTimer implements Timer { private static final AtomicInteger id = new AtomicInteger(); // 用来为实例分配编号 private static final SharedResourceMisuseDetector misuseDetector = new SharedResourceMisuseDetector(HashedWheelTimer.class); // 用来限制创建实例 private final Worker worker = new Worker(); // 后台线程 final Thread workerThread; final AtomicInteger workerState = new AtomicInteger(); // 0 - init, 1 - started, 2 - shut down private final long roundDuration; final long tickDuration; final Set<HashedWheelTimeout>[] wheel; final ReusableIterator<HashedWheelTimeout>[] iterators; // wheel里面Set对应的iterator. final int mask; // wheelSize = (1 << n). mask = (1 << n)-1 这样好做%操作 final ReadWriteLock lock = new ReentrantReadWriteLock(); // 涉及到多线程安全问题 volatile int wheelCursor; }
整个逻辑大致是这样的:
- 每次请求的Timeout会根据delay,当前时间,转换成为 a)round(需要检查多少轮) b)据当前wheelCursor的偏移offset(放置在ring什么位置上)
- 也就是将delay这个时间概念,转换成为两个状态变量。timer内部通过判断这两个状态变量来判断超时与否
- 后台线程每隔tickDuration会检查下一个wheelCursor上的Timeout请求,判断那些存在超时,如果超时的话那么触发TimerTask这个操作。
初始化
public HashedWheelTimer( ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) { // Normalize ticksPerWheel to power of two and initialize the wheel. wheel = createWheel(ticksPerWheel); // 创建wheel iterators = createIterators(wheel); // 创建iterators mask = wheel.length - 1; // Convert tickDuration to milliseconds. this.tickDuration = tickDuration = unit.toMillis(tickDuration); roundDuration = tickDuration * wheel.length; workerThread = threadFactory.newThread(new ThreadRenamingRunnable( worker, "Hashed wheel timer #" + id.incrementAndGet())); // 构造线程,但是注意没有启动 // Misuse check misuseDetector.increase(); // 检测创建实例数量 } private static Set<HashedWheelTimeout>[] createWheel(int ticksPerWheel) { ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel); Set<HashedWheelTimeout>[] wheel = new Set[ticksPerWheel]; for (int i = 0; i < wheel.length; i ++) { wheel[i] = new MapBackedSet<HashedWheelTimeout>( new ConcurrentIdentityHashMap<HashedWheelTimeout, Boolean>(16, 0.95f, 4)); } return wheel; } private static ReusableIterator<HashedWheelTimeout>[] createIterators(Set<HashedWheelTimeout>[] wheel) { ReusableIterator<HashedWheelTimeout>[] iterators = new ReusableIterator[wheel.length]; for (int i = 0; i < wheel.length; i ++) { iterators[i] = (ReusableIterator<HashedWheelTimeout>) wheel[i].iterator(); } return iterators; } private static int normalizeTicksPerWheel(int ticksPerWheel) { int normalizedTicksPerWheel = 1; while (normalizedTicksPerWheel < ticksPerWheel) { normalizedTicksPerWheel <<= 1; } return normalizedTicksPerWheel; }
提交Timeout
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { final long currentTime = System.currentTimeMillis(); start(); // 启动工作线程 delay = unit.toMillis(delay); HashedWheelTimeout timeout = new HashedWheelTimeout(task, currentTime + delay); // 构造Timeout对象,比较trival. scheduleTimeout(timeout, delay); // 将Timeout对象放置到wheel内部 return timeout; } public void start() { switch (workerState.get()) { case 0: if (workerState.compareAndSet(0, 1)) { // 确保只是启动一次 workerThread.start(); } break; case 1: break; case 2: throw new IllegalStateException("cannot be started once stopped"); default: throw new Error(); } } void scheduleTimeout(HashedWheelTimeout timeout, long delay) { // delay must be equal to or greater than tickDuration so that the // worker thread never misses the timeout. if (delay < tickDuration) { // 如果delay时间过短的话那么修正到tickDuration. delay = tickDuration; } // Prepare the required parameters to schedule the timeout object. final long lastRoundDelay = delay % roundDuration; final long lastTickDelay = delay % tickDuration; final long relativeIndex = lastRoundDelay / tickDuration + (lastTickDelay != 0? 1 : 0); // 计算相距当前的wheelCursor偏移 final long remainingRounds = delay / roundDuration - (delay % roundDuration == 0? 1 : 0); // 计算需要轮转多少次才会触发超时 // Add the timeout to the wheel. lock.readLock().lock(); try { int stopIndex = (int) (wheelCursor + relativeIndex & mask); timeout.stopIndex = stopIndex; timeout.remainingRounds = remainingRounds; wheel[stopIndex].add(timeout); } finally { lock.readLock().unlock(); } }
后台检查超时触发线程. 注意里面的deadline并不是每次调用currentTimeMillis, 而是在startTime上面不断叠加的,然后在sleep过程中进行修正。可能会出现略微时间偏差。
@org.jboss.netty.util.HashedWheelTimer.Worker
public void run() { List<HashedWheelTimeout> expiredTimeouts = new ArrayList<HashedWheelTimeout>(); startTime = System.currentTimeMillis(); tick = 1; // 初始tick = 1 while (workerState.get() == 1) { // 当前处于工作状态 final long deadline = waitForNextTick(); // 等待到下一个tick if (deadline > 0) { // 判断返回值,如果>0表示deadline, 否则认为无效 fetchExpiredTimeouts(expiredTimeouts, deadline); // 判断那些Timeout需要触发,保存到expiredTimouts notifyExpiredTimeouts(expiredTimeouts); // 触发expiredTimeouts里面的Timeout } } } private long waitForNextTick() { // 这个过程非常好理解,就是等待一段时间 long deadline = startTime + tickDuration * tick; for (;;) { final long currentTime = System.currentTimeMillis(); long sleepTime = tickDuration * tick - (currentTime - startTime); // Check if we run on windows, as if thats the case we will need // to round the sleepTime as workaround for a bug that only affect // the JVM if it runs on windows. // // See https://github.com/netty/netty/issues/356 if (DetectionUtil.isWindows()) { sleepTime = sleepTime / 10 * 10; } if (sleepTime <= 0) { break; } try { Thread.sleep(sleepTime); } catch (InterruptedException e) { if (workerState.get() != 1) { // 如果不是工作状态就返回-1 return -1; } } } // Increase the tick. tick ++; return deadline; } private void fetchExpiredTimeouts( List<HashedWheelTimeout> expiredTimeouts, long deadline) { // Find the expired timeouts and decrease the round counter // if necessary. Note that we don't send the notification // immediately to make sure the listeners are called without // an exclusive lock. lock.writeLock().lock(); try { int newWheelCursor = wheelCursor = wheelCursor + 1 & mask; ReusableIterator<HashedWheelTimeout> i = iterators[newWheelCursor]; fetchExpiredTimeouts(expiredTimeouts, i, deadline); // 检查当前unit下面的iterators是否存在超时 } finally { lock.writeLock().unlock(); } } private void fetchExpiredTimeouts( List<HashedWheelTimeout> expiredTimeouts, ReusableIterator<HashedWheelTimeout> i, long deadline) { List<HashedWheelTimeout> slipped = null; i.rewind(); while (i.hasNext()) { HashedWheelTimeout timeout = i.next(); if (timeout.remainingRounds <= 0) { / i.remove(); if (timeout.deadline <= deadline) { // 判断超时之后需要检查deadline. expiredTimeouts.add(timeout); } else { // Handle the case where the timeout is put into a wrong // place, usually one tick earlier. For now, just add // it to a temporary list - we will reschedule it in a // separate loop. if (slipped == null) { // 有可能存在一些计时偏差情况,单独处理这种情况 slipped = new ArrayList<HashedWheelTimeout>(); } slipped.add(timeout); } } else { timeout.remainingRounds --; } } // Reschedule the slipped timeouts. if (slipped != null) { // 将存在偏差的Timeout重新设置timeout. for (HashedWheelTimeout timeout: slipped) { scheduleTimeout(timeout, timeout.deadline - deadline); } } } private void notifyExpiredTimeouts( List<HashedWheelTimeout> expiredTimeouts) { // 触发超时事件 // Notify the expired timeouts. for (int i = expiredTimeouts.size() - 1; i >= 0; i --) { expiredTimeouts.get(i).expire(); } // Clean up the temporary list. expiredTimeouts.clear(); }