网络与RPC
标签 : Java基础
Java为网络编程提供的
java.net
包封装了底层通信细节, 包含了大量的基础组件以及TCP/UDP协议的编程接口, 使得开发者可以专注于解决问题, 而不用关注通信细节.
基础组件
java.net
包下包含如下网络基础组件:
InetAddress
代表一个IP地址,提供了获取IP地址、主机名、域名,以及测试IP地址是否可达等方法;Inet4Address
/Inet6Address
是其两个子类,分别代表IPv4与IPv6.URLDecoder
、URLEncoder
完成普通字符串和application/x-www-form-urlencoded
字符串的解码/编码.当字符串包含非西欧字符时,需要将其编码成
application/x-www-form-urlencoded
MIME字符. 编码规则是每个中文字符占两个字节, 每个字节转换成两个十六进制的数字,因此每个中文字符将转换成%XX%XX
形式.当然,采用不同的字符编码会使每个中文字符转换的字节数并不完全相同,所以URLDecoder
/URLEncoder
使用时需要指定字符集.URL
、URI
URL(Uniform Resource Locator)
对象代表统一资源定位器,他是一根指向互联网”资源”的指针,它包含了一条可打开的能够到达资源的输入流, 因此他提供了很多方法访问来互联网内容openStream()
.URI(Uniform Resource Identifiers)
对象代表统一资源标识符,Java中的URI
不能用访问任何资源, 其作用仅限于定位/解析.URLConnection
URL
的openConnection()
方法返回一个URLConnection
,代表一条与URL
所指向的远程资源的连接, 因此可以通过URLConnection
实例向URL
发送GET
/POST
请求,访问资源.- 发送
GET
请求只需将请求参数放在URL字符串之后,以?
/&
隔开,直接调用URLConnection
的getInputStream()
方法即可; - 发送
POST
请求则需要先设置doIn/doOut 两个请求头(对应setDoInput(boolean doinput)
/setDoOutput(boolean dooutput)
), 再使用对应的OutputStream
来发送请求数据.
- 发送
TCP协议
TCP是一种可靠的传输层协议,它在通信两端各建立一个Socket,从而在C/S之间形成一条虚拟的通信链路,然后两端基于这条虚拟链路进行通信(关于TCP协议的详细介绍可参考的博客TCP/IP入门(3) –传输层).Java对TCP提供了良好的封装,使用
Socket
对象来代表两端通信接口,并通过Socket
产生的IO流来进行网络通信.
ServerSocket
Java使用ServerSocket
接收来自Client的Socket连接(ServerSocket
向开发者隐藏了很多底层通信细节,如bind()
/listen()
等, 详见博客 Socket实践(2) -导引). 如果没有连接的话他将一直处于等待状态.因此ServerSocket
提供如下常用方法:
方法 | 描述 |
---|---|
Socket accept() |
Listens for a connection to be made to this socket and accepts it. |
void bind(SocketAddress endpoint, int backlog) |
Binds the ServerSocket to a specific address (IP address and port number). |
static void setSocketFactory(SocketImplFactory fac) |
Sets the server socket implementation factory for the application. |
void close() |
Closes this socket. |
ServerSocket
还提供了一些设置连接标志的方法如: SO_RCVBUF/ SO_REUSEADDR/SO_TIMEOUT.
Socket
当客户端执行到如下代码时,该Client会连接到指定服务器:
Socket socket = new Socket("127.0.0.1", 8081);
让服务端ServerSocket
的accept()
方法继续执行,于是Server与Client就产生一对互相连接的Socket
.此时程序就无须再分服务端/客户端,而是通过各自的Socket
进行通信. Socket
提供了如下常用方法:
方法 | 描述 |
---|---|
void bind(SocketAddress bindpoint) |
Binds the socket to a local address. |
void connect(SocketAddress endpoint, int timeout) |
Connects this socket to the server with a specified timeout value. |
InputStream getInputStream() |
Returns an input stream for this socket. |
OutputStream getOutputStream() |
Returns an output stream for this socket. |
void close() |
Closes this socket. |
Socket
同样也提供了设置连接标志位的方法:SO_KEEPALIVE/OOBINLINE/SO_RCVBUF/SO_REUSEADDR/SO_SNDBUF/SO_LINGER/SO_TIMEOUT/TCP_NODELAY
当使用getInputStream()
/getOutputStream()
拿到InputStream
/OutputStream
之后, 我们就可以使用IO相关知识来顺畅的通信了(详见博客 Java I/O).
- Server
/**
* Echo 回声服务器
*
* @author jifang
* @since 16/8/4 下午4:07.
*/
public class Server {
private static final ExecutorService executer = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws IOException {
ServerSocket server = new ServerSocket(8088);
while (true) {
Socket client = server.accept();
executer.submit(new ClientTask(client));
}
}
private static final class ClientTask implements Runnable {
private Socket client;
public ClientTask(Socket client) {
this.client = client;
}
@Override
public void run() {
try (InputStream in = client.getInputStream();
OutputStream out = client.getOutputStream()) {
String line;
while (!(line = read(in)).isEmpty()) {
System.out.println(line);
out.write(String.format("echo : %s", line).getBytes());
out.flush();
}
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private static final int BUFFER_SIZE = 1024;
private static String read(InputStream in) throws IOException {
byte[] buffer = new byte[BUFFER_SIZE];
int count = in.read(buffer);
return new String(buffer, 0, count);
}
}
- Client
public class Client {
public static void main(String[] args) throws IOException {
Socket socket = new Socket("127.0.0.1", 8088);
try (BufferedReader consoleReader = new BufferedReader(new InputStreamReader(System.in));
InputStream in = socket.getInputStream();
OutputStream out = socket.getOutputStream()) {
String line;
while ((line = consoleReader.readLine()) != null) {
out.write(line.getBytes());
out.flush();
String data = read(in);
System.out.println(data);
}
}
socket.close();
}
private static final int BUFFER_SIZE = 1024;
private static String read(InputStream in) throws IOException {
byte[] buffer = new byte[BUFFER_SIZE];
int count = in.read(buffer);
return new String(buffer, 0, count);
}
}
半关闭Socket
Socket
提供了两个半关闭的方法(只关闭输入流 or 输出流):
方法 | 描述 |
---|---|
void shutdownInput() |
Places the input stream for this socket at “end of stream”. |
void shutdownOutput() |
Disables the output stream for this socket. |
boolean isInputShutdown() |
Returns whether the read-half of the socket connection is closed. |
boolean isOutputShutdown() |
Returns whether the write-half of the socket connection is closed. |
当调用shutdownXxx()
方法关闭对应流之后,Socket处于“半关闭”状态,用以表示输入/输出已经完成,此时该Socket
再也无法打开输入流/输出流,因此这种做法不适合保持持久通信的交互式应用, 只适用于像HTTP之类的一站式的通信协议.
注意: 即使同一个
Socket
两个shutdownXxx()
方法都调用了,该Socket
也没有关闭, 只是不能读写数据而已.
UDP协议
UDP是一种面向非连接的传输层协议,在正式通信前不必与对方建立连接,因此它的通信效率极高,但可靠性不如TCP. 因此UDP适用于一次只传送少量数据、对可靠性要求不高 的环境.(关于UDP协议的详细介绍也可参考的博客TCP/IP入门(3) –传输层).
DatagramSocket
Java使用DatagramSocket
代表UDP的Socket, 可以将DatagramSocket
比作码头,他不维护状态,不产生IO流, 唯一作用就是发送/接收货物(数据)DatagramPacket
:
DatagramSocket |
描述 |
---|---|
DatagramSocket(int port) |
Constructs a datagram socket and binds it to the specified port on the local host machine. |
DatagramSocket(SocketAddress bindaddr) |
Creates a datagram socket, bound to the specified local socket address. |
void receive(DatagramPacket p) |
Receives a datagram packet from this socket. |
void send(DatagramPacket p) |
Sends a datagram packet from this socket. |
void bind(SocketAddress addr) |
Binds this DatagramSocket to a specific address & port. |
void connect(InetAddress address, int port) |
Connects the socket to a remote address for this socket. |
void connect(SocketAddress addr) |
Connects this socket to a remote socket address (IP address + port number). |
void close() |
Closes this datagram socket. |
DatagramSocket
同样也提供了一些设置标志的方法如:SO_SNDBUF/SO_REUSEADDR/SO_RCVBUF/SO_BROADCAST
DatagramPacket
Java中使用DatagramPacket
来代表数据报,DatagramSocket
接收/发送数据都通过DatagramPacket
完成:
DatagramPacket |
描述 |
---|---|
DatagramSocket() |
Constructs a datagram socket and binds it to any available port on the local host machine. |
DatagramPacket(byte[] buf, int length) |
Constructs a DatagramPacket for receiving packets of length length. |
DatagramPacket(byte[] buf, int length, InetAddress address, int port) |
Constructs a datagram packet for sending packets of length length to the specified port number on the specified host. |
byte[] getData() |
Returns the data buffer. |
void setData(byte[] buf) |
Set the data buffer for this packet. |
void setSocketAddress(SocketAddress address) |
Sets the SocketAddress (usually IP address + port number) of the remote host to which this datagram is being sent. |
SocketAddress getSocketAddress() |
Gets the SocketAddress (usually IP address + port number) of the remote host that this packet is being sent to or is coming from. |
DatagramSocket
造出实例之后就可通过receive
/send
方法来接收/发送数据,从方法细节可以看出, 使用DatagramSocket
发送数据但并不知道数据的目的地在哪儿,而是由DatagramPacket
自身决定.
- Server
/**
* @author jifang
* @since 16/8/6 下午4:52.
*/
public class UDPServer {
private static final byte[] RECEIVE_BUFFER = new byte[1024];
public static void main(String[] args) throws IOException {
DatagramSocket socket = new DatagramSocket(8088);
DatagramPacket packet = new DatagramPacket(RECEIVE_BUFFER, RECEIVE_BUFFER.length);
while (true) {
socket.receive(packet);
String content = new String(packet.getData(), 0, packet.getLength());
System.out.println(String.format("Client IP: %s, Port: %s, Receive: %s", packet.getAddress(), packet.getPort(), content));
String resp = String.format("Hello %s", content);
socket.send(new DatagramPacket(resp.getBytes(), resp.length(), packet.getSocketAddress()));
}
}
}
- Client
public class UDPClient {
private static final InetSocketAddress ADDRESS = new InetSocketAddress("127.0.0.1", 8088);
private static final byte[] RECEIVE_BUFFER = new byte[1024];
public static void main(String[] args) throws IOException {
DatagramSocket socket = new DatagramSocket();
DatagramPacket recePacket = new DatagramPacket(RECEIVE_BUFFER, RECEIVE_BUFFER.length);
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
String line;
while ((line = reader.readLine()) != null) {
socket.send(new DatagramPacket(line.getBytes(), line.length(), ADDRESS));
socket.receive(recePacket);
System.out.println(new String(recePacket.getData(), 0, recePacket.getLength()));
}
}
}
Proxy
从1.5版本开始, Java提供了Proxy
和ProxySelector
:
Proxy
代表一个代理服务器: 在打开URLConnection
连接或创建Socket
连接时指定Proxy;ProxySelector
代表一个代理选择器: 他提供对代理服务器更加灵活的配置,如可以对HTTP、 HTTPS、FTP、SOCKS等进行分别配置,还可设置针对不同主机和地址配置不使用代理.
关于代理详细介绍可参考博客: Nginx - 代理、缓存.
Proxy
Proxy
有一个构造器:Proxy(Proxy.Type type, SocketAddress sa)
用于创建表示代理服务器的Proxy
对象, 其中type
参数表示代理服务器类型:
type | 描述 |
---|---|
Proxy.Type.DIRECT |
Represents a direct connection, or the absence of a proxy. |
Proxy.Type.HTTP |
Represents proxy for high level protocols such as HTTP or FTP. |
Proxy.Type.SOCKS |
Represents a SOCKS (V4 or V5) proxy. |
一旦创建
Proxy
之后, 就可打开连接时传入, 以作为本次连接所使用的代理服务器:
public class ProxyClient {
private static final InetSocketAddress proxyAddr = new InetSocketAddress("139.129.9.166", 8001);
private static final String url = "http://www.baidu.com/";
@Test
public void client() throws Exception {
Proxy proxy = new Proxy(Proxy.Type.HTTP, proxyAddr);
URLConnection connection = new URL(url).openConnection(proxy);
Reader reader = new InputStreamReader(connection.getInputStream());
String content = CharStreams.toString(reader);
System.out.println(content);
}
}
ProxySelector
直接使用Proxy
每次连接都需显示指定Proxy
,而ProxySelector
提供了每次打开连接都具有默认代理的方法:
public class ProxyClient {
protected static final Logger LOGGER = LoggerFactory.getLogger(ProxyClient.class);
private static final String url = "http://www.jd.com/";
private static List<Proxy> proxies = new ArrayList<>();
static {
proxies.add(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("127.0.0.1", 8001)));
proxies.add(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("127.0.0.1", 8002)));
}
@Test
public void client() throws Exception {
ProxySelector.setDefault(new ProxySelector() {
@Override
public List<Proxy> select(URI uri) {
String scheme = uri.getScheme();
if (scheme.equalsIgnoreCase("HTTP")) {
return Collections.singletonList(proxies.get(0));
} else {
return Collections.singletonList(proxies.get(1));
}
}
@Override
public void connectFailed(URI uri, SocketAddress sa, IOException ioe) {
LOGGER.error("connection failed , URI: {}, SocketAddress: {} ", uri, sa, ioe);
}
});
URLConnection connection = new URL(url).openConnection();
Reader reader = new InputStreamReader(connection.getInputStream());
String content = CharStreams.toString(reader);
System.out.println(content);
}
}
- Java还为
ProxySelector
提供了一个默认实现类sun.net.spi.DefaultProxySelector
并将其注册为默认的代理选择器.其实现策略如下:
select()
: 根据系统配置属性(如http.proxyHost
、http.proxyPort
、http.nonProxyHosts
等)来决定使用哪个代理服务器,详见默认代理选择器、DefaultProxySelector.java.connectFailed()
: 连接失败则会尝试不使用代理服务器, 直接连接远程主机.
NIO
从1.4版本开始, Java提供了NIO/非阻塞IO来开发高性能网络服务器.NIO可以让服务器使用一个/有限几个线程同时处理所有客户端连接.
JDK1.5_update10版本使用epoll替代了传统的select与poll,极大的提升了NIO性能, 详细介绍可参考: select的限制与poll的使用、epoll原理与封装.
Java为NIO为提供了如下常用类:
Selector
非阻塞IO核心, 是SelectableChanel
的多路复用器, 所有希望采用非阻塞方式通信的Channel都需要注册到Selector
, 然后Selector
可以同时阻塞监听多个Channel:
Selector |
描述 |
---|---|
static Selector open() |
Opens a selector. |
Set<SelectionKey> selectedKeys() |
Returns this selector’s selected-key set. |
int select() |
Selects a set of keys whose corresponding channels are ready for I/O operations. |
int select(long timeout) |
Selects a set of keys whose corresponding channels are ready for I/O operations. |
Set<SelectionKey> keys() |
Returns this selector’s key set. |
int selectNow() |
Selects a set of keys whose corresponding channels are ready for I/O operations. |
Selector wakeup() |
Causes the first selection operation that has not yet returned to return immediately. |
SelectorProvider provider() |
Returns the provider that created this channel. |
void close() |
Closes this selector. |
SelectableChannel
代表可以支持非阻塞的Channel对象,默认阻塞,必须开启非阻塞模式才可利用非阻塞特性,被注册到Selector
上,并由SelectionKey
代表这种注册关系:
SelectableChannel |
描述 |
---|---|
SelectionKey register(Selector sel, int ops) |
Registers this channel with the given selector, returning a selection key. |
SelectionKey register(Selector sel, int ops, Object att) |
Registers this channel with the given selector, returning a selection key. |
SelectableChannel configureBlocking(boolean block) |
Adjusts this channel’s blocking mode. |
boolean isBlocking() |
Tells whether or not every I/O operation on this channel will block until it completes. |
int validOps() |
Returns an operation set identifying this channel’s supported operations. |
boolean isRegistered() |
Tells whether or not this channel is currently registered with any selectors. |
SelectionKey keyFor(Selector sel) |
Retrieves the key representing the channel’s registration with the given selector. |
validOps()
方法返回一个整数值, 表示该Channel支持的IO操作, 在SelectionKey
定义了四种IO操作常量:
常量 | 描述 |
---|---|
OP_ACCEPT |
Operation-set bit for socket-accept operations. |
OP_CONNECT |
Operation-set bit for socket-connect operations. |
OP_READ |
Operation-set bit for read operations. |
OP_WRITE |
Operation-set bit for write operations. |
所以可以根据validOps()
返回值确定该SelectableChannel
支持的IO操作(如ServerSocketChannel
只支持OP_ACCEPT
、SocketChannel
支持OP_CONNECT
/OP_READ
/OP_WRITE
等), 但SelectionKey
提供了更加方便的方法来查看发生了什么IO事件:
SelectionKey
代表SelectableChannel
和Selector
之间的注册关系:
方法 | 描述 |
---|---|
SelectableChannel channel() |
Returns the channel for which this key was created. |
Selector selector() |
Returns the selector for which this key was created. |
boolean isAcceptable() |
Tests whether this key’s channel is ready to accept a new socket connection. |
boolean isConnectable() |
Tests whether this key’s channel has either finished, or failed to finish, its socket-connection operation. |
boolean isReadable() |
Tests whether this key’s channel is ready for reading. |
boolean isWritable() |
Tests whether this key’s channel is ready for writing. |
int interestOps() |
Retrieves this key’s interest set. |
SelectionKey interestOps(int ops) |
Sets this key’s interest set to the given value. |
int readyOps() |
Retrieves this key’s ready-operation set. |
Object attach(Object ob) |
Attaches the given object to this key. |
Object attachment() |
Retrieves the current attachment. |
void cancel() |
Requests that the registration of this key’s channel with its selector be cancelled. |
boolean isValid() |
Tells whether or not this key is valid. |
NIO示例
public class Server {
private static ByteBuffer BUFFER = ByteBuffer.allocate(1024);
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel server = ServerSocketChannel.open();
server.bind(new InetSocketAddress(8088));
server.configureBlocking(false);
server.register(selector, SelectionKey.OP_ACCEPT);
while (selector.select() > 0) {
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
// ServerSocket可以接受连接
if (key.isAcceptable()) {
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
SocketChannel client = serverChannel.accept();
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_READ);
}
// Socket可读
else if (key.isReadable()) {
SocketChannel clientChannel = (SocketChannel) key.channel();
String content = readFully(clientChannel);
if (!content.isEmpty()) {
System.out.println(content);
ByteBuffer buffer = ((ByteBuffer) BUFFER.clear()).put(String.format("Hello %s", content).getBytes());
for (SelectionKey aKey : selector.keys()) {
SelectableChannel channel = aKey.channel();
if (channel instanceof SocketChannel) {
((SocketChannel) channel).write((ByteBuffer) buffer.flip());
}
}
}
// 客户端断开, 取消注册
else {
key.cancel();
}
}
iterator.remove();
}
}
}
private static String readFully(SocketChannel client) throws IOException {
StringBuilder sb = new StringBuilder();
BUFFER.clear();
while (client.read(BUFFER) > 0) {
sb.append(StandardCharsets.UTF_8.decode((ByteBuffer) BUFFER.flip()));
BUFFER.clear();
}
return sb.toString();
}
}
注意每次迭代末尾的
iterator.remove()
调用.Selector
自己不会移除已触发过的SelectionKey
实例,必须在处理完SelectableChanel
时自己移除, 等下次该Channel就绪时,Selector
会再次将其放入已触发的Key中.
小结
使用
Selector
NO来处理多Channel
的好处是: 只需要更少的线程来处理多个IO(事实上可以只用一个线程处理所有通道); 因为对于操作系统而言,多个线程间上下文切换的开销很大,而且每个线程都要占用系统的部分资源(如线程私有内存等),因此使用的线程越少越好. 但现代操作系统和CPU在多任务方面的表现越来越好,而且CPU多核已成事实, 所以线程开销随着时间的推移变得越来越小,因此不使用多任务可能就是在浪费CPU,而且NIO开发相对繁琐, 因此在使用or不使用Selector
还需要做进一步的权衡.
AIO
Java7的NIO.2提供了异步Channel支持, 以提供更高效的IO. 这种基于异步Channel的IO机制被称为AIO.Java提供了一系列以Asynchronous开头的Channel
接口和类:
可以看出,AIO包含2个接口、3个实现类: 其中AsynchronousServerSocketChannel
/AsynchronousSocketChannel
是支持TCP通信的异步Channel
:
AsynchronousServerSocketChannel
负责监听, 与ServerSocketChannel
相似:
方法 | 描述 |
---|---|
static AsynchronousServerSocketChannel open() |
Opens an asynchronous server-socket channel. |
static AsynchronousServerSocketChannel open(AsynchronousChannelGroup group) |
Opens an asynchronous server-socket channel. |
AsynchronousServerSocketChannel bind(SocketAddress local) |
Binds the channel’s socket to a local address and configures the socket to listen for connections. |
Future<AsynchronousSocketChannel> accept() |
Accepts a connection. |
<A> void accept(A attachment, CompletionHandler<AsynchronousSocketChannel,? super A> handler) |
Accepts a connection. |
abstract <T> AsynchronousServerSocketChannel setOption(SocketOption<T> name, T value) |
Sets the value of a socket option. |
AsynchronousChannelGroup
是异步Channel
分组管理器, 创建时传入ExecutorService
, 使其绑定一个线程池,以实现资源共享. 他主要负责两个任务: 处理IO事件和触发CompletionHandler
:
public interface CompletionHandler<V,A> {
/**
* Invoked when an operation has completed.
*
* @param result
* The result of the I/O operation.
* @param attachment
* The object attached to the I/O operation when it was initiated.
*/
void completed(V result, A attachment);
/**
* Invoked when an operation fails.
*
* @param exc
* The exception to indicate why the I/O operation failed
* @param attachment
* The object attached to the I/O operation when it was initiated.
*/
void failed(Throwable exc, A attachment);
}
AsynchronousSocketChannel
客户端的异步Channel
, 了解了AsynchronousServerSocketChannel
的使用之后,AsynchronousSocketChannel
就非常简单了:
方法 | 描述 |
---|---|
static AsynchronousSocketChannel open() |
Opens an asynchronous socket channel. |
static AsynchronousSocketChannel open(AsynchronousChannelGroup group) |
Opens an asynchronous socket channel. |
Future<Void> connect(SocketAddress remote) |
Connects this channel. |
<A> void connect(SocketAddress remote, A attachment, CompletionHandler<Void,? super A> handler) |
Connects this channel. |
Future<Integer> read(ByteBuffer dst) |
Reads a sequence of bytes from this channel into the given buffer. |
<A> void read(ByteBuffer dst, A attachment, CompletionHandler<Integer,? super A> handler) |
Reads a sequence of bytes from this channel into the given buffer. |
Future<Integer> write(ByteBuffer src) |
Writes a sequence of bytes to this channel from the given buffer. |
<A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer,? super A> handler) |
Writes a sequence of bytes to this channel from the given buffer. |
- 示例
public class Server {
private static final Logger LOGGER = LoggerFactory.getLogger(Server.class);
private static AsynchronousChannelGroup initChannelGroup() throws IOException {
return AsynchronousChannelGroup.withThreadPool(Executors.newFixedThreadPool(10));
}
public static void main(String[] args) throws IOException, InterruptedException {
AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel
.open(initChannelGroup())
.bind(new InetSocketAddress(8088));
server.accept(null, new AcceptHandler(server));
Thread.sleep(10 * 1000 * 1000);
server.close();
}
private static class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {
private List<AsynchronousSocketChannel> clients;
private AsynchronousServerSocketChannel server;
public AcceptHandler(AsynchronousServerSocketChannel server) {
this.server = server;
this.clients = new ArrayList<>();
}
@Override
public void completed(final AsynchronousSocketChannel client, Object attachment) {
// Server继续接收下一次请求
server.accept(null, this);
clients.add(client);
// 读取数据
final ByteBuffer buffer = ByteBuffer.allocate(1024);
client.read(buffer, null, new CompletionHandler<Integer, Objects>() {
@Override
public void completed(Integer result, Objects attachment) {
buffer.flip();
String content = StandardCharsets.UTF_8.decode(buffer).toString();
if (!content.isEmpty()) {
System.out.println(String.format("receive: %s", content));
for (AsynchronousSocketChannel client : clients) {
try {
buffer.flip();
client.write(buffer).get();
} catch (InterruptedException | ExecutionException ignored) {
}
}
// Client继续接收数据
buffer.clear();
client.read(buffer, null, this);
}
// 客户端断开连接
else {
clients.remove(client);
}
}
@Override
public void failed(Throwable exc, Objects attachment) {
LOGGER.error("read error: ", exc);
}
});
}
@Override
public void failed(Throwable exc, Object attachment) {
LOGGER.error("accept error: ", exc);
}
}
}
其他关于AIO的介绍可参考博客 Select-I/O复用、Java nio 2.0 AIO.
RPC
RPC(Remote Procedure Call: 远程过程调用)是实现SOA的基础,其主要目的是使构建分布式计算/应用更容易,在提供强大远程调用能力的同时又不损失本地调用的语义简洁性(被调代码并不在调用者本地执行,但其调用方式几乎与本地一模一样).
类似Dubbo/Hessian/Thrift的RPC框架对项目有着非常重要的现实意义:可以将庞大的系统拆分成多个模块,每个模块又可根据不同的压力启动不同数量的实例,模块间通过RPC透明通信,从而将集中式系统改造成分布式以提高其扩展能力,优化硬件资源利用率.
现在我们简单实现一个RPC框架的原型, 技术选型如下:
- 通信: TCP
- 配置: ZooKeeper
- 序列化: Hession2
- 模式: 动态代理/线程池
服务
无论是否为RPC, 都需要首先实现可调用服务Service,只是RPC方式会将Service实现部署在服务端, 并提供Server接口给客户端:
/**
* @author jifang
* @since 16/8/4 上午10:56.
*/
public class HelloServiceImpl implements IHelloService {
@Override
public String sayHello(String name) {
return String.format("Hello %s.", name);
}
@Override
public String sayGoodBye(String name) {
return String.format("%s Good Bye.", name);
}
}
public class CalcServiceImpl implements ICalcService {
@Override
public Long add(Long a, Long b) {
return a + b;
}
@Override
public Long minus(Long a, Long b) {
return a - b;
}
}
C/S通信协议
定义InterfaceWrapper: Client/Server间通信协议数据包装对象:
public class InterfaceWrapper implements Serializable {
private Class<?> inter;
private String methodName;
private Class<?>[] paramTypes;
private Object[] arguments;
public Class<?> getInter() {
return inter;
}
public void setInter(Class<?> inter) {
this.inter = inter;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Class<?>[] getParamTypes() {
return paramTypes;
}
public void setParamTypes(Class<?>[] paramTypes) {
this.paramTypes = paramTypes;
}
public Object[] getArguments() {
return arguments;
}
public void setArguments(Object[] arguments) {
this.arguments = arguments;
}
}
服务端: Producer
- Producer: 服务生产者
public class Producer {
private static final int _1H = 1000 * 60 * 60;
@Test
public void producer() throws IOException, InterruptedException {
IHelloService helloService = new HelloServiceImpl();
ICalcService calcService = new CalcServiceImpl();
ProducerClient.getInstance().publish(IHelloService.class, helloService);
ProducerClient.getInstance().publish(ICalcService.class, calcService);
Thread.sleep(_1H);
}
}
- ProducerClient
public class ProducerClient {
static {
new Thread(new ServerBootstrap()).start();
}
private static ProducerClient instance = new ProducerClient();
public static ProducerClient getInstance() {
return instance;
}
public void publish(Class<?> inter, Object impl) {
ServiceImplMap.push(inter, impl);
}
}
ProducerClient
是面向生产者的Client接口,系统启动时即启动一个新线程ServerBootstrap
建立ServerSocket
等待在accept()
上:
public class ServerBootstrap implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(ServerBootstrap.class);
@Override
public void run() {
ServerSocket server = ServerSocketInstance.getServerSocket();
try {
while (true) {
Socket client = server.accept();
ThreadPoolInstance.getExecutor().submit(new ProducerInvokeTask(client));
}
} catch (IOException e) {
LOGGER.error("accept error: ", e);
}
}
}
每当发布一个服务,
ProducerClient
向ConcurrentMap
中插入一条记录(便于连接建立时从Map
中获得interface
对应Service
实例), 并将服务端ip:port发布到ZooKeeper, 便于客户端发现并连接:
public class ServiceImplMap {
private static final ConcurrentMap<Class<?>, Object> map = new ConcurrentHashMap<>();
private static final ZkClient zk;
private static final String FIRST_LEVEL_PATH = "/mi.rpc";
static {
zk = new ZkClient(ConfigMap.getServer("zk.servers"));
}
private static void createNode(String first, String second, String dest) {
if (!zk.exists(first)) {
zk.createPersistent(first);
}
if (!zk.exists(first + "/" + second)) {
zk.createPersistent(first + "/" + second);
}
zk.createEphemeral(first + "/" + second + "/" + dest);
}
public static void push(Class<?> inter, Object impl) {
map.put(inter, impl);
// 发布到ZK
String ip;
try {
ip = InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
ip = null;
e.printStackTrace();
}
int port = ServerSocketInstance.getServerSocket().getLocalPort();
createNode(FIRST_LEVEL_PATH, inter.getName(), String.format("%s:%s", ip, port));
}
public static Object get(Class<?> inter) {
return map.get(inter);
}
}
一旦有客户端建立连接(服务消费者尝试调用服务端方法),
ServerBootstrap
为该Socket
分配一个线程:
1. 首先从Socket
读取数据并反序列化为InterfaceWrapper
对象;
2. 根据InterfaceWrapper
提供的数据调用目标服务方法;
3. 得到计算结果, 序列化后写入Socket
.
public class ProducerInvokeTask implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(ProducerInvokeTask.class);
private Socket client;
public ProducerInvokeTask(Socket client) {
this.client = client;
}
@Override
public void run() {
try (InputStream in = client.getInputStream();
OutputStream out = client.getOutputStream()) {
// 读出数据 反序列化
byte[] bytes = ReaderUtil.toByteArray(in);
InterfaceWrapper wrapper = Hessian2Serializer.deserialize(bytes);
// 执行方法调用
Object service = ServiceImplMap.get(wrapper.getInter());
Method method = service.getClass().getMethod(wrapper.getMethodName(), wrapper.getParamTypes());
Object result = method.invoke(service, wrapper.getArguments());
// 序列化 写入数据
bytes = Hessian2Serializer.serialize(result);
out.write(bytes);
} catch (IOException | NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
LOGGER.error("Producer Invoke Task Error: ", e);
}
}
}
客户端: Consumer
public class Consumer {
@Test
public void consumer() {
IHelloService helloService = ConsumerClient.getInstance().subscribe(IHelloService.class);
ICalcService calcService = ConsumerClient.getInstance().subscribe(ICalcService.class);
String hello = helloService.sayHello("翡青");
System.out.println(hello);
String goodBye = helloService.sayGoodBye("翡青");
System.out.println(goodBye);
long add = calcService.add(1L, 1L);
System.out.println(add);
long minus = calcService.minus(1L, 1L);
System.out.println(minus);
}
}
- ConsumerClient
客户端需要订阅服务: 从ZooKeeper中拉取该
interface
对应服务端ip:port, 返回一个动态代理对象:
public class ConsumerClient {
private static ConsumerClient instance = new ConsumerClient();
public static ConsumerClient getInstance() {
return instance;
}
private static final String PATH = "/mi.rpc/%s/";
private static ZkClient zk;
static {
zk = new ZkClient(ConfigMap.getClient("zk.servers"));
}
// 从ZooKeeper拉取数据, 简易负载均衡
private String getAddress(String name) {
List<String> children = zk.getChildren(String.format(PATH, name));
int index = new Random().nextInt(children.size());
return children.get(index);
}
@SuppressWarnings("all")
public <T> T subscribe(Class<T> inter) {
checkInterface(inter);
String[] address = getAddress(inter.getName()).split(":");
String ip = address[0];
int port = Integer.valueOf(address[1]);
return (T) Proxy.newProxyInstance(inter.getClassLoader(), new Class[]{inter}, new ConsumerIInvokeTask(inter, ip, port));
}
private void checkInterface(Class<?> inter) {
if (inter == null || !inter.isInterface()) {
throw new IllegalArgumentException("inter Mast a interface class");
}
}
}
待到客户端实际调用
interface
内方法, 才会执行ConsumerIInvokeTask
的invoke()
:
1. 创建与ServerSocket
连接,将需要的接口信息(接口Class
、方法名、参数类型、参数值)包装成InterfaceWrapper
使用Hession2序列化后传递给服务端;
2. 等待服务端接收数据执行方法然后将结果数据序列化返回;
3. 客户端反序列化为目标对象, 然后invoke()
返回执行结果:
public class ConsumerIInvokeTask implements InvocationHandler {
private Class<?> inter;
private String ip;
private int port;
public ConsumerIInvokeTask(Class<?> inter, String ip, int port) {
this.inter = inter;
this.ip = ip;
this.port = port;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Socket client = new Socket(ip, port);
try (
OutputStream out = client.getOutputStream();
InputStream in = client.getInputStream()
) {
// 序列化 写入数据
InterfaceWrapper wrapper = new InterfaceWrapper();
wrapper.setInter(this.inter);
wrapper.setMethodName(method.getName());
wrapper.setParamTypes(method.getParameterTypes());
wrapper.setArguments(args);
byte[] bytes = Hessian2Serializer.serialize(wrapper);
out.write(bytes);
// 读出数据 反序列化
bytes = ReaderUtil.toByteArray(in);
return Hessian2Serializer.deserialize(bytes);
}
}
}
限于篇幅此只列出最核心代码, 详细参考Git地址: https://git.oschina.net/feiqing/MiRPC.git
- 参考 & 扩展
- 你应该知道的 RPC 原理
- Step by step玩转RPC
- 深入浅出 RPC - 浅出篇
- 深入浅出 RPC - 深入篇
- Effective java 中文版(第2版)
- Java并发编程实战
- 疯狂Java讲义
- 分布式服务框架:原理与实践