简介
流媒体如今已经成为工业上的一个重要技术了,比如:直播网站、视频监控传输、APP直播等,如何实现一个高并发的视频网站,这就涉及到语言技术的选型以及流媒体技术的使用,本节将主要介绍如何使用Golang来实现一个流媒体视频网站。
大纲
- 背景
- 为什么选择Go以及Go的一些优势
- GoLang的简介以及实现一个webserver工具链
- Golang的channel并发模式
- 用Golang完成一个流媒体网站
- 网站部署
背景
为什么选择Go以及Go的一些优势
为什么会选择Go来开发视频网站呢?这其实主要体现在Go语言的优势。那么Go有哪些优势呢?
- 开发效率高,不管用其他语言,都需要很多其他的配置或插件,就连全家桶配套齐全的Java语言都会需要一个Servlet引擎,如:tomcat、jetty等。但Go在这方面,提供了得天独厚的能力。大部分功能和内容已经集成在了pkg。包括开发完整的开发工具链(tools、test、benchmark、builtin.etc),包括Go命令(go test、go install、go build)。这些都是完整的,直接下载Go后即可使用。
- 另一方面,部署简单,go属于编译性语言,而且是能够编译多个平台可执行文件的语言。Compile once,run everywhere,直接编译后生成二进制文件,直接运行。
- 良好的native http库、集成模板引擎,无需添加第三方框架。
GoLang的简介以及实现一个webserver工具链
Go语言是一种编译性语言,而且它的目标是兼具Python等动态语言的开发速度和集成C/C++等编译语言的性能与安全性。
Go中有一些常见的工具链,比如:
- go build,编译go文件,可以跨平台编译:env GOOS=linux GOARCH=amd64 go build,在CI/CD中,这是一个非常有用的命令。
- go install,这也是编译,但与build的区别是编译后将输出文件打包成库放在pkg下。
- go get,用于获取go的第三方包,常见的是:go get -u git地址,表示从git上获取某个资源并安装到本地。
- go fmt,统一代码风格、排版。
- go test,运行当前目录下的tests,“go test -v” 会打印所有的信息。
- go的test文件一般以XXX_test.go命名
要点:
- 使用TestMain作为初始化test,并且使用Run()来调用其它tests可以完成一些需要初始化操作的testing,如:数据库、文件加载等。
func TestMain(m *testing.M) {
fmt.Println("Test begin")
m.Run()
}
- 如果没在其中加Run(),除了TestMain的其它的tests都不被执行。
func TestPrint(t *testing.T) {
fmt.Println("Test print")
}
func TestMain(m *testing.M) {
fmt.Println("Test begin")
//m.Run()
}
按照上面说的,如果没有执行Run()方法,则TestPrint函数不会被执行。
Golang的channel并发模式
在 Go 中,既然有了协程,那么这些协程之间如何通信呢?Go 提供了一个 channel(通道) 来解决。
声明一个 channel
在 Go 语言中,声明一个 channel 非常简单,使用内置的 make 函数即可,如下:
ch:=make(chan string)
其中 chan 是一个关键字,表示是 channel 类型。后面的 string 表示 channel 里的数据是 string 类型。通过 channel 的声明也可以看到,chan 是一个集合类型。
定义好 chan 后就可以使用了,一个 chan 的操作只有两种:发送和接收:
- 发送:向 chan 发送值,把值放在 chan 中,操作符为 chan <-
- 接收:获取 chan 中的值,操作符为 <- chan
示例:
package main
import "fmt"
func main() {
ch := make(chan string)
go func() {
fmt.Println("码疯窝在香嗝喱辣")
ch <- "发送数据者:码疯窝在香嗝喱辣"
}()
fmt.Println("I am main goroutine")
v := <- ch
fmt.Println("接收到的chan中的值为:",v)
}
我们先来执行看看打印结果:
I am main goroutine
码疯窝在香嗝喱辣
接收到的chan中的值为:送数据者:码疯窝在香嗝喱辣
从运行结果可以看出:达到了使用 time.Sleep 函数的效果。
相信应该明白为什么程序不会在新的 goroutine 完成之前退出了,因为通过 make 创建的 chan 中没有值,而 main goroutine 又想从 chan 中获取值,获取不到就一直等待,等到另一个 goroutine 向 chan 发送值为止。
无缓冲 channel
上面的示例中,使用 make 创建的 chan 就是一个无缓冲 channel,它的容量是 0,不能存储任何数据。所以无缓冲 channel 只起到传输数据的作用,数据并不会在 channel 中做任何停留。这也意味着,无缓冲 channel 的发送和接收操作是同时进行的,它也被称为同步 channel。
有缓冲 channel
有缓冲 channel 类似一个可阻塞的队列,内部的元素先进先出。通过 make 函数的第二个参数可以指定 channel 容量的大小,进而创建一个有缓冲 channel,如:
cacheCh := make(chan int,5)
定义了一个容量为 5 的元素为 int 类型的 chan。
一个有缓冲 channel 具备以下特点:
- 有缓冲 channel 的内部有一个缓冲队列
- 发送操作是向队列的尾部插入元素,如果队列已满,则阻塞等待,直到另一个 goroutine 执行,接收操作释放队列的空间
- 接收操作是从队列的头部获取元素并把它从队列中删除,如果队列为空,则阻塞等待,直到另一个 goroutine 执行,发送操作插入新的元素
cache := make(chan int,5)
cache <- 2
cache <- 3
fmt.Println("容量:",cap(cache),",元素个数:",len(cache))
无缓冲 channel 其实就是一个容量大小为 0 的 channel。比如 make(chan int,0)
关闭 channel
通过内置函数 close 即可关闭 channel。如果一个 channel 被关闭了,就不能向里面发送数据了,如果发送的话,会引起 painc 异常。但是还可以接收 channel 里的数据,如果 channel 里没有数据的话,接收的数据是元素类型的零值。
单向 channel
所谓单向,即可要不发送,要么只能接收。所以单向 channel 的声明也很简单,只需要在声明的时候带上 <- 操作符即可,如下:
send := make(chan <- int)
receive := make(<- chan int)
用Golang完成一个流媒体网站
业务模块
API接口设计
- 分层
- Restful风格设计
- CRUD区分资源操作
- 返回码规范
首先,我们写个启动类:
package main
import (
"net/http"
"github.com/julienschmidt/httprouter"
)
type middleWareHandler struct {
r *httprouter.Router
}
func NewMiddleWareHandler(r *httprouter.Router) http.Handler {
m := middleWareHandler{}
m.r = r
return m
}
func (m middleWareHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
//check session
validateUserSession(r)
m.r.ServeHTTP(w, r)
}
func RegisterHandlers() *httprouter.Router {
router := httprouter.New()
router.POST("/user", CreateUser)
router.POST("/user/:user_name", Login)
return router
}
func main() {
r := RegisterHandlers()
mh := NewMiddleWareHandler(r)
http.ListenAndServe(":1000", mh)
}
在这里我们实现了注册、登录以及一些初始化监听端口。接下来,我们需要看看对于后端视频处理时,主要关心的是session:
package session
import (
"time"
"sync"
"github.com/avenssi/video_server/api/defs"
"github.com/avenssi/video_server/api/dbops"
"github.com/avenssi/video_server/api/utils"
)
var sessionMap *sync.Map
func init() {
sessionMap = &sync.Map{}
}
func nowInMilli() int64{
return time.Now().UnixNano()/1000000
}
func deleteExpiredSession(sid string) {
sessionMap.Delete(sid)
dbops.DeleteSession(sid)
}
func LoadSessionsFromDB() {
r, err := dbops.RetrieveAllSessions()
if err != nil {
return
}
r.Range(func(k, v interface{}) bool{
ss := v.(*defs.SimpleSession)
sessionMap.Store(k, ss)
return true
})
}
func GenerateNewSessionId(un string) string {
id, _ := utils.NewUUID()
ct := nowInMilli()
ttl := ct + 30 * 60 * 1000// Severside session valid time: 30 min
ss := &defs.SimpleSession{Username: un, TTL: ttl}
sessionMap.Store(id, ss)
dbops.InsertSession(id, ttl, un)
return id
}
func IsSessionExpired(sid string) (string, bool) {
ss, ok := sessionMap.Load(sid)
if ok {
ct := nowInMilli()
if ss.(*defs.SimpleSession).TTL < ct {
deleteExpiredSession(sid)
return "", true
}
return ss.(*defs.SimpleSession).Username, false
}
return "", true
}
从上面的代码中,可以看到,Go主要引用了相关的视频插件库:avenssi/video_server等来处理缓存session。这也是为什么选择go开发后端的一个原因。
同时,我们还定义了一个错误码信息:
package defs
type Err struct {
Error string `json:"error"`
ErrorCode string `json:"error_code"`
}
type ErrResponse struct {
HttpSC int
Error Err
}
var (
ErrorRequestBodyParseFailed = ErrResponse{HttpSC: 400, Error: Err{Error: "Request body is not correct", ErrorCode: "001"}}
ErrorNotAuthUser = ErrResponse{HttpSC: 401, Error: Err{Error: "User authentication failed.", ErrorCode: "002"}}
ErrorDBError = ErrResponse{HttpSC: 500, Error: Err{Error: "DB ops failed", ErrorCode: "003"}}
ErrorInternalFaults = ErrResponse{HttpSC: 500, Error: Err{Error: "Internal service error", ErrorCode: "004"}}
)
以上对于业务层中处理主要逻辑就是这些了,下面主要讲scheduler和stream server。
scheduler
scheduler主要是来调度任务,那么主要是哪些任务呢?主要是那些普通api无法立即给结果的任务。比如:我们视频网站需要一些视频审核、数据恢复的需求。这时候,我们需要做一些short delay,用户看不到,但后台还是存在的。这就需要scheduler异步处理。还比如有些周期性的任务。
在Scheduler中,还存在Timer,定时器主要用来作定时处理task的。
在本小节中,我们采用runner的生产、消费者模式实现。具体代码如下:
package taskrunner
import (
)
type Runner struct {
Controller controlChan
Error controlChan
Data dataChan
dataSize int
longLived bool
Dispatcher fn
Executor fn
}
func NewRunner(size int, longlived bool, d fn, e fn) *Runner {
return &Runner {
Controller: make(chan string, 1),
Error: make(chan string, 1),
Data: make(chan interface{}, size),
longLived: longlived,
dataSize: size,
Dispatcher: d,
Executor: e,
}
}
func (r *Runner) startDispatch() {
defer func() {
if !r.longLived {
close(r.Controller)
close(r.Data)
close(r.Error)
}
}()
for {
select {
case c :=<- r.Controller:
if c == READY_TO_DISPATCH {
err := r.Dispatcher(r.Data)
if err != nil {
r.Error <- CLOSE
} else {
r.Controller <- READY_TO_EXECUTE
}
}
if c == READY_TO_EXECUTE {
err := r.Executor(r.Data)
if err != nil {
r.Error <- CLOSE
} else {
r.Controller <- READY_TO_DISPATCH
}
}
case e :=<- r.Error:
if e == CLOSE {
return
}
default:
}
}
}
func (r *Runner) StartAll() {
r.Controller <- READY_TO_DISPATCH
r.startDispatch()
}
Runner是可以复用的,而接下来介绍的Task是定制Runner的。比如:我们延迟删除视频。
我们先拿到数据,看看:
package dbops
import (
"log"
_ "github.com/go-sql-driver/mysql"
)
func ReadVideoDeletionRecord(count int) ([]string, error) {
stmtOut, err := dbConn.Prepare("SELECT video_id FROM video_del_rec LIMIT ?")
var ids []string
if err != nil {
return ids, err
}
rows, err := stmtOut.Query(count)
if err != nil {
log.Printf("Query VideoDeletionRecord error: %v", err)
return ids, err
}
for rows.Next() {
var id string
if err := rows.Scan(&id); err != nil {
return ids, err
}
ids = append(ids, id)
}
defer stmtOut.Close()
return ids, nil
}
func DelVideoDeletionRecord(vid string) error {
stmtDel, err := dbConn.Prepare("DELETE FROM video_del_rec WHERE video_id=?")
if err != nil {
return err
}
_, err = stmtDel.Exec(vid)
if err != nil {
log.Printf("Deleting VideoDeletionRecord error: %v", err)
return err
}
defer stmtDel.Close()
return nil
}
拿到数据后,需要处理,这时需要task:
package taskrunner
import (
"os"
"errors"
"log"
"sync"
"github.com/avenssi/video_server/scheduler/dbops"
)
func deleteVideo(vid string) error {
err := os.Remove(VIDEO_PATH + vid)
if err != nil && !os.IsNotExist(err) {
log.Printf("Deleting video error: %v", err)
return err
}
return nil
}
func VideoClearDispatcher(dc dataChan) error {
res, err := dbops.ReadVideoDeletionRecord(3)
if err != nil {
log.Printf("Video clear dispatcher error: %v", err)
return err
}
if len(res) == 0 {
return errors.New("All tasks finished")
}
for _, id := range res {
dc <- id
}
return nil
}
func VideoClearExecutor(dc dataChan) error {
errMap := &sync.Map{}
var err error
forloop:
for {
select {
case vid :=<- dc:
go func(id interface{}) {
if err := deleteVideo(id.(string)); err != nil {
errMap.Store(id, err)
return
}
if err := dbops.DelVideoDeletionRecord(id.(string)); err != nil {
errMap.Store(id, err)
return
}
}(vid)
default:
break forloop
}
}
errMap.Range(func(k, v interface{}) bool {
err = v.(error)
if err != nil {
return false
}
return true
})
return err
}
以上就是关于异步、定时处理视频流信息过程。
stream server
- Streaming
- Upload files
Streaming主要区别于普通的链接,它需要保持长链接,与短链接是不一样的,当发送一个request过来,会不断与客户端输出数据流,而且会很长。所以在多路长链接同时保持的时候,出现一个问题,如果不断的发起链接、打开网页,最终会把我们的服务给crash掉,所以,我们需要进行流控:limit,这里的流控可能只在connect时候进行限制。
package main
import (
"log"
)
type ConnLimiter struct {
concurrentConn int
bucket chan int
}
func NewConnLimiter(cc int) *ConnLimiter {
return &ConnLimiter {
concurrentConn: cc,
bucket: make(chan int, cc),
}
}
func (cl *ConnLimiter) GetConn() bool {
if len(cl.bucket) >= cl.concurrentConn {
log.Printf("Reached the rate limitation.")
return false
}
cl.bucket <- 1
return true
}
func (cl *ConnLimiter) ReleaseConn() {
c :=<- cl.bucket
log.Printf("New connction coming: %d", c)
}
加了流控后,我们需要在http middleware中嵌入流控,同样,我们在启动时,都需要注册router以及http server,所以代码如下:
package main
import (
"net/http"
"github.com/julienschmidt/httprouter"
)
type middleWareHandler struct {
r *httprouter.Router
l *ConnLimiter
}
func NewMiddleWareHandler(r *httprouter.Router, cc int) http.Handler {
m := middleWareHandler{}
m.r = r
m.l = NewConnLimiter(cc)
return m
}
func RegisterHandlers() *httprouter.Router {
router := httprouter.New()
router.GET("/videos/:vid-id", streamHandler)
router.POST("/upload/:vid-id", uploadHandler)
router.GET("/testpage", testPageHandler)
return router
}
func (m middleWareHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if !m.l.GetConn() {
sendErrorResponse(w, http.StatusTooManyRequests, "Too many requests")
return
}
m.r.ServeHTTP(w, r)
defer m.l.ReleaseConn()
}
func main() {
r := RegisterHandlers()
mh := NewMiddleWareHandler(r, 2)
http.ListenAndServe(":2000", mh)
}
最后,我们来看看streamHandler如何来处理:
func streamHandler(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
vid := p.ByName("vid-id")
vl := VIDEO_DIR + vid
video, err := os.Open(vl)
if err != nil {
log.Printf("Error when try to open file: %v", err)
sendErrorResponse(w, http.StatusInternalServerError, "Internal Error")
return
}
w.Header().Set("Content-Type", "video/mp4")
http.ServeContent(w, r, "", time.Now(), video)
defer video.Close()
}
我们这里采用比较通用的做法:在拿到流唯一信息后,直接处理。
Upload files时,我们需要做静态检查,然后把数据从中读取:
func uploadHandler(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
r.Body = http.MaxBytesReader(w, r.Body, MAX_UPLOAD_SIZE)
if err := r.ParseMultipartForm(MAX_UPLOAD_SIZE); err != nil {
sendErrorResponse(w, http.StatusBadRequest, "File is too big")
return
}
file, _, err := r.FormFile("file")
if err != nil {
log.Printf("Error when try to get file: %v", err)
sendErrorResponse(w, http.StatusInternalServerError, "Internal Error")
return
}
data, err := ioutil.ReadAll(file)
if err != nil {
log.Printf("Read file error: %v", err)
sendErrorResponse(w, http.StatusInternalServerError, "Internal Error")
}
fn := p.ByName("vid-id")
err = ioutil.WriteFile(VIDEO_DIR + fn, data, 0666)
if err != nil {
log.Printf("Write file error: %v", err)
sendErrorResponse(w, http.StatusInternalServerError, "Internal Error")
return
}
w.WriteHeader(http.StatusCreated)
io.WriteString(w, "Uploaded successfully")
}
网站部署
先想给之前的代码进行编译打包:
FROM ubuntu:16.04 as build
ENV TZ=Asia/Shanghai
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
RUN apt-get update && apt-get install -y --no-install-recommends \
g++ \
ca-certificates \
wget && \
rm -rf /var/lib/apt/lists/*
ENV GOLANG_VERSION 1.15.1
RUN wget -nv -O - https://studygolang.com/dl/golang/go1.15.1.linux-amd64.tar.gz \
| tar -C /usr/local -xz
ENV GOPROXY=https://goproxy.cn,direct
ENV GO111MODULE=on
ENV GOPATH /go
ENV PATH $GOPATH/bin:/usr/local/go/bin:$PATH
WORKDIR /go/src
COPY . .
WORKDIR /go/src/video-service
RUN sed -i "/runmode/crunmode=pro" /go/src/video-service/conf/app.conf
RUN export CGO_LDFLAGS_ALLOW='-Wl,--unresolved-symbols=ignore-in-object-files' && \
go install -ldflags="-s -w" -v /go/src/video-service
FROM ubuntu:16.04
WORKDIR /video-service
RUN mkdir -p log
COPY --from=build /go/bin/video-service /video-service
CMD ["./video-service"]
接下来,补充部署脚本:
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
labels:
app: video-service
name: video-service
namespace: system-server
spec:
replicas: 1
selector:
matchLabels:
app: video-service
template:
metadata:
labels:
app: video-service
spec:
containers:
- image: {{ cluster_cfg['cluster']['docker-registry']['prefix'] }}video-service
imagePullPolicy: Always
name: video-service
ports:
- containerPort: 1000
#livenessProbe:
#httpGet:
#path: /api/v1/healthz
#port: 1000
#scheme: HTTP
#initialDelaySeconds: 15
#periodSeconds: 10
#timeoutSeconds: 3
#failureThreshold: 5
volumeMounts:
- name: video-service-config
mountPath: /video-service/conf
volumes:
- name: video-service-config
configMap:
name: video-service-config
nodeSelector:
video-service: "true"
restartPolicy: Always
执行编译命令:
sh build/build.sh
kubectl create -f deploy.yml
这里使用K8s部署到机器上。
部署后的服务访问地址:
10.11.3.4:1000