【Golang 快速入门】项目实战:即时通信系统

简介: Golang 实战之即时通信系统服务端版本一:构建基础 Server版本二:用户上线功能版本三:用户消息广播机制版本四:用户业务层封装版本五:在线用户查询版本六:修改用户名版本七:超时强退功能版本八:私聊功能
学习视频: 8 小时转职 Golang 工程师,这门课很适合有一定开发经验的小伙伴,强推!
完整代码: https://gitee.com/szluyu99/golang-study

即时通信系统 - 服务端

项目架构图:

版本迭代:

  • 版本一:构建基础 Server
  • 版本二:用户上线功能
  • 版本三:用户消息广播机制
  • 版本四:用户业务层封装
  • 版本五:在线用户查询
  • 版本六:修改用户名
  • 版本七:超时强踢功能
  • 版本八:私聊功能
  • 版本九:客户端实现

版本一:构建基础 Server

server.go,其中包含以下内容:

  • 定义 Server 结构体,包含 IP、Port 字段
  • NewServer(ip string, port int) 创建 Server 对象的方法
  • (s *Server) Start() 启动 Server 服务的方法
  • (s *Server) Handler(conn net.Conn) 处理连接业务
package main

import (
    "fmt"
    "net"
)

type Server struct {
    Ip   string
    Port int
}

// 创建一个server的接口
func NewServer(ip string, port int) *Server {
    server := &Server{
        Ip:   ip,
        Port: port,
    }
    return server
}

func (s *Server) Handler(conn net.Conn) {
    // 当前连接的业务
    fmt.Println("连接建立成功!")
}

// 启动服务器的接口
func (s *Server) Start() {
    // socket listen
    listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", s.Ip, s.Port))
    if err != nil {
        fmt.Println("net.Listen err: ", err)
        return
    }
    // close listen socket
    defer listener.Close()

    for {
        // accpet
        conn, err := listener.Accept()
        if err != nil {
            fmt.Println("listener accept err: ", err)
            continue
        }
        // do handler
        go s.Handler(conn)
    }

}

main.go,启动我们编写的 Server:

package main

func main() {
    server := NewServer("127.0.0.1", 8888)
    server.Start()
}
以下命令都是在 Linux 或 macOS 下运行,Windows 略有不同

同时编译编写的两个文件:go build -o server main.go server.go

然后运行编译出的文件:./server

使用命令侦听我们构建的服务:nc 127.0.0.1 8888

版本二:用户上线功能

user.go:

  • NewUser(conn net.Conn) *User 创建一个 user 对象
  • (u *User) ListenMessage() 监听 user 对应的 channel 消息
type User struct {
    Name string
    Addr string
    C    chan string
    conn net.Conn
}

// 创建一个用户的API
func NewUser(conn net.Conn) *User {
    userAddr := conn.RemoteAddr().String()

    user := &User{
        Name: userAddr,
        Addr: userAddr,
        C:    make(chan string),
        conn: conn,
    }

    // 启动监听当前user channel消息的goroutine
    go user.ListenMessage()

    return user
}

// 监听当前user channel的方法,一旦有消息,直接发送给客户端
func (u *User) ListenMessage() {
    for {
        msg := <-u.C
        u.conn.Write([]byte(msg + "\n"))
    }
}

Server.go:

  • 新增 OnlineMap 和 Message 属性
  • 在处理客户端上线的 Handler 创建并添加用户
  • 新增广播消息方法
  • 新增监听广播消息 channel 方法
  • 用一个 goroutine 单独监听 Message
type Server struct {
    Ip   string
    Port int

    // 在线用户的列表
    OnlineMap map[string]*User
    mapLock   sync.RWMutex

    // 消息广播的channel
    Message chan string
}

// 创建一个server的接口
func NewServer(ip string, port int) *Server {
    server := &Server{
        Ip:        ip,
        Port:      port,
        OnlineMap: make(map[string]*User),
        Message:   make(chan string),
    }
    return server
}

// 监听Message广播消息的channel的goroutine,一旦有消息就发送给全部的在线user
func (s *Server) ListenMessager() {
    for {
        msg := <-s.Message
        // 将msg发送给全部的在线user
        s.mapLock.Lock()
        for _, cli := range s.OnlineMap {
            cli.C <- msg
        }
        s.mapLock.Unlock()
    }
}

// 广播消息的方法
func (s *Server) BroadCast(user *User, msg string) {
    sendMsg := "[" + user.Addr + "]" + user.Name + ":" + msg

    s.Message <- sendMsg
}

func (s *Server) Handler(conn net.Conn) {
    // 当前连接的业务
    // fmt.Println("连接建立成功!")

    user := NewUser(conn)

    // 用户上线,将用户加入到onlineMap中
    s.mapLock.Lock()
    s.OnlineMap[user.Name] = user
    s.mapLock.Unlock()

    // 广播当前用户上线消息
    s.BroadCast(user, "已上线")

    // 当前handler阻塞
    select {}
}

// 启动服务器的接口
func (s *Server) Start() {
    // socket listen
    listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", s.Ip, s.Port))
    if err != nil {
        fmt.Println("net.Listen err: ", err)
        return
    }
    // close listen socket
    defer listener.Close()

    // 启动监控Message的goroutine
    go s.ListenMessager()

    for {
        // accpet
        conn, err := listener.Accept()
        if err != nil {
            fmt.Println("listener accept err: ", err)
            continue
        }
        // do handler
        go s.Handler(conn)
    }

}

学习到的编程思路:

  • 结构体中的 channel 基本都需要开个循环去监听其变化(尝试取出值,发送给其他 channel)

版本三:用户消息广播机制

server.go:完善 handle 处理业务方法,启动一个针对当前客户端的读 routine

版本四:用户业务层封装

user.go:

  • user 类型新增 server 关联
  • 新增 Online、Offline、DoMessage 方法
type User struct {
    Name string
    Addr string
    C    chan string
    conn net.Conn

    server *Server
}

// 创建一个用户的API
func NewUser(conn net.Conn, server *Server) *User {
    userAddr := conn.RemoteAddr().String()

    user := &User{
        Name:   userAddr,
        Addr:   userAddr,
        C:      make(chan string),
        conn:   conn,
        server: server,
    }

    // 启动监听当前user channel消息的goroutine
    go user.ListenMessage()

    return user
}

// 用户的上线业务
func (u *User) Online() {
    // 用户上线,将用户加到onlineMap中
    u.server.mapLock.Lock()
    u.server.OnlineMap[u.Name] = u
    u.server.mapLock.Unlock()

    // 广播当前用户上线消息
    u.server.BroadCast(u, "已上线")
}

// 用户的下线业务
func (u *User) Offline() {
    // 用户下线,将用户从onlineMap中删除
    u.server.mapLock.Lock()
    delete(u.server.OnlineMap, u.Name)
    u.server.mapLock.Unlock()

    // 广播当前用户下线消息
    u.server.BroadCast(u, "已下线")
}

// 用户处理消息的业务
func (u *User) DoMessage(msg string) {
    u.server.BroadCast(u, msg)
}

// 监听当前user channel的方法,一旦有消息,直接发送给客户端
func (u *User) ListenMessage() {
    for {
        msg := <-u.C
        u.conn.Write([]byte(msg + "\n"))
    }
}

server.go:

  • 使用 user 封装好的业务替换之前的代码
func (s *Server) Handler(conn net.Conn) {
    // 当前连接的业务
    // fmt.Println("连接建立成功!")

    user := NewUser(conn, s)

  // 用户上线
    user.Online()

    // 接受客户端发送的消息
    go func() {
        buf := make([]byte, 4096)
        for {
            n, err := conn.Read(buf)
            if n == 0 {
        // 用户下线
                user.Offline()
                return
            }
            if err != nil && err != io.EOF {
                fmt.Println("Conn Read err:", err)
                return
            }

            // 提取用户的消息(去除'\n')
            msg := string(buf[:n-1])

            // 将得到的消息进行广播
            user.DoMessage(msg)
        }
    }()

    // 当前handler阻塞
    select {}
}

版本五:在线用户查询

若某个用户输入的消息为 who 则查询当前在线用户列表。

user.go:

  • 提供 SendMsg 向对象客户端发送消息 API
func (u *User) SendMsg(msg string) {
    u.conn.Write([]byte(msg))
}
  • 在 DoMessage() 方法中,加上对 "who" 指令的处理,返回在线用户信息
func (u *User) DoMessage(msg string) {
    if msg == "who" {
        // 查询当前在线用户都有哪些
        u.server.mapLock.Lock()
        for _, user := range u.server.OnlineMap {
            onlineMsg := "[" + user.Addr + "]" + user.Name + ":" + "在线...\n"
            u.SendMsg(onlineMsg)
        }
        u.server.mapLock.Unlock()
    } else {
        u.server.BroadCast(u, msg)
    }
}

版本六:修改用户名

若某个用户输入的消息为 rename张三 则将自己的 Name 修改为张三。

user.go:

  • 在 DoMessage() 方法中,加上对 "rename|张三" 指令的处理
func (u *User) DoMessage(msg string) {
    if msg == "who" {
        // 查询当前在线用户都有哪些
        u.server.mapLock.Lock()
        for _, user := range u.server.OnlineMap {
            onlineMsg := "[" + user.Addr + "]" + user.Name + ":" + "在线...\n"
            u.SendMsg(onlineMsg)
        }
        u.server.mapLock.Unlock()
    } else if len(msg) > 7 && msg[:7] == "rename|" {
        // 消息格式:rename|张三
        newName := strings.Split(msg, "|")[1]
        // 判断name是否存在
        _, ok := u.server.OnlineMap[newName]
        if ok {
            u.SendMsg("当前用户名被使用\n")
        } else {
            u.server.mapLock.Lock()
            delete(u.server.OnlineMap, newName)
            u.server.OnlineMap[newName] = u
            u.server.mapLock.Unlock()

            u.Name = newName
            u.SendMsg("您已经更新用户名:" + u.Name + "\n")
        }
    } else {
        u.server.BroadCast(u, msg)
    }
}

版本七:超时强推功能

用户的任意消息表示用户为活跃,长实践不发消息认为超时,就才一强制关闭用户连接。

server.go:

  • 在用户 Handler() goroutine 中,添加活跃用户 channel,一旦用户有消息,就向该 channel 发送数据
func (s *Server) Handler(conn net.Conn) {
    // 当前连接的业务
    // fmt.Println("连接建立成功!")

    user := NewUser(conn, s)

    user.Online()

    // 监听用户是否活跃的channel
    isLive := make(chan bool)

    // 接受客户端发送的消息
    go func() {
        buf := make([]byte, 4096)
        for {
            n, err := conn.Read(buf)
            if n == 0 {
                user.Offline()
                return
            }
            if err != nil && err != io.EOF {
                fmt.Println("Conn Read err:", err)
                return
            }

            // 提取用户的消息(去除'\n')
            msg := string(buf[:n-1])

            // 用户针对msg进行消息处理
            user.DoMessage(msg)

            // 用户的任意消息,代表当前用户是活跃状态
            isLive <- true
        }
    }()

    // 当前handler阻塞
    for {
        select {
        case <-isLive:
            // 当前用户是活跃的,应该重置定时器
            // 不做任何事情,为了激活select,更新下面的定时器
        case <-time.After(time.Second * 10): // 10s后触发定时器
            // 已经超时
            // 将当前的user强制关闭
            user.SendMsg("你被踢了。")

            // 销毁资源
            close(user.C)

            // 关闭连接
            conn.Close()

            // 退出当前Handler
            // runtime.Goexit()
            return
        }
    }
}

版本八:私聊功能

消息格式:to|张三|你好啊,我是...

user.go,在 DoMessage() 方法中,加上对 "to|张三|你好啊" 指令的处理:

func (this *User) DoMessage(msg string) {
    if msg == "who" {
        //查询当前在线用户都有哪些

        this.server.mapLock.Lock()
        for _, user := range this.server.OnlineMap {
            onlineMsg := "[" + user.Addr + "]" + user.Name + ":" + "在线...\n"
            this.SendMsg(onlineMsg)
        }
        this.server.mapLock.Unlock()

    } else if len(msg) > 7 && msg[:7] == "rename|" {
        //消息格式: rename|张三
        newName := strings.Split(msg, "|")[1]

        //判断name是否存在
        _, ok := this.server.OnlineMap[newName]
        if ok {
            this.SendMsg("当前用户名被使用\n")
        } else {
            this.server.mapLock.Lock()
            delete(this.server.OnlineMap, this.Name)
            this.server.OnlineMap[newName] = this
            this.server.mapLock.Unlock()

            this.Name = newName
            this.SendMsg("您已经更新用户名:" + this.Name + "\n")
        }

    } else {
        this.server.BroadCast(this, msg)
    }
}

即时通信系统 - 客户端

以下代码都是在 client.go 文件中

客户端类型定义与链接

client.go:

type Client struct {
    ServerIp   string
    ServerPort int
    Name       string
    conn       net.Conn
}

func NewClient(serverIp string, serverPort int) *Client {
    // 创建客户端对象
    client := &Client{
        ServerIp:   serverIp,
        ServerPort: serverPort,
    }
    // 连接server
    conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", serverIp, serverPort))
    if err != nil {
        fmt.Println("net.Dial error:", err)
        return nil
    }
    client.conn = conn
    // 返回对象
    return client
}

func main() {
    client := NewClient("127.0.0.1", 8888)
    if client == nil {
        fmt.Println(">>>>> 连接服务器失败")
        return
    }
    fmt.Println(">>>>> 连接服务器成功")

    // 启动客户端业务
    select {}
}

编译指令:go build -o client client.go

运行编译后的文件:./client

解析命令行

在 init 函数中初始化命令行参数并解析:

var serverIp string
var serverPort int

func init() {
    flag.StringVar(&serverIp, "ip", "127.0.0.1", "设置服务器IP地址(默认是127.0.0.1)")
    flag.IntVar(&serverPort, "port", 8888, "设置服务器端口(默认是8888)")

    // 命令行解析
    flag.Parse()
}

然后在运行客户端时可以通过 命令行传参运行:

./client -ip 127.0.0.1 -port 8888

菜单显示

给 Client 新增 flag 属性:

type Client struct {
    ServerIp   string
    ServerPort int
    Name       string
    conn       net.Conn
    flag       int // 当前客户端的模式
}

新增 menu() 方法,获取用户输入的模式:

// 菜单
func (client *Client) menu() bool {
    var flag int

    fmt.Println("1.公聊模式")
    fmt.Println("2.私聊模式")
    fmt.Println("3.更新用户名")
    fmt.Println("0.退出")

    fmt.Scanln(&flag)

    if flag >= 0 && flag <= 3 {
        client.flag = flag
        return true
    } else {
        fmt.Println(">>>>请输入合法范围内的数字<<<<")
        return false
    }
}

新增 Run() 主业务循环:

func (client *Client) Run() {
    for client.flag != 0 {
        for !client.menu() {
        }

        // 根据不同的模式处理不同的业务
        switch client.flag {
        case 1:
            // 公聊模式
            fmt.Println("公聊模式")
        case 2:
            // 私聊模式
            fmt.Println("私聊模式")
        case 3:
            // 更新用户名
            fmt.Println("更新用户名")
        }
    }
    fmt.Println("退出!")
}

更新用户名

新增 UpdateName() 更新用户名:

func (client *Client) UpdateName() bool {
    fmt.Println(">>>>请输入用户名:")
    fmt.Scanln(&client.Name)

    sendMsg := "rename|" + client.Name + "\n" // 封装协议
    _, err := client.conn.Write([]byte(sendMsg))
    if err != nil {
        fmt.Println("conn.Write err: ", err)
        return false
    }

    return true
}

添加 server 回执消息方法 DealResponse()

// 处理server回应的消息,直接显示到标准输出
func (client *Client) DealResponse() {
    // 一旦client.conn有数据,直接copy到stdout标准输出上,永久阻塞监听
    io.Copy(os.Stdout, client.conn)
}

在 main 中开启一个 goroutine,去承载 DealResponse() 流程:

func main() {
    client := NewClient(serverIp, serverPort)
    if client == nil {
        fmt.Println(">>>>> 连接服务器 失败")
        return
    }
    fmt.Println(">>>>> 连接服务器成功")

    // 单独开启一个goroutine去处理server的回执消息
    go client.DealResponse()

    // 启动客户端业务
    client.Run()
}

公聊模式

新增 PublicChat() 公聊模式:

func (client *Client) PublicChat() {
    // 提示用户输入消息
    var chatMsg string

    fmt.Println(">>>>请输入聊天内容,exit退出.")
    fmt.Scanln(&chatMsg)

    for chatMsg != "exit" {
        // 发给服务器
        // 消息不为空立即发送
        if len(chatMsg) != 0 {
            sendMsg := chatMsg + "\n"
            _, err := client.conn.Write([]byte(sendMsg))
            if err != nil {
                fmt.Println("conn Write err: ", err)
                break
            }
        }
        chatMsg = ""
        fmt.Println(">>>>请输入聊天内容,exit退出.")
        fmt.Scanln(&chatMsg)
    }
}

私聊模式

查询当前有哪些用户在线:

func (client *Client) SelectUsers() {
    sendMsg := "who\n"
    _, err := client.conn.Write([]byte(sendMsg))
    if err != nil {
        fmt.Println("conn Write err: ", err)
        return
    }
}

新增私聊业务:

func (client *Client) PrivateChat() {
    var remoteName string
    var chatMsg string

    client.SelectUsers()
    fmt.Println(">>>>请输入聊天对象的[用户名], exit退出: ")
    fmt.Scanln(&remoteName)

    for remoteName != "exit" {
        fmt.Println(">>>>请输入消息内容,exit退出:")
        fmt.Scanln(&chatMsg)

        for chatMsg != "exit" {
            // 消息不为空则发送
            if len(chatMsg) != 0 {
                sendMsg := "to|" + remoteName + "|" + chatMsg + "\n\n"
                _, err := client.conn.Write([]byte(sendMsg))
                if err != nil {
                    fmt.Println("conn Write err: ", err)
                    break
                }
            }
            chatMsg = ""
            fmt.Println(">>>>请输入消息内容,exit退出:")
            fmt.Scanln(&chatMsg)
        }

        client.SelectUsers()
        fmt.Println(">>>>请输入聊天对象的[用户名], exit退出: ")
        fmt.Scanln(&remoteName)

    }

}
相关文章
|
Unix Go
Golang 语言中怎么拦截系统信号和优雅退出 http server?
Golang 语言中怎么拦截系统信号和优雅退出 http server?
84 0
|
缓存 机器人 Go
Golang 语言 beego 学习之安装和快速入门
Golang 语言 beego 学习之安装和快速入门
75 0
|
2月前
|
Go
Golang语言之管道channel快速入门篇
这篇文章是关于Go语言中管道(channel)的快速入门教程,涵盖了管道的基本使用、有缓冲和无缓冲管道的区别、管道的关闭、遍历、协程和管道的协同工作、单向通道的使用以及select多路复用的详细案例和解释。
111 4
Golang语言之管道channel快速入门篇
|
2月前
|
Go
Golang语言文件操作快速入门篇
这篇文章是关于Go语言文件操作快速入门的教程,涵盖了文件的读取、写入、复制操作以及使用标准库中的ioutil、bufio、os等包进行文件操作的详细案例。
66 4
Golang语言文件操作快速入门篇
|
2月前
|
Go
Golang语言之映射(map)快速入门篇
这篇文章是关于Go语言中映射(map)的快速入门教程,涵盖了map的定义、创建方式、基本操作如增删改查、遍历、嵌套map的使用以及相关练习题。
36 5
|
2月前
|
Go
Golang语言之切片(slice)快速入门篇
这篇文章是关于Go语言中切片(slice)的快速入门教程,详细介绍了切片的概念、定义方式、遍历、扩容机制、使用注意事项以及相关练习题。
31 5
|
2月前
|
Go
Golang语言之数组(array)快速入门篇
这篇文章是关于Go语言中数组的详细教程,包括数组的定义、遍历、注意事项、多维数组的使用以及相关练习题。
33 5
|
3月前
|
Go 开发者
|
4月前
|
自然语言处理 Go 数据处理
云计算自旋锁问题之引入Golang插件系统后iLogtail的输入输出通道和处理能力如何解决
云计算自旋锁问题之引入Golang插件系统后iLogtail的输入输出通道和处理能力如何解决
36 1
|
6月前
|
Go
第十三章 Golang客户信息关系系统
第十三章 Golang客户信息关系系统
58 3