Netty网络框架(三)

简介: Netty网络框架

Netty网络框架(二)https://developer.aliyun.com/article/1469520


3、EventLoop

EventLoop事件循环,监听IO事件,内部封装了线程

EventLoopGroup事件循环组,是对EventLoop的管理,封装了线程池。

当新建channel时,group会为其分配一个EventLoop,封装了nio中的Selector,监听通道中的所有事件,一个通道的生命周期内,所有操作都由相同的EventLoop所封装的线程处理。

同时,多个通道可以由一个EventLoop处理,是多对一的关系

1)EventLoopGroup

类层级结构
(通过选中NioEventLoopGroup源码 - 右键 -  选中Diagrams  -  选中show diagram 展示出来)

A  初始化流程
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
[NioEventLoopGroup]
-------------------------------------------------------------------------
    public NioEventLoopGroup() {
        this(0);
    }
    
    public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
    }
    
    public NioEventLoopGroup(int nThreads, Executor executor) {
        this(nThreads, executor, SelectorProvider.provider());
    }
    
    // 逐步增加参数
    public NioEventLoopGroup(
            int nThreads, Executor executor, final SelectorProvider selectorProvider) {
        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }
    
    // 调用父类的构造器
    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }
    
    
 [MultithreadEventLoopGroup]
------------------------------------------------------------------------- 
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }
    
    // 初始化线程数量的逻辑     线程数 = cpu核数 * 2
    private static final int DEFAULT_EVENT_LOOP_THREADS;
    static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }
    
    
[MultithreadEventExecutorGroup]    
------------------------------------------------------------------------- 
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }
    
    // 核心逻辑
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
        children = new EventExecutor[nThreads];
        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                // 根据线程数量  创建EventLoop的逻辑  
                // newChild()的具体实现在NioEventLoopGroup类中
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }
                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }
        chooser = chooserFactory.newChooser(children);
        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };
        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }
        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }
    
    
[NioEventLoopGroup]    
-------------------------------------------------------------------------     
     @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
    }

2)EventLoop

重要属性为  Selector   及其父类的父类中的 Thread
Selector用于在channel创建之后,注册其中监听后续的I/O事件
Thread用于进行轮询,在channel注册之后启动线程

A 注册channel
ChannelFuture future = serverBootstrap.bind(8888).sync();
 
 [AbstractBootstrap]
 --------------------------------------------------------------------------
    public ChannelFuture bind(int inetPort) {
        return bind(new InetSocketAddress(inetPort));
    }
    
    
    public ChannelFuture bind(SocketAddress localAddress) {
        validate();
        return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
    }
    
    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        .....
    }
    
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                channel.unsafe().closeForcibly();
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }

注册的源码调用链路

ChannelFuture regFuture = config().group().register(channel);
[MultithreadEventLoopGroup]
------------------------------------------------------------------------------
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }
    
[SingleThreadEventLoop]    
------------------------------------------------------------------------------
    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }
    
    
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
  
 
[AbstractChannel]
------------------------------------------------------------------------------
//  核心逻辑是 调用了 register0()方法
 public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            ObjectUtil.checkNotNull(eventLoop, "eventLoop");
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }
        
      // channel中存在EventLoop类型的属性
            // 通道初始化时,会将指定的EventLoop与channel进行关联
            AbstractChannel.this.eventLoop = eventLoop;
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }
        
        
     // 核心逻辑是调用了doRegister()方法
     private void register0(ChannelPromise promise) {
            try {
                // check if the channel is still open as it could be closed in the mean time when the register
                // call was outside of the eventLoop
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                doRegister();
                neverRegistered = false;
                registered = true;
                // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                // user may already fire events through the pipeline in the ChannelFutureListener.
                pipeline.invokeHandlerAddedIfNeeded();
                safeSetSuccess(promise);
                pipeline.fireChannelRegistered();
                // Only fire a channelActive if the channel has never been registered. This prevents firing
                // multiple channel actives if the channel is deregistered and re-registered.
                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        // This channel was registered before and autoRead() is set. This means we need to begin read
                        // again so that we process inbound data.
                        //
                        // See https://github.com/netty/netty/issues/4805
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }        
  
[AbstractNioChannel]
----------------------------------------------------------------------------
// 核心注册逻辑
protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                // 将通道注册进Selector中 此时感兴趣的事件是0
                // 并且将当前对象作为附加对象存入其中  等价selectionKey.attach()方法
                // 使用对象时   再通过selectionkey.attachment()方法取出对象
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }
B 轮询事件的状态

读取源码的思路:找到入口,找到核心逻辑。

AbstractChannel中register()方法,对eventLoop.execute()的调用,就是启动线程进行轮询的入口。

[SingleThreadEventExecutor]
-----------------------------------------------------------------------------
    public void execute(Runnable task) {
        ObjectUtil.checkNotNull(task, "task");
        execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
    }
    
    private final Queue<Runnable> taskQueue;
    //  当前类维护了一个任务队列
    private void execute(Runnable task, boolean immediate) {
        boolean inEventLoop = inEventLoop();
        addTask(task);
        if (!inEventLoop) {
            // 启动线程
            startThread();
            if (isShutdown()) {
                boolean reject = false;
                try {
                    if (removeTask(task)) {
                        reject = true;
                    }
                } catch (UnsupportedOperationException e) {
                    // The task queue does not support removal so the best thing we can do is to just move on and
                    // hope we will be able to pick-up the task before its completely terminated.
                    // In worst case we will log on termination.
                }
                if (reject) {
                    reject();
                }
            }
        }
        if (!addTaskWakesUp && immediate) {
            wakeup(inEventLoop);
        }
    }
    
    
    private void startThread() {
        if (state == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                boolean success = false;
                try {
                    doStartThread();
                    success = true;
                } finally {
                    if (!success) {
                        STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                    }
                }
            }
        }
    }
    
    
    private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }
                boolean success = false;
                updateLastExecutionTime();
                try {
                    // 真正的调用逻辑
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } 
                
                ......
            }
         }
     }
[NioEventLoop]
-----------------------------------------------------------------------------
protected void run() {
        int selectCnt = 0;
        for (;;) {
            try {
                int strategy;
                try {
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.BUSY_WAIT:
                        // fall-through to SELECT since the busy-wait is not supported with NIO
                    case SelectStrategy.SELECT: 
                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                        if (curDeadlineNanos == -1L) {
                            curDeadlineNanos = NONE; // nothing on the calendar
                        }
                        nextWakeupNanos.set(curDeadlineNanos);
                        try {
                            // 判断队列中是否存在任务
                            if (!hasTasks()) { 
                                // 如果不存在  调用select()进行获取
                                strategy = select(curDeadlineNanos);
                            }
                        } finally {
                            // This update is just to help block unnecessary selector wakeups
                            // so use of lazySet is ok (no race condition)
                            nextWakeupNanos.lazySet(AWAKE);
                        }
                        // fall through
                    default:
                    }
                } catch (IOException e) {
                    // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                    // the selector and retry. https://github.com/netty/netty/issues/8566
                    rebuildSelector0();
                    selectCnt = 0;
                    handleLoopException(e);
                    continue;
                }
                selectCnt++;
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                boolean ranTasks;
                if (ioRatio == 100) {
                    try {
                        if (strategy > 0) {
                            // 处理任务的核心逻辑
                            processSelectedKeys();
                        }
                    } finally {
                        // Ensure we always run tasks.
                        ranTasks = runAllTasks();
                    }
                } else if (strategy > 0) {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                } else {
                    ranTasks = runAllTasks(0); // This will run the minimum number of tasks
                }
                if (ranTasks || strategy > 0) {
                    if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                selectCnt - 1, selector);
                    }
                    selectCnt = 0;
                } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
                    selectCnt = 0;
                }
            } catch (CancelledKeyException e) {
                // Harmless exception - log anyway
                if (logger.isDebugEnabled()) {
                    logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                            selector, e);
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }

processSelectedKeys()是处理任务的核心逻辑,来自于NioEventLoop的run()方法调用

// 处理事件集合
    private void processSelectedKeys() {
        // 如果selectedKeys(事件集合)没有值,重新获取,如果有值,直接处理
        if (selectedKeys != null) {
            processSelectedKeysOptimized();
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }
    
    
     private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
        // check if the set is empty and if so just return to not create garbage by
        // creating a new Iterator every time even if there is nothing to process.
        // See https://github.com/netty/netty/issues/597
        if (selectedKeys.isEmpty()) {
            return;
        }
        Iterator<SelectionKey> i = selectedKeys.iterator();
        for (;;) {
            final SelectionKey k = i.next();
            // 这是注册时,存储的附加对象,即为通道对象channel
            final Object a = k.attachment();
            i.remove();
            if (a instanceof AbstractNioChannel) {
                // 处理具体事件
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }
            if (!i.hasNext()) {
                break;
            }
            if (needsToSelectAgain) {
                selectAgain();
                selectedKeys = selector.selectedKeys();
                // Create the iterator again to avoid ConcurrentModificationException
                if (selectedKeys.isEmpty()) {
                    break;
                } else {
                    i = selectedKeys.iterator();
                }
            }
        }
    }
    
    // 处理具体事件
    //  判断事件类型,调用对应的逻辑处理
    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                // If the channel implementation throws an exception because there is no event loop, we ignore this
                // because we are only trying to determine if ch is registered to this event loop and thus has authority
                // to close ch.
                return;
            }
            // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
            // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
            // still healthy and should not be closed.
            // See https://github.com/netty/netty/issues/5125
            if (eventLoop == this) {
                // close the channel if the key is not valid anymore
                unsafe.close(unsafe.voidPromise());
            }
            return;
        }
        try {
            int readyOps = k.readyOps();
            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            // the NIO JDK channel implementation may throw a NotYetConnectedException.
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
                unsafe.finishConnect();
            }
            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

其中读事件的处理

unsafe.read();
[AbstractNioByteChannel]
----------------------------------------------------------------------------
public final void read() {
            final ChannelConfig config = config();
            if (shouldBreakReadReady(config)) {
                clearReadPending();
                return;
            }
            final ChannelPipeline pipeline = pipeline();
            final ByteBufAllocator allocator = config.getAllocator();
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            allocHandle.reset(config);
            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    byteBuf = allocHandle.allocate(allocator);
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    if (allocHandle.lastBytesRead() <= 0) {
                        // nothing was read. release the buffer.
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        if (close) {
                            // There is nothing left to read as we received an EOF.
                            readPending = false;
                        }
                        break;
                    }
                    allocHandle.incMessagesRead(1);
                    readPending = false;
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;
                } while (allocHandle.continueReading());
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();
                if (close) {
                    closeOnRead(pipeline);
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }

4、Bootstrap

引导,对应用程序进行配置,并让他运行起来的过程。

1)配置

必选参数:group 、 channel、 handler(服务端 -- childHandler)

group(): 指定一个到两个reactor

channel():指定channel工厂,反射的方式创建channel使用

handler():指定reactor的处理器,其中childHandler指定的是,服务端所接收到的客户端channel使用的处理器,而服务端的主reactor(bossGroup),已经默认添加了acceptor处理器,所以可以不指定。

option():指定TCP相关的参数,以及netty自定义的参数

配置参数的过程,称之为初始化。

2)运行

// 启动并设置端口号  但需要使用sync异步启动
        try {
            ChannelFuture future = serverBootstrap.bind(8888).sync();
            // 将关闭通道的方式 也设置为异步的
            //    阻塞finally中的代码执行
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 优雅关闭
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

bind(),将服务端的channel绑定到端口号,然后接收客户端的连接,让整个netty运行起来。

sync(),因为绑定事件是异步的,所以使用sync同步等待结果,换句话说,bind只触发了绑定端口的事件,需要使用sync等待事件执行的结果。

future.channel().closeFuture().sync(),含义为,当通道被关闭时,才执行后续的操作,sync使当前线程执行到此处阻塞,以确保不执行后续的shutdown方法。

3)源码解析

A 类声明
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {

嵌套的泛型使用,可以达到,子类中返回子类本身的效果,具体如下:

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
B  init方法

工作内容

a、设置channel相关的选项参数
b、设置channel的属性键值对
c、添加对channel的IO事件处理器 (Acceptor角色)

void init(Channel channel) {
        // 设置channel相关的选项参数
        setChannelOptions(channel, newOptionsArray(), logger);
        // 设置channel的属性键值对
        setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
        ChannelPipeline p = channel.pipeline();
        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
        }
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }
                
                // 添加对channel的IO事件处理器 (Acceptor角色)
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

Acceptor分析

功能:将主reactor接收到的客户端通道,传递给从reactor

// ServerBootstrapAcceptor是ServerBootstrap的静态内部类
// netty将acceptor看作一个处理器,并且是入站处理器
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
// 具体的逻辑封装到channelRead()方法中
// 对客户端通道进行配置 , 然后注册到从Reactor中
public void channelRead(ChannelHandlerContext ctx, Object msg) {
            // 此时msg对应 服务端接收到的客户端通道
            final Channel child = (Channel) msg;
        // 设置处理链路
            child.pipeline().addLast(childHandler);
      // 设置通道的配置项和参数
            setChannelOptions(child, childOptions, logger);
            setAttributes(child, childAttrs);
            try {
                // childGroup 是从reactor的资源池 调用注册方法 注册客户端通道child
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {          
                        // 增加监听 获取注册的异步结果
                        // 如果注册失败 或者抛出异常  都会关闭channel
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }
        
        
        // 如果处理客户端连接失败了,暂停一秒,然后继续接受
        // 为保障服务端能够尽可能多的处理客户端的连接  不受某一次处理失败的结果影响
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            final ChannelConfig config = ctx.channel().config();
            if (config.isAutoRead()) {
                // stop accept new connections for 1 second to allow the channel to recover
                // See https://github.com/netty/netty/issues/1328
                config.setAutoRead(false);
                ctx.channel().eventLoop().schedule(enableAutoReadTask, 1, TimeUnit.SECONDS);
            }
            // still let the exceptionCaught event flow through the pipeline to give the user
            // a chance to do something with it
            ctx.fireExceptionCaught(cause);
        }

4) Future和Promise

Future代表的是,一个还未完成的异步任务的执行结果。可以通过addListener方法,监听执行结果后进行相应的处理,此时它的状态可以分为未完成、已完成(成功、失败、主动取消)等。

对Future而言,状态只能读取,无法更改,又出现了Promise,但是Promise只能更改一次。

参照生活中的定额发票(Future)和机打发票(Promise)。

(四)拓展篇

1、心跳检测

检测逻辑:
1) 服务端启动,客户端建立连接,连接的目的是互相发送消息。
2) 如果客户端在工作,服务端一定能收到数据,如果客户端空闲,服务端会出现资源浪费。
3) 服务端需要一种检测机制,验证客户端的活跃状态,不活跃则关闭。

需求设计:
1) 客户端向服务端发送 “I am alive” , sleep一个随机时间,模拟空闲状态
2) 服务端收到消息后,返回“over”, 客户端有空闲,记录空闲次数
3) 设定阈值,达到阈值时主动关闭连接

IdleStateHandler , 是netty提供的处理器
1)超过多长时间没有读   readerIdleTime

  1. 超过多长时间没有写   writerIdleTime
  2. 超过多长时间没有读和写   allIdleTime

底层实现检测的是 IdleStateEvent事件,通过管道传递给下一个handler处理,处理方法是userEventTriggered。

其中IdleStateEvent事件,分为READER_IDLE、WRITER_IDLE、ALL_IDLE三大类

2、TCP粘包拆包

TCP是基于流的。

当客户端发送多个包,服务端只收到一个包,此时发生了粘包。
当客户端发送多个包,服务端也接收到多个包,但是包是不完整,多了或少了数据,此时发生了拆包。

UDP会发生拆包和粘包吗?
不会,UDP是基于报文的,在UDP的首部使用16bit存储报文的长度,因此不会发生。

TCP发生粘包和拆包的本质原因:
要发送的数据先经过TCP的缓冲区,还限制了最大报文长度。
A  如果要发送的数据 > TCP剩余的缓冲区大小,发生拆包
B  如果要发送的数据 >  最大报文长度,发生拆包
C  如果要发送的数据 << TCP剩余的缓冲区大小,发生粘包
D  接收数据的应用层,没有及时读取缓冲区数据,也会发生粘包

解决办法:
A  设置出消息的长度
B  设置出消息的边界——分隔符

Netty提供的解码器,两类

A 基于长度的解码器,在包头部设置出数据的长度。(类似于UDP的头部处理)
LengthFieldBasedFrameDecoder  自定义长度的处理方式
FixedLengthFrameDecoder   固定长度的处理方式

B 基于分隔符的解码器
DelimiterBasedFrameDecoder  自定义分隔符的处理方式
LineBasedFrameDecoder   行尾("\n"或"\r\n")分隔符的处理方式

【Demo逻辑】
需求:客户端循环100次向服务端请求时间

1)第一种方式,传输的过程数据单位是字节流ByteBuf,需要自行处理分隔符以及数据的长度,此时会出现粘包和拆包的问题

2)第二种方式,使用LineBasedFrameDecoder,配合StringDecoder使用,传输的数据单位变成字符串,可以直接处理,保证业务逻辑上的包和真正传输的包是一致的

3、序列化框架——protobuf

protobuf = protocol buffers
类似于xml的生成和解析,但效率更高,生成的是字节码,可读性稍差。

【Demo逻辑】

1)安装idea插件,protobuf support

          如果安装之后,创建*.proto文件没有使用插件,手动设置关联关系

          settings ->  file types  ->  找到protobuf ->  增加正则表达式

2)引入maven依赖和插件

<properties>
    <os.detected.classifier>windows-x86_64</os.detected.classifier>
</properties>
<build>
        <plugins>
            <plugin>
                <groupId>org.xolstice.maven.plugins</groupId>
                <artifactId>protobuf-maven-plugin</artifactId>
                <version>0.5.0</version>
                <configuration>
                    <protocArtifact>
                        com.google.protobuf:protoc:3.1.0:exe:${os.detected.classifier}
                    </protocArtifact>
                    <pluginId>grpc-java</pluginId>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>compile-custom</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

3)在右侧maven project中可以找到相应的插件 (没有的话刷新)

4)在和java平级的目录下,创建proto文件夹,然后创建person.proto文件
5)person.proto

// 声明包名称的空间
syntax="proto3";
// 具体的类生成目录
option java_package="com.duing";
// 具体的类名
option java_outer_classname="PersonModel";
// 类结构
message Person{
    int32 id = 1;    // 此处的1代表顺序
    string name = 2;
}
  1. 使用插件进行编译,将编译生成的代码拷贝到需要的目录下
    7)编写测例进行序列化和反序列化操作
目录
相关文章
|
2月前
|
设计模式 安全 测试技术
深入理解与应用自动化测试框架 — 以Selenium为例网络防线的构筑者:洞悉网络安全与信息安全的核心要素
【5月更文挑战第29天】 在快速迭代的软件开发过程中,自动化测试已成为提高测试效率、确保软件质量的重要手段。本文将深入探讨自动化测试框架Selenium的核心概念、架构以及实际应用中的关键技巧,旨在为读者提供一篇系统性的分析与实践指南。文章首先概述了自动化测试的必要性和Selenium框架的基本特征;随后详细剖析了Selenium的组件结构,并结合实例讲解如何高效地设计和执行测试用例;最后,讨论了当前自动化测试面临的挑战及未来发展趋势。
|
2月前
|
安全 网络协议 网络安全
OWASP Top 10 网络安全10大漏洞——A01,源码+原理+手写框架
OWASP Top 10 网络安全10大漏洞——A01,源码+原理+手写框架
|
9天前
|
缓存 前端开发 API
了解python中几个主流的网络框架
【6月更文挑战第21天】探索Python Web几个流行框架,了解各框架特性以适应不同场景需求。
28 1
|
12天前
|
网络协议 Java 物联网
Netty是什么?深入理解高性能网络框架
Netty是什么?深入理解高性能网络框架
44 1
|
20天前
|
数据采集 存储 中间件
Scrapy,作为一款强大的Python网络爬虫框架,凭借其高效、灵活、易扩展的特性,深受开发者的喜爱
【6月更文挑战第10天】Scrapy是Python的高效爬虫框架,以其异步处理、多线程及中间件机制提升爬取效率。它提供丰富组件和API,支持灵活的数据抓取、清洗、存储,可扩展到各种数据库。通过自定义组件,Scrapy能适应动态网页和应对反爬策略,同时与数据分析库集成进行复杂分析。但需注意遵守法律法规和道德规范,以合法合规的方式进行爬虫开发。随着技术发展,Scrapy在数据收集领域将持续发挥关键作用。
65 4
|
19天前
|
监控 网络协议 Java
Java一分钟之-Netty:高性能异步网络库
【6月更文挑战第11天】Netty是Java的高性能异步网络框架,基于NIO,以其高吞吐量、低延迟、灵活性和安全性受到青睐。常见问题包括内存泄漏、ChannelHandler滥用和异常处理不当。要规避这些问题,需正确释放ByteBuf,精简ChannelPipeline,妥善处理异常,并深入理解Netty原理。通过代码审查、遵循最佳实践和监控日志,可提升代码质量和性能。掌握Netty,打造高效网络服务。
22 2
|
2天前
|
缓存 网络协议 安全
探秘Netty:打造高性能网络通信利器
探秘Netty:打造高性能网络通信利器
2 0
|
23天前
|
数据采集 中间件 调度
Scrapy:高效的网络爬虫框架
Scrapy是Python的网络爬虫框架,用于快速构建和开发爬虫。它提供简单API和全功能环境,包括请求调度、HTML解析、数据存储等,让开发者专注爬虫逻辑。Scrapy工作流程包括发起请求、下载响应、解析数据、处理数据和发送新请求。其核心组件有调度器、下载器、解析器(Spiders)和Item Pipeline,广泛应用于数据挖掘、信息监测、搜索引擎和自动化测试。有效技巧包括合理设置请求参数、编写高效解析器、使用代理和防反爬策略,以及利用中间件。随着大数据和AI的发展,Scrapy在爬虫领域的地位将持续巩固。【6月更文挑战第6天】
27 0
|
2月前
|
编解码 前端开发 Java
Java网络API之Netty深度解析
Java网络API之Netty深度解析
25 0
|
2月前
|
机器学习/深度学习 存储 安全
网络安全与信息安全:防护、检测与教育移动应用开发的未来:跨平台框架与原生系统的融合
【5月更文挑战第27天】在数字化时代,数据成为了新的货币。因此,确保信息的安全性和网络的完整性变得至关重要。本文将探讨网络安全漏洞的概念、加密技术的重要性以及提升安全意识的必要性。通过分析不同类型的安全威胁,我们讨论了如何利用现代加密技术和多层防御策略来保护信息系统。此外,文中还强调了教育和培训在构建坚固的安全防线中的作用。