案例GitHub地址
- 在博客Android | UDP的C/S通信实战案例的基础上,继续进行开发;
创建TCP服务端
- 在sample模块下,
新建一个名为tcp的package,
创建TcpServer:
- **指定服务端端口号(ip 默认为本机ip)
启动循环读取消息队列的子线程,
死循环,不断等待客户端请求连接,
一旦连接上,
直接新建一个子线程(丢给ClientTask)去处理这个socket,
于是主线程又可以回到accept() 阻塞,等待下一个连接请求;
同时,将连接上的socket 对应的线程类,注册为消息队列的观察者,
让线程类担任观察者,负责接收被观察者的通知信息并做socket 通信。**
/**
* <pre>
* author : 李蔚蓬(简书_凌川江雪)
* time : 2019/10/30 16:57
* desc :指定服务端端口号(ip 默认为本机ip)
* 启动循环读取消息队列的子线程,
* 死循环,不断等待客户端请求连接,
* 一旦连接上,直接新建一个子线程(丢给ClientTask)去处理这个socket,
* 于是主线程又可以回到accept() 阻塞,等待下一个连接请求;
* 同时,将连接上的socket 对应的线程类,注册为消息队列的观察者,
* 让线程类担任观察者,负责接收被观察者的通知信息并做socket 通信
* </pre>
*/
public class TcpServer {
public void start() {
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(9090);
MsgPool.getInstance().start();//启动读消息的子线程
while (true) {
// /*
// 阻塞的方法!!! 等待(客户端的) TCP 连接请求
// 客户端有 TCP 请求并连接上了 ServerSocket,.
// 那 accept() 就会返回一个 同一连接上 对应 客户一端socket 的 服务一端socket
// */
Socket socket = serverSocket.accept();
//客户端连接之后,打印相关信息
// System.out.println("ip: " + socket.getInetAddress().getHostAddress() +
// ", port = " + socket.getPort() + " is online...");
System.out.println("ip = " + "***.***.***.***" +
", port = " + socket.getPort() + " is online...");
// /*
// 连接上了之后不能直接拿IO流去读写,
// 因为getInputStream() 和 getOutputStream() 都是阻塞的!!!!
// 如果直接拿IO 流,不做其他处理,
// 那么Server端的处理流程是这样的:
// accept()-- getInputStream()处理第一个客户端 -- 处理完毕,accept()-- getInputStream()处理第二个客户端....
// 所以必须开启子线程去读写客户端,才能做成聊天室
//
// 针对每一个连接上来的客户端去单独起一个线程,跟客户端进行通信
//
// 过程:客户端连上之后,打印其信息,
// 然后直接新建一个子线程(丢给ClientTask)去处理这个socket,
// 于是主线程又可以回到accept() 阻塞,等待下一个连接请求
// */
ClientTask clientTask = new ClientTask(socket);
MsgPool.getInstance().addMsgComingListener(clientTask);
clientTask.start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new TcpServer().start();
}
}
- 针对每一个连接上来的客户端去单独起一个线程,跟客户端进行通信,
准备一个线程类,名为ClientTask,
针对每一个连接上来的客户端去单独起一个线程,跟客户端进行通信,
这里便是线程类;
run()中死循环不断读取本类实例对应的客户端发来的信息,
或者发送给对应的连接对面客户端(服务端)要发送的信息;
实现MsgPool.MsgComingListener, 成为消息队列的观察者!!!
/**
* <pre>
* author : 李蔚蓬(简书_凌川江雪)
* time : 2019/10/30 17:23
* desc :针对每一个连接上来的客户端去单独起一个线程,跟客户端进行通信,
* 这里便是线程类;
* run()中死循环不断读取客户端发来的信息,发送给客户端(服务端)要发送的信息;
* 实现MsgPool.MsgComingListener, 成为消息队列的观察者!!!
* </pre>
*/
public class ClientTask extends Thread implements MsgPool.MsgComingListener {
private Socket mSocket;
private InputStream mIs;
private OutputStream mOs;
public ClientTask(Socket socket) {
try {
mSocket = socket;
mIs = socket.getInputStream();
mOs = socket.getOutputStream();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
BufferedReader br = new BufferedReader(new InputStreamReader(mIs));
String line = null;
/*
读取并输出客户端信息。
如果没有客户端发送信息,readLine() 便会阻塞在原地
*/
try {
while ((line = br.readLine()) != null) {
System.out.println("read " + mSocket.getPort() + " = " + line);
//把信息发送加入到消息队列,
// 借助消息队列的被观察者通知方法,
// 将消息转发至其他Socket(所有socket都在创建ClientTask的时候,
// 备注成为MsgPool 的观察者)
MsgPool.getInstance().sendMsg(mSocket.getPort() + ": " + line);
}
} catch (IOException e) {
e.printStackTrace();
}
}
//作为消息队列的观察者对应的更新方法,
// 消息队列中最新的消息会推送通知到这里的msg参数,
// 这里拿到最新的推送消息后,写进输出流,
// 推到TCP 连接的客户一端的 socket
@Override
public void onMsgComing(String msg) {
try {
mOs.write(msg.getBytes());
mOs.write("\n".getBytes());
mOs.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
}
- **准备一个消息队列,
每一个Client发送过来的消息,
都会被加入到队列当中去,
队列中默认有一个子线程,
专门从队列中,死循环,不断去取数据(取出队列的队头),
取到数据就做相关处理,比如分发给其他的socket;**
/**
* <pre>
* author : 李蔚蓬(简书_凌川江雪)
* time : 2019/10/30 17:45
* desc :每一个Client发送过来的消息,
* 都会被加入到队列当中去,
* 队列中默认有一个子线程,
* 专门从队列中,死循环,不断去取数据,
* 取到数据就做相关处理,比如分发给其他的socket;
* </pre>
*/
public class MsgPool {
private static MsgPool mInstance = new MsgPool();
/*
这里默认消息是String类型,
或者可以自行封装一个Model 类,存储更详细的信息
block n.块; 街区;障碍物,阻碍
顾名思义,这是一个阻塞的队列,当有消息过来时,就把消息发送给这个队列,
这边会起一个线程专门从队列里面去取消息,
如果队列中没有消息,就会阻塞在原地
*/
private LinkedBlockingQueue<String> mQueue = new LinkedBlockingQueue<>();
public static MsgPool getInstance() {
return mInstance;
}
private MsgPool() {
}
//这是一个阻塞的队列,
// 当有消息过来时,即客户端接收到消息时,
// 就把消息发送(添加)到这个队列中
//现在所有的客户端都可以发送消息到这个队列中
public void sendMsg(String msg) {
try {
mQueue.put(msg);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//要一早就调用本方法,
// 启动这个读取消息的线程,在后台不断运行
public void start() {
//开启一个线程去读队列的数据
new Thread() {
@Override
public void run() {
//无限循环读取信息
while (true) {
try {
//取出并移除队头;没有消息时,take()是阻塞的
String msg = mQueue.take();
notifyMsgComing(msg);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
}
//被观察者方法,遍历所有已注册的观察者,一次性通知更新
private void notifyMsgComing(String msg) {
for (MsgComingListener listener : mListeners) {
listener.onMsgComing(msg);
}
}
//观察者接口
public interface MsgComingListener {
void onMsgComing(String msg);//更新方法
}
//被观察者,存放观察者
private List<MsgComingListener> mListeners = new ArrayList<>();
//被观察者方法,添加观察者到列表
public void addMsgComingListener(MsgComingListener listener) {
mListeners.add(listener);
}
}
**所有的客户端都可发送消息到队列中,
然后所有的客户端都在等待
消息队列的消息新增(mQueue.put())这个时刻,
消息队列一新增消息,
即一接收到某个客户端发送过来消息(mQueue.put()),
则消息都会一次性转发给所有客户端,
所以这里涉及到一个观察者设计模式,
消息队列(MsgPool)或消息(Msg)是被观察者,
所有客户端处理线程(ClientTask)都是观察者**观察者模式实现小结:
**观察者接口准备更新(数据或UI的)方法;
被观察者接口准备三个抽象方法;
观察者实现类具体实现更新逻辑,可以有参数,参数为更新需要的数据;
被观察者实现类准备一个观察者List以及实现三个方法:
1.观察者注册方法:
参数为某观察者,功能是把观察者参数加到观察者List中;
2.注销观察者方法:
参数为某观察者,功能是把观察者参数从观察者List中移除;
3.通知观察者方法:无参数或者把需要通知的数据作为参数,
功能是遍历所有已注册的观察者,
即遍历 注册添加到 观察者List中的观察者,逐个调用List中所有观察者的更新方法;即一次性更新所有已注册的观察者!
使用时,
实例化一个被观察者和若干个观察者,
将所有观察者注册到被观察者处,
调用被观察者的通知方法,一次性更新所有已注册的观察者!**
创建TCP客户端
- 创建两个Package,整理一下项目架构,创建一个TcpClient:
/**
* <pre>
* author : 李蔚蓬(简书_凌川江雪)
* time : 2019/10/31 15:36
* desc :
* </pre>
*/
public class TcpClient {
private Scanner mScanner;
public TcpClient() {
mScanner = new Scanner(System.in);
mScanner.useDelimiter("\n");
}
/**
* 配置socket
* 准备IO 流,
* 主线程写,子线程读
*
*/
public void start() {
try {
Socket socket = new Socket("***", 9090);
InputStream is = socket.getInputStream();
OutputStream os = socket.getOutputStream();
final BufferedReader br = new BufferedReader(new InputStreamReader(is));
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(os));
/*
实现:
通过 reader,
在任何时候 能够读到 Server端 发来的数据
通过 writer,
在任何时候 能够向 Server端 去写数据
*/
//在等待客户端 发送消息过来的话,这里是需要阻塞的,
// 阻塞的时候又没有办法向客户端发送数据,所以读写独立的话,肯定是要起线程的
//起一个线程,专门用于
// 读Server 端 发来的数据,数据一过来就读然后输出,
// 输出服务端发送的数据
new Thread() {
@Override
public void run() {
try {
String line = null;
while ((line = br.readLine()) != null) {
System.out.println(line);
}
} catch (IOException e) {
}
}
}.start();
//给Server端 发送数据
while (true) {
//next() 是阻塞的,不断地读控制面板,有数据就会通过bufferWriter,
// 即outputStream 写给Server
String msg = mScanner.next();
bw.write(msg);
bw.newLine();
bw.flush();
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new TcpClient().start();
}
}
- 反复测试:
移植客户端到Android移动端
- 复制TcpClient到biz包下迭代,名为TcpClientBiz:
/**
* <pre>
* author : 李蔚蓬(简书_凌川江雪)
* time : 2019/10/31 15:36
* desc : 定义接口,完成客户端的收发逻辑
* </pre>
*/
public class TcpClientBiz {
private Socket mSocket;
private InputStream mIs;
private OutputStream mOs;
/**
* Looper.getMainLooper(),将主线程中的 Looper 扔进去了,
* 也就是说 handleMessage 会运行在主线程中,
* !!!!!!!!!!
* 这样可以在主线程中更新 UI 而不用把 Handler 定义在主线程中。
* !!!!!!!!!!
*/
private Handler mUiHandler = new Handler(Looper.getMainLooper());
// /*
// 注意,因为UdpClient 的send 和 receive 是绑定的,
// 所以其 返回信息的处理接口 是作为 发送信息方法 的参数的,由此产生绑定逻辑
//
// 但是这里 TcpClient 就不是send 和 receive 一一绑定了,
// 其没有数量的对应关系,只是一个持续的 任意数据包数量的 全双工的连接,
// 无需Udp 的绑定逻辑, Listener 由此不使用跟send 方法绑定的逻辑,
// 使用单独set 的逻辑表达方式
// */
public interface onMsgComingListener {
void onMsgComing(String msg);
void onError(Exception ex);
void popToast();
}
private onMsgComingListener mListener;
public void setOnMsgComingListener(onMsgComingListener listener) {
mListener = listener;
}
//------------------------------------------------------------------------
public TcpClientBiz() {
// //socket 的new 到 IO 流的获取 这几行代码是已经做了网络操作的,
// // 所以必须开一个子线程去进行,!!!!
// // 毕竟 TcpClientBiz() 在调用的时候肯定是在UI 线程进行的
//
// /*
// 另外需要注意一点!!!
// 下面的socket 和 IO 流初始化是在子线程中进行的,
// 所以我们不知道什么时候会完成初始化,
// 因此在使用的时候是需要进行一个UI 交互提醒的,
// 比如loading 动画,启动页面时使用loading动画,初始化完成之后再取消loading 动画,
//
// */
new Thread() {
@Override
public void run() {
try {
mSocket = new Socket("172.18.1.59", 9090);//连接到 Server端
mIs = mSocket.getInputStream();
mOs = mSocket.getOutputStream();
mUiHandler.post(new Runnable() {
@Override
public void run() {
mListener.popToast();
}
});
//读到消息则 借用回调 回到MainActivity 进行UI 更新
readServerMsg();
} catch (final IOException e) {
mUiHandler.post(new Runnable() {
@Override
public void run() {
if (mListener != null) {
mListener.onError(e);
}
}
});
}
}
}.start();
}
/**
* 一旦本类被实例化,马上启动
* 不断阻塞等待Server端 信息
* readLine() 没有消息时阻塞,
* 一有消息,马上发给接口处理逻辑
*
* @throws IOException
*/
private void readServerMsg() throws IOException {
final BufferedReader br = new BufferedReader(new InputStreamReader(mIs));
String line = null;
while ((line = br.readLine()) != null) {
final String finalLine = line;
/*
!!!!!!!!!!!!!!!!
基于主线程MainLooper 以及 回调机制
在 业务类内部 调用 外部实现的处理逻辑方法
!!!!!!!!!!
*/
mUiHandler.post(new Runnable() {
@Override
public void run() {
//读到消息则 借用回调 回到MainActivity 进行UI 更新
if (mListener != null) {
mListener.onMsgComing(finalLine);
}
}
});
}
}
/**
* 把参数msg 写入BufferWriter(O流),发送给Server端,
* 一般这个msg 消息 是EditText 中的内容,
*
* 调用时机:一般是EditText 右边的按钮被点击的时候
*
* 调用时,封装输出流,
* 把参数msg 写入BufferWriter(O流),发送给Server端,
*
* 在要发送消息给Server 的时候调用
* 发送的消息会在Server 端的 ClientTask 类中
* 的run() 中的while ((line = br.readLine()) != null) 处被读取到,
* 并通过 MsgPool.getInstance().sendMsg() 被添加到消息队列中
*
* @param msg 要发送的信息
*/
public void sendMsg(final String msg) {
//开一个线程去做输出,完成任务之后线程就自动回收
new Thread(){
@Override
public void run() {
try {
//一有消息过来,就封装输出流,写入并 发送信息到 Server端
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(mOs));
bw.write(msg);
bw.newLine();
bw.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
}.start();
}
public void onDestroy() {
//!!!!
// 独立地try...catch...的原因:
// !!!!
// 如果把三个close 都放在同一个try 块里面
// 那假如第一个close 出现了异常,
// 后面两个close 即使没异常,
// 也处理不了了,这显然是不符合条件的
// !!!!!
try {
if (mIs != null) {
mIs.close();
}
} catch (IOException e) {
e.printStackTrace();
}
try {
if (mOs != null) {
mOs.close();
}
} catch (IOException e) {
e.printStackTrace();
}
try {
if (mSocket != null) {
mSocket.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
- rename一下MainActivity为UdpActivity:
复制UdpActivity一份,原地粘贴,命名为TcpActivity:
public class TcpActivity extends AppCompatActivity {
private EditText mEtMsg;
private Button mBtnSend;
private TextView mTvContent;
private TcpClientBiz mTcpClientBiz = new TcpClientBiz();
public Context getTcpActivityContext() {
return getApplicationContext();
}
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
initViews();
mTcpClientBiz.setOnMsgComingListener(new TcpClientBiz.onMsgComingListener() {
@Override
public void onMsgComing(String msg) {
appendMsgToContent("Server:" + msg);
}
@Override
public void onError(Exception ex) {
ex.printStackTrace();
}
@Override
public void popToast() {
Toast.makeText(TcpActivity.this, "初始化完成!!!!可以开始发送信息了!!!", Toast.LENGTH_SHORT).show();
}
});
}
private void initViews() {
mEtMsg = findViewById(R.id.id_et_msg);
mBtnSend = findViewById(R.id.id_btn_send);
mTvContent = findViewById(R.id.id_tv_content);
mBtnSend.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
String msg = mEtMsg.getText().toString();
if (TextUtils.isEmpty(msg)) {
return;
}
//发送后清除编辑框文本
mEtMsg.setText("");
//msg 负责发送数据,onMsgReturnedListener() 则负责处理对应的返回的信息
mTcpClientBiz.sendMsg(msg);
}
});
}
private void appendMsgToContent(String msg) {
mTvContent.append(msg + "\n");
}
/*
回收资源
*/
@Override
protected void onDestroy() {
super.onDestroy();
mTcpClientBiz.onDestroy();
}
}
更改启动页面:
- 反复测试(一个模拟机和两台真机的聊天哈哈哈):
- 最终Server端聊天记录:
服务端诸类代码实现概述(TcpServer、ClientTask、MsgPool)
- TcpServer:
死循环,阻塞,等待客户端请求连接,while (true)
& .accept();
一旦连接上,获取对应的socket对象并
把它丢给ClientTask的构造方法,new ClientTask(socket)
直接新建一个子线程,去处理这个socket(.start()
),
将连接上的socket 对应的线程类,注册到消息队列类中的队列中,
成为消息队列的观察者;MsgPool.getInstance().addMsgComingListener(clientTask)
启动消息队列读读队列的线程,MsgPool.getInstance().start();
- ClientTask:
public class ClientTask extends Thread implements MsgPool.MsgComingListener
让线程类作为消息队列的观察者,
负责接收被观察者的通知信息并做socket 通信;
类中:
- 1/3 构造方法:
接收TcpServer对过来的socket对象,
用之初始化其IO流;
- 2/3 **`run()`:`<读取Client的 I流,加入 MsgPool.mQueue>`
封装输入流,
读取客户端发送过来的信息
并输出:while ((line = br.readLine()) != null){...}
System.out.println(...);
把信息发送加入到消息队列:MsgPool.getInstance().sendMsg(...);
如果没有客户端发送信息,
readLine() 便会阻塞(注意这里会阻塞!所以要放在子线程!)在原地
**
- 3/3 **`onMsgComing(String msg)`:`<取出 MsgPool.mQueue,写入Client的 O流>`
作为消息队列的观察者对应的更新方法,
消息队列中最新的消息会推送通知到这里的msg参数,
(消息队列类有一个子线程死循环阻塞读取队头,String msg = mQueue.take();
notifyMsgComing(msg);
notifyMsgComing
中遍历所有已注册的观察者,
遍历时调用观察者的onMsgComing(msg)
,
正是本方法!!!)
本方法中拿到最新的推送消息后,
写进输出流,发送给对应的 TCP 连接的客户一端
的 socket
**
class MsgPool
消息列表类- **实现单例模式
private static MsgPool mInstance = new MsgPool();
`public static MsgPool getInstance() {
return mInstance;
}`
`private MsgPool() {
}`**
- **准备消息列表底层数据结构:
private LinkedBlockingQueue<String> mQueue = new LinkedBlockingQueue<>();
**
- **sendMsg(String msg):
当有消息过来时,即客户端接收到消息时,
就把消息发送(添加)到消息队列中:mQueue.put(msg);
在ClientTask
的run()
中调用
本方法!!!;**
- **start()
启动读取消息的子线程
,在后台不断运行,
死循环 阻塞 读取队头,
一有消息取出就通知所有已注册的观察者,String msg = mQueue.take();
notifyMsgComing(msg);
在TcpServer中一开始配置好服务ip和端口就调用了;**
- **实现被观察者通知方法:`notifyMsgComing(String msg)`
实现被观察者方法,添加观察者到列表:public void addMsgComingListener(MsgComingListener listener)
观察者接口MsgComingListener
被观察者列表private List<MsgComingListener> mListeners = new ArrayList<>();
**
客户端诸类代码实现概述(TcpClientBiz、TcpActivity)
- TcpClientBiz:
连接Server端,
后台子线程不断接收Server端发送过来的信息,
前台封装、提供向Server端发送信息的方法
- 准备一个绑定了`mainLooper`的`Handler`
- **定义`<回调机制>`
回调接口及其抽象方法;
声明 全局 回调接口变量;
回调接口置入函数;
`setOnMsgComingListener(onMsgComingListener listener) {
mListener = listener;
}`**
- **构造方法:**
- **开启子线程!!!,
配置连接到Server端
的socket; mSocket = new Socket("***.**.*.**", 9090);
**
- **通过socket获得IO流;
(以上,socket,IO流都初始化给全局变量)**
- **使用全局 回调接口变量,
抽象调用业务方法;(Toast提醒、Error处理之类)**
- **调用`readServerMsg()`!!!;**
- **`readServerMsg()`
一旦本类被实例化,就会被启动!!!
开启一个子线程,
拿着全局变量I流,封装成BufferReader,
死循环 阻塞等待 读取Server端信息 while ((line = br.readLine()) != null)
一旦有信息,
借助Handler.post(),
使用全局 回调接口变量抽象调用接口方法onMsgComing()
通过回调机制交给Activity层处理;**
- `sendMsg(final String msg)`
开启一个子线程,
拿着全局变量O流,封装成BufferWriter,
把参数msg 写入BufferWriter(O流),发送给Server端;
调用时机:在要发送消息给Server 的时候调用,
一般是EditText 右边的按钮被点击的时候
- onDestroy():
回收socket、IO流
- **TcpActivity
主要是各种组件的配置,
注意几点即可:**
- **需要实例化一个全局TcpClientBiz实例
然后用匿名内部类实现回调接口及其方法,
再set 给TcpClientBiz实例;**
- **点击按钮时把EditText的内容发送给Server端;
msg = mEtMsg.getText().toString();
mTcpClientBiz.sendMsg(msg);
**
- **onDestroy()中调用`mTcpClientBiz.onDestroy();`回收资源**
**所有的客户端都可发送消息到队列中,
然后所有的客户端都在等待
消息队列的消息新增(mQueue.put())这个时刻,
消息队列一新增消息,
即一接收到某个客户端发送过来消息(mQueue.put()),
则消息都会一次性转发给所有客户端,
所以这里涉及到一个观察者设计模式,
消息队列(MsgPool)或消息(Msg)是被观察者,
所有客户端处理线程(ClientTask)都是观察者**观察者模式实现小结:
**观察者接口准备更新(数据或UI的)方法;
被观察者接口准备三个抽象方法;
观察者实现类具体实现更新逻辑,可以有参数,参数为更新需要的数据;
被观察者实现类准备一个观察者List以及实现三个方法:
1.观察者注册方法:
参数为某观察者,功能是把观察者参数加到观察者List中;
2.注销观察者方法:
参数为某观察者,功能是把观察者参数从观察者List中移除;
3.通知观察者方法:无参数或者把需要通知的数据作为参数,
功能是遍历所有已注册的观察者,
即遍历 注册添加到 观察者List中的观察者,逐个调用List中所有观察者的更新方法;即一次性更新所有已注册的观察者!
使用时,
实例化一个被观察者和若干个观察者,
将所有观察者注册到被观察者处,
调用被观察者的通知方法,一次性更新所有已注册的观察者!**