Netty网络框架(三)

简介: Netty网络框架

Netty网络框架(二)https://developer.aliyun.com/article/1469520


3、EventLoop

EventLoop事件循环,监听IO事件,内部封装了线程

EventLoopGroup事件循环组,是对EventLoop的管理,封装了线程池。

当新建channel时,group会为其分配一个EventLoop,封装了nio中的Selector,监听通道中的所有事件,一个通道的生命周期内,所有操作都由相同的EventLoop所封装的线程处理。

同时,多个通道可以由一个EventLoop处理,是多对一的关系

1)EventLoopGroup

类层级结构
(通过选中NioEventLoopGroup源码 - 右键 -  选中Diagrams  -  选中show diagram 展示出来)

A  初始化流程
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
[NioEventLoopGroup]
-------------------------------------------------------------------------
    public NioEventLoopGroup() {
        this(0);
    }
    
    public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
    }
    
    public NioEventLoopGroup(int nThreads, Executor executor) {
        this(nThreads, executor, SelectorProvider.provider());
    }
    
    // 逐步增加参数
    public NioEventLoopGroup(
            int nThreads, Executor executor, final SelectorProvider selectorProvider) {
        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }
    
    // 调用父类的构造器
    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }
    
    
 [MultithreadEventLoopGroup]
------------------------------------------------------------------------- 
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }
    
    // 初始化线程数量的逻辑     线程数 = cpu核数 * 2
    private static final int DEFAULT_EVENT_LOOP_THREADS;
    static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }
    
    
[MultithreadEventExecutorGroup]    
------------------------------------------------------------------------- 
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }
    
    // 核心逻辑
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
        children = new EventExecutor[nThreads];
        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                // 根据线程数量  创建EventLoop的逻辑  
                // newChild()的具体实现在NioEventLoopGroup类中
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }
                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }
        chooser = chooserFactory.newChooser(children);
        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };
        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }
        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }
    
    
[NioEventLoopGroup]    
-------------------------------------------------------------------------     
     @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
    }

2)EventLoop

重要属性为  Selector   及其父类的父类中的 Thread
Selector用于在channel创建之后,注册其中监听后续的I/O事件
Thread用于进行轮询,在channel注册之后启动线程

A 注册channel
ChannelFuture future = serverBootstrap.bind(8888).sync();
 
 [AbstractBootstrap]
 --------------------------------------------------------------------------
    public ChannelFuture bind(int inetPort) {
        return bind(new InetSocketAddress(inetPort));
    }
    
    
    public ChannelFuture bind(SocketAddress localAddress) {
        validate();
        return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
    }
    
    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        .....
    }
    
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                channel.unsafe().closeForcibly();
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }

注册的源码调用链路

ChannelFuture regFuture = config().group().register(channel);
[MultithreadEventLoopGroup]
------------------------------------------------------------------------------
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }
    
[SingleThreadEventLoop]    
------------------------------------------------------------------------------
    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }
    
    
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
  
 
[AbstractChannel]
------------------------------------------------------------------------------
//  核心逻辑是 调用了 register0()方法
 public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            ObjectUtil.checkNotNull(eventLoop, "eventLoop");
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }
        
      // channel中存在EventLoop类型的属性
            // 通道初始化时,会将指定的EventLoop与channel进行关联
            AbstractChannel.this.eventLoop = eventLoop;
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }
        
        
     // 核心逻辑是调用了doRegister()方法
     private void register0(ChannelPromise promise) {
            try {
                // check if the channel is still open as it could be closed in the mean time when the register
                // call was outside of the eventLoop
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                doRegister();
                neverRegistered = false;
                registered = true;
                // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                // user may already fire events through the pipeline in the ChannelFutureListener.
                pipeline.invokeHandlerAddedIfNeeded();
                safeSetSuccess(promise);
                pipeline.fireChannelRegistered();
                // Only fire a channelActive if the channel has never been registered. This prevents firing
                // multiple channel actives if the channel is deregistered and re-registered.
                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        // This channel was registered before and autoRead() is set. This means we need to begin read
                        // again so that we process inbound data.
                        //
                        // See https://github.com/netty/netty/issues/4805
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }        
  
[AbstractNioChannel]
----------------------------------------------------------------------------
// 核心注册逻辑
protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                // 将通道注册进Selector中 此时感兴趣的事件是0
                // 并且将当前对象作为附加对象存入其中  等价selectionKey.attach()方法
                // 使用对象时   再通过selectionkey.attachment()方法取出对象
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }
B 轮询事件的状态

读取源码的思路:找到入口,找到核心逻辑。

AbstractChannel中register()方法,对eventLoop.execute()的调用,就是启动线程进行轮询的入口。

[SingleThreadEventExecutor]
-----------------------------------------------------------------------------
    public void execute(Runnable task) {
        ObjectUtil.checkNotNull(task, "task");
        execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
    }
    
    private final Queue<Runnable> taskQueue;
    //  当前类维护了一个任务队列
    private void execute(Runnable task, boolean immediate) {
        boolean inEventLoop = inEventLoop();
        addTask(task);
        if (!inEventLoop) {
            // 启动线程
            startThread();
            if (isShutdown()) {
                boolean reject = false;
                try {
                    if (removeTask(task)) {
                        reject = true;
                    }
                } catch (UnsupportedOperationException e) {
                    // The task queue does not support removal so the best thing we can do is to just move on and
                    // hope we will be able to pick-up the task before its completely terminated.
                    // In worst case we will log on termination.
                }
                if (reject) {
                    reject();
                }
            }
        }
        if (!addTaskWakesUp && immediate) {
            wakeup(inEventLoop);
        }
    }
    
    
    private void startThread() {
        if (state == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                boolean success = false;
                try {
                    doStartThread();
                    success = true;
                } finally {
                    if (!success) {
                        STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                    }
                }
            }
        }
    }
    
    
    private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }
                boolean success = false;
                updateLastExecutionTime();
                try {
                    // 真正的调用逻辑
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } 
                
                ......
            }
         }
     }
[NioEventLoop]
-----------------------------------------------------------------------------
protected void run() {
        int selectCnt = 0;
        for (;;) {
            try {
                int strategy;
                try {
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.BUSY_WAIT:
                        // fall-through to SELECT since the busy-wait is not supported with NIO
                    case SelectStrategy.SELECT: 
                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                        if (curDeadlineNanos == -1L) {
                            curDeadlineNanos = NONE; // nothing on the calendar
                        }
                        nextWakeupNanos.set(curDeadlineNanos);
                        try {
                            // 判断队列中是否存在任务
                            if (!hasTasks()) { 
                                // 如果不存在  调用select()进行获取
                                strategy = select(curDeadlineNanos);
                            }
                        } finally {
                            // This update is just to help block unnecessary selector wakeups
                            // so use of lazySet is ok (no race condition)
                            nextWakeupNanos.lazySet(AWAKE);
                        }
                        // fall through
                    default:
                    }
                } catch (IOException e) {
                    // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                    // the selector and retry. https://github.com/netty/netty/issues/8566
                    rebuildSelector0();
                    selectCnt = 0;
                    handleLoopException(e);
                    continue;
                }
                selectCnt++;
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                boolean ranTasks;
                if (ioRatio == 100) {
                    try {
                        if (strategy > 0) {
                            // 处理任务的核心逻辑
                            processSelectedKeys();
                        }
                    } finally {
                        // Ensure we always run tasks.
                        ranTasks = runAllTasks();
                    }
                } else if (strategy > 0) {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                } else {
                    ranTasks = runAllTasks(0); // This will run the minimum number of tasks
                }
                if (ranTasks || strategy > 0) {
                    if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                selectCnt - 1, selector);
                    }
                    selectCnt = 0;
                } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
                    selectCnt = 0;
                }
            } catch (CancelledKeyException e) {
                // Harmless exception - log anyway
                if (logger.isDebugEnabled()) {
                    logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                            selector, e);
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }

processSelectedKeys()是处理任务的核心逻辑,来自于NioEventLoop的run()方法调用

// 处理事件集合
    private void processSelectedKeys() {
        // 如果selectedKeys(事件集合)没有值,重新获取,如果有值,直接处理
        if (selectedKeys != null) {
            processSelectedKeysOptimized();
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }
    
    
     private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
        // check if the set is empty and if so just return to not create garbage by
        // creating a new Iterator every time even if there is nothing to process.
        // See https://github.com/netty/netty/issues/597
        if (selectedKeys.isEmpty()) {
            return;
        }
        Iterator<SelectionKey> i = selectedKeys.iterator();
        for (;;) {
            final SelectionKey k = i.next();
            // 这是注册时,存储的附加对象,即为通道对象channel
            final Object a = k.attachment();
            i.remove();
            if (a instanceof AbstractNioChannel) {
                // 处理具体事件
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }
            if (!i.hasNext()) {
                break;
            }
            if (needsToSelectAgain) {
                selectAgain();
                selectedKeys = selector.selectedKeys();
                // Create the iterator again to avoid ConcurrentModificationException
                if (selectedKeys.isEmpty()) {
                    break;
                } else {
                    i = selectedKeys.iterator();
                }
            }
        }
    }
    
    // 处理具体事件
    //  判断事件类型,调用对应的逻辑处理
    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                // If the channel implementation throws an exception because there is no event loop, we ignore this
                // because we are only trying to determine if ch is registered to this event loop and thus has authority
                // to close ch.
                return;
            }
            // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
            // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
            // still healthy and should not be closed.
            // See https://github.com/netty/netty/issues/5125
            if (eventLoop == this) {
                // close the channel if the key is not valid anymore
                unsafe.close(unsafe.voidPromise());
            }
            return;
        }
        try {
            int readyOps = k.readyOps();
            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            // the NIO JDK channel implementation may throw a NotYetConnectedException.
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
                unsafe.finishConnect();
            }
            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

其中读事件的处理

unsafe.read();
[AbstractNioByteChannel]
----------------------------------------------------------------------------
public final void read() {
            final ChannelConfig config = config();
            if (shouldBreakReadReady(config)) {
                clearReadPending();
                return;
            }
            final ChannelPipeline pipeline = pipeline();
            final ByteBufAllocator allocator = config.getAllocator();
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            allocHandle.reset(config);
            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    byteBuf = allocHandle.allocate(allocator);
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    if (allocHandle.lastBytesRead() <= 0) {
                        // nothing was read. release the buffer.
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        if (close) {
                            // There is nothing left to read as we received an EOF.
                            readPending = false;
                        }
                        break;
                    }
                    allocHandle.incMessagesRead(1);
                    readPending = false;
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;
                } while (allocHandle.continueReading());
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();
                if (close) {
                    closeOnRead(pipeline);
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }

4、Bootstrap

引导,对应用程序进行配置,并让他运行起来的过程。

1)配置

必选参数:group 、 channel、 handler(服务端 -- childHandler)

group(): 指定一个到两个reactor

channel():指定channel工厂,反射的方式创建channel使用

handler():指定reactor的处理器,其中childHandler指定的是,服务端所接收到的客户端channel使用的处理器,而服务端的主reactor(bossGroup),已经默认添加了acceptor处理器,所以可以不指定。

option():指定TCP相关的参数,以及netty自定义的参数

配置参数的过程,称之为初始化。

2)运行

// 启动并设置端口号  但需要使用sync异步启动
        try {
            ChannelFuture future = serverBootstrap.bind(8888).sync();
            // 将关闭通道的方式 也设置为异步的
            //    阻塞finally中的代码执行
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 优雅关闭
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

bind(),将服务端的channel绑定到端口号,然后接收客户端的连接,让整个netty运行起来。

sync(),因为绑定事件是异步的,所以使用sync同步等待结果,换句话说,bind只触发了绑定端口的事件,需要使用sync等待事件执行的结果。

future.channel().closeFuture().sync(),含义为,当通道被关闭时,才执行后续的操作,sync使当前线程执行到此处阻塞,以确保不执行后续的shutdown方法。

3)源码解析

A 类声明
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {

嵌套的泛型使用,可以达到,子类中返回子类本身的效果,具体如下:

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
B  init方法

工作内容

a、设置channel相关的选项参数
b、设置channel的属性键值对
c、添加对channel的IO事件处理器 (Acceptor角色)

void init(Channel channel) {
        // 设置channel相关的选项参数
        setChannelOptions(channel, newOptionsArray(), logger);
        // 设置channel的属性键值对
        setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
        ChannelPipeline p = channel.pipeline();
        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
        }
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }
                
                // 添加对channel的IO事件处理器 (Acceptor角色)
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

Acceptor分析

功能:将主reactor接收到的客户端通道,传递给从reactor

// ServerBootstrapAcceptor是ServerBootstrap的静态内部类
// netty将acceptor看作一个处理器,并且是入站处理器
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
// 具体的逻辑封装到channelRead()方法中
// 对客户端通道进行配置 , 然后注册到从Reactor中
public void channelRead(ChannelHandlerContext ctx, Object msg) {
            // 此时msg对应 服务端接收到的客户端通道
            final Channel child = (Channel) msg;
        // 设置处理链路
            child.pipeline().addLast(childHandler);
      // 设置通道的配置项和参数
            setChannelOptions(child, childOptions, logger);
            setAttributes(child, childAttrs);
            try {
                // childGroup 是从reactor的资源池 调用注册方法 注册客户端通道child
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {          
                        // 增加监听 获取注册的异步结果
                        // 如果注册失败 或者抛出异常  都会关闭channel
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }
        
        
        // 如果处理客户端连接失败了,暂停一秒,然后继续接受
        // 为保障服务端能够尽可能多的处理客户端的连接  不受某一次处理失败的结果影响
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            final ChannelConfig config = ctx.channel().config();
            if (config.isAutoRead()) {
                // stop accept new connections for 1 second to allow the channel to recover
                // See https://github.com/netty/netty/issues/1328
                config.setAutoRead(false);
                ctx.channel().eventLoop().schedule(enableAutoReadTask, 1, TimeUnit.SECONDS);
            }
            // still let the exceptionCaught event flow through the pipeline to give the user
            // a chance to do something with it
            ctx.fireExceptionCaught(cause);
        }

4) Future和Promise

Future代表的是,一个还未完成的异步任务的执行结果。可以通过addListener方法,监听执行结果后进行相应的处理,此时它的状态可以分为未完成、已完成(成功、失败、主动取消)等。

对Future而言,状态只能读取,无法更改,又出现了Promise,但是Promise只能更改一次。

参照生活中的定额发票(Future)和机打发票(Promise)。

(四)拓展篇

1、心跳检测

检测逻辑:
1) 服务端启动,客户端建立连接,连接的目的是互相发送消息。
2) 如果客户端在工作,服务端一定能收到数据,如果客户端空闲,服务端会出现资源浪费。
3) 服务端需要一种检测机制,验证客户端的活跃状态,不活跃则关闭。

需求设计:
1) 客户端向服务端发送 “I am alive” , sleep一个随机时间,模拟空闲状态
2) 服务端收到消息后,返回“over”, 客户端有空闲,记录空闲次数
3) 设定阈值,达到阈值时主动关闭连接

IdleStateHandler , 是netty提供的处理器
1)超过多长时间没有读   readerIdleTime

  1. 超过多长时间没有写   writerIdleTime
  2. 超过多长时间没有读和写   allIdleTime

底层实现检测的是 IdleStateEvent事件,通过管道传递给下一个handler处理,处理方法是userEventTriggered。

其中IdleStateEvent事件,分为READER_IDLE、WRITER_IDLE、ALL_IDLE三大类

2、TCP粘包拆包

TCP是基于流的。

当客户端发送多个包,服务端只收到一个包,此时发生了粘包。
当客户端发送多个包,服务端也接收到多个包,但是包是不完整,多了或少了数据,此时发生了拆包。

UDP会发生拆包和粘包吗?
不会,UDP是基于报文的,在UDP的首部使用16bit存储报文的长度,因此不会发生。

TCP发生粘包和拆包的本质原因:
要发送的数据先经过TCP的缓冲区,还限制了最大报文长度。
A  如果要发送的数据 > TCP剩余的缓冲区大小,发生拆包
B  如果要发送的数据 >  最大报文长度,发生拆包
C  如果要发送的数据 << TCP剩余的缓冲区大小,发生粘包
D  接收数据的应用层,没有及时读取缓冲区数据,也会发生粘包

解决办法:
A  设置出消息的长度
B  设置出消息的边界——分隔符

Netty提供的解码器,两类

A 基于长度的解码器,在包头部设置出数据的长度。(类似于UDP的头部处理)
LengthFieldBasedFrameDecoder  自定义长度的处理方式
FixedLengthFrameDecoder   固定长度的处理方式

B 基于分隔符的解码器
DelimiterBasedFrameDecoder  自定义分隔符的处理方式
LineBasedFrameDecoder   行尾("\n"或"\r\n")分隔符的处理方式

【Demo逻辑】
需求:客户端循环100次向服务端请求时间

1)第一种方式,传输的过程数据单位是字节流ByteBuf,需要自行处理分隔符以及数据的长度,此时会出现粘包和拆包的问题

2)第二种方式,使用LineBasedFrameDecoder,配合StringDecoder使用,传输的数据单位变成字符串,可以直接处理,保证业务逻辑上的包和真正传输的包是一致的

3、序列化框架——protobuf

protobuf = protocol buffers
类似于xml的生成和解析,但效率更高,生成的是字节码,可读性稍差。

【Demo逻辑】

1)安装idea插件,protobuf support

          如果安装之后,创建*.proto文件没有使用插件,手动设置关联关系

          settings ->  file types  ->  找到protobuf ->  增加正则表达式

2)引入maven依赖和插件

<properties>
    <os.detected.classifier>windows-x86_64</os.detected.classifier>
</properties>
<build>
        <plugins>
            <plugin>
                <groupId>org.xolstice.maven.plugins</groupId>
                <artifactId>protobuf-maven-plugin</artifactId>
                <version>0.5.0</version>
                <configuration>
                    <protocArtifact>
                        com.google.protobuf:protoc:3.1.0:exe:${os.detected.classifier}
                    </protocArtifact>
                    <pluginId>grpc-java</pluginId>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>compile-custom</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

3)在右侧maven project中可以找到相应的插件 (没有的话刷新)

4)在和java平级的目录下,创建proto文件夹,然后创建person.proto文件
5)person.proto

// 声明包名称的空间
syntax="proto3";
// 具体的类生成目录
option java_package="com.duing";
// 具体的类名
option java_outer_classname="PersonModel";
// 类结构
message Person{
    int32 id = 1;    // 此处的1代表顺序
    string name = 2;
}
  1. 使用插件进行编译,将编译生成的代码拷贝到需要的目录下
    7)编写测例进行序列化和反序列化操作
目录
相关文章
|
23天前
|
监控 安全
从 Racket 语言出发,创新员工网络监控软件的框架
在数字化企业环境中,员工网络监控软件对于保障信息安全和提升效率至关重要。Racket 语言凭借其独特特性和强大功能,为开发创新的监控软件提供了新可能。通过捕获和分析网络数据包、记录员工网络活动日志,甚至构建复杂的监控框架,Racket 能够满足企业的定制化需求,为企业信息安全和管理提供强有力支持。未来,基于 Racket 的创新解决方案将不断涌现。
35 6
|
6天前
|
机器学习/深度学习 人工智能
类人神经网络再进一步!DeepMind最新50页论文提出AligNet框架:用层次化视觉概念对齐人类
【10月更文挑战第18天】这篇论文提出了一种名为AligNet的框架,旨在通过将人类知识注入神经网络来解决其与人类认知的不匹配问题。AligNet通过训练教师模型模仿人类判断,并将人类化的结构和知识转移至预训练的视觉模型中,从而提高模型在多种任务上的泛化能力和稳健性。实验结果表明,人类对齐的模型在相似性任务和出分布情况下表现更佳。
19 3
|
25天前
|
安全 网络安全 区块链
网络安全与信息安全:构建数字世界的防线在当今数字化时代,网络安全已成为维护个人隐私、企业机密和国家安全的重要屏障。随着网络攻击手段的不断升级,从社交工程到先进的持续性威胁(APT),我们必须采取更加严密的防护措施。本文将深入探讨网络安全漏洞的形成原因、加密技术的应用以及提高公众安全意识的重要性,旨在为读者提供一个全面的网络安全知识框架。
在这个数字信息日益膨胀的时代,网络安全问题成为了每一个网民不可忽视的重大议题。从个人信息泄露到企业数据被盗,再到国家安全受到威胁,网络安全漏洞如同隐藏在暗处的“黑洞”,时刻准备吞噬掉我们的信息安全。而加密技术作为守护网络安全的重要工具之一,其重要性不言而喻。同时,提高公众的安全意识,也是防范网络风险的关键所在。本文将从网络安全漏洞的定义及成因出发,解析当前主流的加密技术,并强调提升安全意识的必要性,为读者提供一份详尽的网络安全指南。
|
2月前
|
存储 SQL 安全
网络安全与信息安全:守护数字世界的坚盾在这个高度数字化的时代,网络安全和信息安全已经成为个人、企业乃至国家安全的重要组成部分。本文将深入探讨网络安全漏洞、加密技术以及安全意识的重要性,旨在为读者提供一个全面的网络安全知识框架。
随着互联网技术的飞速发展,网络安全问题日益凸显。从个人信息泄露到企业数据被盗,再到国家安全受到威胁,网络安全事件层出不穷。本文将从网络安全漏洞的定义与分类入手,探讨常见的网络攻击手段;随后深入解析加密技术的原理及其在保护信息安全中的作用;最后强调提升公众与企业的安全意识的重要性,并提出具体的建议。通过综合运用这些知识点,我们可以更好地构建起一道道坚固的防线,守护我们的数字世界。
|
2月前
|
编解码 分布式计算 网络协议
Netty高性能网络框架(一)
Netty高性能网络框架(一)
|
25天前
|
NoSQL Java Redis
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
这篇文章介绍了Redis的基本命令,并展示了如何使用Netty框架直接与Redis服务器进行通信,包括设置Netty客户端、编写处理程序以及初始化Channel的完整示例代码。
30 1
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
|
17天前
|
机器学习/深度学习 数据采集 算法
目标分类笔记(一): 利用包含多个网络多种训练策略的框架来完成多目标分类任务(从数据准备到训练测试部署的完整流程)
这篇博客文章介绍了如何使用包含多个网络和多种训练策略的框架来完成多目标分类任务,涵盖了从数据准备到训练、测试和部署的完整流程,并提供了相关代码和配置文件。
31 0
目标分类笔记(一): 利用包含多个网络多种训练策略的框架来完成多目标分类任务(从数据准备到训练测试部署的完整流程)
|
2月前
|
存储 机器人 Linux
Netty(二)-服务端网络编程常见网络IO模型讲解
Netty(二)-服务端网络编程常见网络IO模型讲解
完成切换网络+修改网络连接图标提示的代码框架
完成切换网络+修改网络连接图标提示的代码框架
|
2月前
|
设计模式 缓存 算法
Netty框架的重要性
Netty框架的重要性