阅读本篇可能需要的预备知识 《试图取代 TCP 的 QUIC 协议到底是什么》、《抓包与反抓包》、《趣谈网络协议》
一. 前言
webSocket
在 流媒体和im
有比较多的实践,小编有幸加入了某直播公司,刚好有时间沉淀一下自己的技术栈,所以给带大家写一篇关于webSocket
的理论教程,希望大家喜欢。
二. webSocket
背景和疑问
在im
开发中,当我们谈到长连接
和连接重试
时候,通常都第一时间想到webSocket
, 问题来了:
- 什么是
webSocket
? - 为什么要有
webSocket
? webSocket
有哪些好处?webSocket
的特点?socket
通信步骤是怎样的?
- 如果让你实现一个
socket
你会怎么做?
小朋友,你是否有很多问号?
本文介绍的内容会详细解释以上问题,并在最后给解答。稳住,别慌~
webSocket
和 Socket 没有半毛钱关系,不过在这里我们还是先温故一下 Socket
的知识。免得大家搞混了
什么是Socket?
- 即套接字,是一个对 TCP / IP协议进行封装 的编程调用接口(API) 用來描述IP地址和端口,是通信链的句柄,应用程序可以通过Socket向网络发送请求或者 应答网络请求!Socket是支持TCP/IP协议的网络通信的基本操作单元,是对网络通信过程 中端点的抽象表示,包含了进行网络通信所必须的五种信息
- 连接所使用的的协议
- 本地主机的IP地址
- 本地远程的协议端口
- 远程主机的IP地址
- 远地进程的协议端口
- 即通过
Socket
,我们才能在Andorid平台上通过TCP/IP
协议进行开发Socket
不是一种协议,而是一个编程调用接口(API
),属于传输层(主要解决数据如何在网络中传输)- 成对出现,一对套接字
Socket通信步骤
- Step 1:创建ServerSocket和Socket
- Step 2:打开连接到的Socket的输入/输出流
- Step 3:按照协议对Socket进行读/写操作
- Step 4:关闭输入输出流,以及Socket
Socket服务端的编写
- Step 1:创建ServerSocket对象,绑定监听的端口
- Step 2:调用accept()方法监听客户端的请求
- Step 3:连接建立后,通过输入流读取客户端发送的请求信息
- Step 4:通过输出流向客户端发送响应信息
- Step 5:关闭相关资源
public static void main(String[] args) throws IOException { //1.创建一个服务器端Socket,即ServerSocket,指定绑定的端口,并监听此端口 ServerSocket serverSocket = new ServerSocket(12345); InetAddress address = InetAddress.getLocalHost(); String ip = address.getHostAddress(); Socket socket = null; //2.调用accept()等待客户端连接 System.out.println("~~~服务端已就绪,等待客户端接入~,服务端ip地址: " + ip); socket = serverSocket.accept(); //3.连接后获取输入流,读取客户端信息 InputStream is=null; InputStreamReader isr=null; BufferedReader br=null; OutputStream os=null; PrintWriter pw=null; is = socket.getInputStream(); //获取输入流 isr = new InputStreamReader(is,"UTF-8"); br = new BufferedReader(isr); String info = null; while((info=br.readLine())!=null){//循环读取客户端的信息 System.out.println("客户端发送过来的信息" + info); } socket.shutdownInput();//关闭输入流 socket.close(); }
Socket客户端的编写
- Step 1:创建Socket对象,指明需要链接的服务器的地址和端号
- Step 2:链接建立后,通过输出流向服务器发送请求信息
- Step 3:通过输出流获取服务器响应的信息
- Step 4:关闭相关资源
public static void main(String ... args) throws Exception{ //1.创建客户端Socket,指定服务器地址和端口 Socket socket = new Socket("127.0.0.1", 12345); //2.获取输出流,向服务器端发送信息 OutputStream os = socket.getOutputStream();//字节输出流 PrintWriter pw = new PrintWriter(os);//将输出流包装为打印流 //获取客户端的IP地址 InetAddress address = InetAddress.getLocalHost(); String ip = address.getHostAddress(); pw.write("客户端:~" + ip + "~ 接入服务器!!"); pw.flush(); socket.shutdownOutput();//关闭输出流 socket.close(); }
三. webSocket
在客户端的应用
WebSocket组件
Java-WebSocket 在 Android客户端开发中已经非常成熟 他实现了与服务端建立长连接,并可以在连接中主动或者被动发送消息,关闭socket连接和保持长连接状态等一系列服务。下面我就基于此框架进行原理性分析,后再带大家过一遍webSocket
的基础概念,相信先实践后理解基础能更快的帮助大家吸收
3.1 构建WebSocketClient
WebSocketClient
放到最先讲,是因为他太重要了,基本上是连接IM的枢纽,首先我们看一下他核心的一个线程类WebsocketWriteThread
,WebSocketClient
的数据写入工作基本上都在这个线程完成的,在初始化WebSocketClient
我们发现其实本质上是初始化我们的WebSocketImpl
,WebSocketImpl
是干什么用的呢?他其实处理的是单个WebSocketImpl连接的一端(客户端或服务器)照顾“握手”阶段,然后允许通过基于事件的模型轻松发送ext帧和接收帧详细介绍可以参考下文的 建立socket连接通道
public void run() { InputStream istream; try { boolean isNewSocket = false; if (socketFactory != null) { socket = socketFactory.createSocket(); } else if( socket == null ) { socket = new Socket( proxy ); isNewSocket = true; } else if( socket.isClosed() ) { throw new IOException(); } // 设置Tcp没有被连接的 socket.setTcpNoDelay( isTcpNoDelay() ); // TCP连接关闭后,连接可能会保留,连接后一段时间内处于超时状态.可能会触发连接重试机制 socket.setReuseAddress( isReuseAddr() ); if (!socket.isBound()) { InetSocketAddress addr = new InetSocketAddress(dnsResolver.resolve(uri), this.getPort()); socket.connect(addr, connectTimeout); } // 如果socket设置给其他连接我们不需要设置 if (isNewSocket && "wss".equals( uri.getScheme())) { // 确定TLS版本号 SSLContext sslContext = SSLContext.getInstance("TLSv1.2"); sslContext.init(null, null, null); // 使用工厂策阅模式创建socket连接 SSLSocketFactory factory = sslContext.getSocketFactory(); socket = factory.createSocket(socket, uri.getHost(), getPort(), true); } istream = socket.getInputStream(); ostream = socket.getOutputStream(); sendHandshake(); } catch ( /*IOException | SecurityException | UnresolvedAddressException | InvalidHandshakeException | ClosedByInterruptException | SocketTimeoutException */Exception e ) { // WebSocket 连接错误 onWebsocketError( engine, e ); engine.closeConnection( CloseFrame.NEVER_CONNECTED, e.getMessage() ); return; } // 构建一个可写的Websocket线程 writeThread = new Thread( new WebsocketWriteThread(this) ); writeThread.start(); byte[] rawbuffer = new byte[ WebSocketImpl.RCVBUF ]; int readBytes; try { while ( !isClosing() && !isClosed() && ( readBytes = istream.read( rawbuffer ) ) != -1 ) { engine.decode( ByteBuffer.wrap( rawbuffer, 0, readBytes ) ); } engine.eot(); } catch ( IOException e ) { handleIOException(e); } catch ( RuntimeException e ) { // this catch case covers internal errors only and indicates a bug in this websocket implementation onError( e ); // 关闭连接 engine.closeConnection( CloseFrame.ABNORMAL_CLOSE, e.getMessage() ); } connectReadThread = null; }
WebSocketClient
是一个必须实现 onClose
, onOpen
,onMessage
的WebSocket客户端类,在运行时,用户可以通过connect
建立连接,在onMessage
里将连接请求报文通过send
方法发送给我们服务端,这边利用的是WebSocketFactory
工厂模式设计的WebSocketServer
服务通道,我们来看一下WebSocketClient是怎么构建的吧~
3.1.1 构建WebSocketClient实例,并将其设置为连接到指定的URI。通道未尝试自动连接
public WebSocketClient( URI serverUri , Draft protocolDraft , Map<String,String> httpHeaders , int connectTimeout ) { if( serverUri == null ) { throw new IllegalArgumentException(); } else if( protocolDraft == null ) { throw new IllegalArgumentException( "null as draft is permitted for `WebSocketServer` only!" ); } this.uri = serverUri; this.draft = protocolDraft; this.dnsResolver = new DnsResolver() { @Override public InetAddress resolve(URI uri) throws UnknownHostException { return InetAddress.getByName(uri.getHost()); } }; this.headers = httpHeaders; this.connectTimeout = connectTimeout; setTcpNoDelay( false ); setReuseAddr( false ); this.engine = new WebSocketImpl( this, protocolDraft ); }
3.1.2 重置所有相关内容允许重新连接
private void reset() { Thread current = Thread.currentThread(); if (current == writeThread || current == connectReadThread) { throw new IllegalStateException("You cannot initialize a reconnect out of the websocket thread. Use reconnect in another thread to insure a successful cleanup."); } try { closeBlocking(); if( writeThread != null ) { this.writeThread.interrupt(); this.writeThread = null; } if( connectReadThread != null ) { this.connectReadThread.interrupt(); this.connectReadThread = null; } this.draft.reset(); if( this.socket != null ) { this.socket.close(); this.socket = null; } } catch ( Exception e ) { onError( e ); engine.closeConnection( CloseFrame.ABNORMAL_CLOSE, e.getMessage() ); return; } connectLatch = new CountDownLatch( 1 ); closeLatch = new CountDownLatch( 1 ); this.engine = new WebSocketImpl( this, this.draft ); }
3.1.3 connect
非阻塞式的开启websocket
连接
public void connect() { if( connectReadThread != null ) throw new IllegalStateException( "WebSocketClient objects are not reuseable" ); connectReadThread = new Thread( this ); connectReadThread.setName( "WebSocketConnectReadThread-" + connectReadThread.getId() ); connectReadThread.start(); }
3.1.4 connectBlocking
阻塞式的开启websocket
连接
public boolean connectBlocking() throws InterruptedException { connect(); connectLatch.await(); return engine.isOpen(); }
3.1.5 close
非租塞式关闭websocket
握手过程
public void close() { if( writeThread != null ) { engine.close( CloseFrame.NORMAL ); } }
3.2 建立socket
连接通道
socket最核心的功能是能通过长连接收到服务端的主动推送消息并对外通知,也能将长连接中发包和回包的关联逻辑封装为流程操作。下面我们来看一下WebSocket
连接过程吧~
3.2.1 关闭socket链接(#WebSocketImpl@line508)
/** * 会在没有握手的前提下突然关闭连接 */ public synchronized void closeConnection( int code, String message, boolean remote ) { if( readyState == ReadyState.CLOSED ) { return; } // 像eot()之类的方法无需调用onClose()即可调用此方法。因此,我们必须手动调整ReadyState if( readyState == ReadyState.OPEN ) { if( code == CloseFrame.ABNORMAL_CLOSE ) { readyState = ReadyState.CLOSING; } } if( key != null ) { // key.attach( null ); //see issue #114 key.cancel(); } if( channel != null ) { try { channel.close(); } catch ( IOException e ) { if( e.getMessage().equals( "Broken pipe" ) ) { log.trace( "Caught IOException: Broken pipe during closeConnection()", e ); } else { log.error("Exception during channel.close()", e); wsl.onWebsocketError( this, e ); } } } try { this.wsl.onWebsocketClose( this, code, message, remote ); } catch ( RuntimeException e ) { wsl.onWebsocketError( this, e ); } if( draft != null ) draft.reset(); handshakerequest = null; readyState = ReadyState.CLOSED; }
3.2.2 发送关闭握手请求(#WebSocketImpl@line446)
public synchronized void close( int code, String message, boolean remote ) { // 如果当前websocket所处状态不是关闭或者即将关闭状态 if( readyState != ReadyState.CLOSING && readyState != ReadyState.CLOSED ) { if( readyState == ReadyState.OPEN ) { if( code == CloseFrame.ABNORMAL_CLOSE ) { assert ( !remote ); readyState = ReadyState.CLOSING ; flushAndClose( code, message, false ); return; } if( draft.getCloseHandshakeType() != CloseHandshakeType.NONE ) { try { if( !remote ) { try { wsl.onWebsocketCloseInitiated( this, code, message ); } catch ( RuntimeException e ) { wsl.onWebsocketError( this, e ); } } if( isOpen() ) { CloseFrame closeFrame = new CloseFrame(); closeFrame.setReason( message ); closeFrame.setCode( code ); closeFrame.isValid(); sendFrame( closeFrame ); } } catch ( InvalidDataException e ) { log.error("generated frame is invalid", e); wsl.onWebsocketError( this, e ); flushAndClose( CloseFrame.ABNORMAL_CLOSE, "generated frame is invalid", false ); } } flushAndClose( code, message, remote ); } else if( code == CloseFrame.FLASHPOLICY ) { assert ( remote ); flushAndClose( CloseFrame.FLASHPOLICY, message, true ); } else if( code == CloseFrame.PROTOCOL_ERROR ) { // this endpoint found a PROTOCOL_ERROR flushAndClose( code, message, remote ); } else { flushAndClose( CloseFrame.NEVER_CONNECTED, message, false ); } readyState = ReadyState.CLOSING; tmpHandshakeBytes = null; return; }
3.2.2 建立webSocket
后发送请求(#WebSocketImpl@line639)
private void send( Collection<Framedata> frames ) { if( !isOpen() ) { throw new WebsocketNotConnectedException(); } if( frames == null ) { throw new IllegalArgumentException(); } ArrayList<ByteBuffer> outgoingFrames = new ArrayList<ByteBuffer>(); for( Framedata f : frames ) { log.trace( "send frame: {}", f); outgoingFrames.add( draft.createBinaryFrame( f ) ); } write( outgoingFrames );
3.2.3 获取webSocket
远程请求地址
@Override public InetSocketAddress getRemoteSocketAddress() { return engine.getRemoteSocketAddress();
3.2.4 获取webSocket local
终端地址
@Override public InetSocketAddress getRemoteSocketAddress() { return engine.getRemoteSocketAddress();
3.3 构建WebSocketServer
WebSocketServer
构建策阅用的是工厂设计模式,首先我们可以看一下WebSocketServerFactory
封装了Websocket
工厂所需方法的接口,例如:如何创建一个Websocket
,怎样彻底关闭websocket
,以及允许包装Socketchannel(key.channel())
以在ws层
之外插入协议层
等等
public interface WebSocketServerFactory extends WebSocketFactory { @Override WebSocketImpl createWebSocket( WebSocketAdapter a, Draft d); @Override WebSocketImpl createWebSocket( WebSocketAdapter a, List<Draft> drafts ); ByteChannel wrapChannel(SocketChannel channel, SelectionKey key ) throws IOException; void close();
接下来,我们看一下他具体的派生类实现吧,默认策阅DefaultSSLWebSocketServerFactory
也是基于SocketChannel
进行ssl握手连接,网关授权等操作
@Override public ByteChannel wrapChannel( SocketChannel channel, SelectionKey key ) throws IOException { SSLEngine e = sslcontext.createSSLEngine(); List<String> ciphers = new ArrayList<String>( Arrays.asList(e.getEnabledCipherSuites())); ciphers.remove("TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"); e.setEnabledCipherSuites( ciphers.toArray( new String[ciphers.size()] ) ); e.setUseClientMode( false ); return new SSLSocketChannel2( channel, e, exec, key ); }
我们再跟一下SSLSocketChannel2
源码,看一下里面的内部实现。里面有一个很重要的方法processHandshake
负责执行处理sslengine握手的所有操作
private synchronized void processHandshake() throws IOException { // 当前ssl没有握手 if( sslEngine.getHandshakeStatus() == HandshakeStatus.NOT_HANDSHAKING ) return; // 由于可以从读取线程或写入线程中调用它,并且由于此方法是同步的,因此有必要仔细检查我们是否仍在握手。 if( !tasks.isEmpty() ) { Iterator<Future<?>> it = tasks.iterator(); while ( it.hasNext() ) { Future<?> f = it.next(); if( f.isDone() ) { it.remove(); } else { if( isBlocking() ) consumeFutureUninterruptible( f ); return; } } } // 需要从远程端接收数据,然后握手才能继续 if( sslEngine.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_UNWRAP ) { if( !isBlocking() || readEngineResult.getStatus() == Status.BUFFER_UNDERFLOW ) { inCrypt.compact(); // socket通道里面读数据 int read = socketChannel.read( inCrypt ); if( read == -1 ) { throw new IOException( "connection closed unexpectedly by peer" ); } inCrypt.flip(); } inData.compact(); unwrap(); // 当前已经结束了握手过程 if( readEngineResult.getHandshakeStatus() == HandshakeStatus.FINISHED ) { // SSLSession 需要被回调 createBuffers( sslEngine.getSession() ); return; } } // 执行exec任务 consumeDelegatedTasks(); if( tasks.isEmpty() || sslEngine.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_WRAP ) { socketChannel.write( wrap( emptybuffer ) ); if( writeEngineResult.getHandshakeStatus() == HandshakeStatus.FINISHED ) { createBuffers( sslEngine.getSession() ); return; } } // 除非出现#190,否则此函数只能在调用createBuffers之后离开NOT_HANDSHAKING, // 这意味着nio包装/展开不会返回HandshakeStatus assert ( sslEngine.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING );。 // 查看变量声明为什么存在这行和#190。如果没有此行, // 重新握手发生时将不会重新创建缓冲区。 bufferallocations = 1; }
3.3.1 判断webSocket
当前连接状态
- WebSocketClient@method getReadyState
通过这个组件的接口,除了可以直接收到服务端通知的回调外,也可以像使用Http一样调方便地调用长连接,而不用关心具体协议和底层实现原理。
3.3.2 webSocket
监听事件 WebSocketListener
,回调事件交给WebSocketAdapter
处理
public interface WebSocketListener { ServerHandshakeBuilder onWebsocketHandshakeReceivedAsServer( WebSocket conn, Draft draft, ClientHandshake request ) throws InvalidDataException; void onWebsocketHandshakeReceivedAsClient( WebSocket conn, ClientHandshake request, ServerHandshake response ) throws InvalidDataException; void onWebsocketHandshakeSentAsClient( WebSocket conn, ClientHandshake request ) throws InvalidDataException; void onWebsocketMessage( WebSocket conn, String message ); void onWebsocketMessage( WebSocket conn, ByteBuffer blob ); void onWebsocketOpen( WebSocket conn, Handshakedata d ); void onWebsocketClose( WebSocket ws, int code, String reason, boolean remote ); void onWebsocketClosing( WebSocket ws, int code, String reason, boolean remote ); void onWebsocketCloseInitiated( WebSocket ws, int code, String reason ); void onWebsocketError( WebSocket conn, Exception ex ); void onWebsocketPing( WebSocket conn, Framedata f ); void onWebsocketPong( WebSocket conn, Framedata f ); void onWriteDemand( WebSocket conn ); InetSocketAddress getLocalSocketAddress( WebSocket conn ); InetSocketAddress getRemoteSocketAddress( WebSocket conn );}
我们来看一下WebSocketAdapter
具体业务实现,onWebsocketHandshakeReceivedAsServer
处理的是握手过程中的状态信息。其余方法都是空实现,方便交个子类去处理,接下来我们看一下AbstractWebSocket
派生子类是做了哪些逻辑扩展,这是典型的适配器设计模式。
- setConnectionLostTimeout 设置
WebSocket
间隔时间失效设置,小于或等于0的值取消检查
public void setConnectionLostTimeout( int connectionLostTimeout ) { synchronized (syncConnectionLost) { this.connectionLostTimeout = connectionLostTimeout; if (this.connectionLostTimeout <= 0) { log.trace("Connection lost timer stopped"); cancelConnectionLostTimer(); return; } if (this.websocketRunning) { log.trace("Connection lost timer restarted"); //重置所有的Ping事件 try { ArrayList<WebSocket> connections = new ArrayList<>(getConnections()); WebSocketImpl webSocketImpl; for (WebSocket conn : connections) { if (conn instanceof WebSocketImpl) { webSocketImpl = (WebSocketImpl) conn; webSocketImpl.updateLastPong(); } } } catch (Exception e) { log.error("Exception during connection lost restart", e); } restartConnectionLostTimer(); } } }
- startConnectionLostTimer
开始websocket
定时连接
protected void startConnectionLostTimer() { synchronized (syncConnectionLost) { if (this.connectionLostTimeout <= 0) { log.trace("Connection lost timer deactivated"); return; } log.trace("Connection lost timer started"); this.websocketRunning = true; restartConnectionLostTimer(); } }
- cancelConnectionLostTimer
取消websocket
连接
private void cancelConnectionLostTimer() { if( connectionLostTimer != null ) { connectionLostTimer.cancel(); connectionLostTimer = null; } if( connectionLostTimerTask != null ) { connectionLostTimerTask.cancel(); connectionLostTimerTask = null; } }
- isTcpNoDelay 测试Nagletcp延迟算法是否停止
public boolean isTcpNoDelay() { return tcpNoDelay; }
- stopConnectionLostTimer
- 当丢失timer的时候,我们应该立即关闭
WebSocket
连接
protected void stopConnectionLostTimer() { synchronized (syncConnectionLost) { if (connectionLostTimer != null || connectionLostTimerTask != null) { this.websocketRunning = false; log.trace("Connection lost timer stopped"); cancelConnectionLostTimer(); } } }
3.3.3 WebSocketServer
负责处理整个握手过程,派生类决定是否向否向服务器添加新的功能
public void run() { if (!doEnsureSingleThread()) { return; } if (!doSetupSelectorAndServerThread()) { return; } try { int iShutdownCount = 5; int selectTimeout = 0; while ( !selectorthread.isInterrupted() && iShutdownCount != 0) { SelectionKey key = null; WebSocketImpl conn = null; try { if (isclosed.get()) { selectTimeout = 5; } int keyCount = selector.select( selectTimeout ); if (keyCount == 0 && isclosed.get()) { iShutdownCount--; } Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> i = keys.iterator(); while ( i.hasNext() ) { key = i.next(); conn = null; if( !key.isValid() ) { continue; } if( key.isAcceptable() ) { // 执行Socket连接操作 doAccept(key, i); continue; } if( key.isReadable() && !doRead(key, i)) { continue; } if( key.isWritable() ) { doWrite(key); } } // 处理可读操作 doAdditionalRead(); } catch ( CancelledKeyException e ) { // an other thread may cancel the key } catch ( ClosedByInterruptException e ) { return; // do the same stuff as when InterruptedException is thrown } catch ( IOException ex ) { if( key != null ) key.cancel(); handleIOException( key, conn, ex ); } catch ( InterruptedException e ) { // 线程打断操作 Thread.currentThread().interrupt(); } } } catch ( RuntimeException e ) { // 希望永远不要发送 handleFatal( null, e ); } finally { // 处理服务器关闭操作 doServerShutdown(); } }
看完源码,我们基本上就可以解释上面问的webSocket
基础概念啦~