Golang微服务框架kratos实现SignalR服务

简介: SignalR 在可用的情况下使用新的 WebSocket 传输,并在必要时回退到旧传输。 虽然当然可以直接使用 WebSocket 编写应用,但使用 SignalR 意味着需要实现的许多额外功能已经为你完成。 最重要的是,这意味着你可以编写应用代码以利用 WebSocket,而无需担心为旧客户端创建单独的代码路径。 SignalR 还可以避免担心 WebSocket 的更新,因为 SignalR 已更新以支持基础传输中的更改,从而为应用程序提供跨 WebSocket 版本的一致接口。

Golang微服务框架kratos实现SignalR服务

基于 SignalR 可以实现客户端和服务器之间进行即时通信。

适合 SignalR 的应用场景:

需要从服务器进行高频率更新的应用。 示例包括游戏、社交网络、投票、拍卖、地图和 GPS 应用。
仪表板和监视应用。
协作应用。 协作应用的示例包括白板应用和团队会议软件。
需要通知的应用。 社交网络、电子邮件、聊天、游戏、旅行警报和很多其他应用都需使用通知。

SignalR 自动选择服务器和客户端能力范围内的最佳传输方法,如WebSockets、Server-Sent Events、长轮询。Hub 是一种高级管道,允许客户端和服务器相互调用方法。 SignalR 自动处理跨计算机边界的调度,并允许客户端调用服务器上的方法,反之亦然。SignalR 提供两个内置协议:基于 JSON 的文本协议和基于 MessagePack 的二进制协议。

什么是 SignalR?

ASP.NET SignalR 是一个面向 ASP.NET 开发人员的库,可简化向应用程序添加实时 Web 功能的过程。 实时 Web 功能是让服务器代码在可用时立即将内容推送到连接的客户端,而不是让服务器等待客户端请求新数据。

SignalR 可用于向 ASP.NET 应用程序添加任何类型的“实时”Web 功能。 虽然聊天通常用作示例,但你可以执行更多操作。 每当用户刷新网页以查看新数据,或页面实现 长时间轮询 以检索新数据时,它都是使用 SignalR 的候选项。 示例包括仪表板和监视应用程序、协作应用程序 (,例如同时编辑文档) 、作业进度更新和实时表单。

SignalR 还支持需要服务器进行高频率更新的全新 Web 应用程序类型,例如实时游戏。

SignalR 提供了一个简单的 API,用于创建服务器到客户端远程过程调用, (RPC) 调用客户端浏览器 (和其他客户端平台中的 JavaScript 函数,) 从服务器端 .NET 代码。 SignalR 还包括用于连接管理的 API (例如,连接和断开连接事件) ,以及分组连接。

SignalR 和 WebSocket

SignalR 在可用的情况下使用新的 WebSocket 传输,并在必要时回退到旧传输。 虽然当然可以直接使用 WebSocket 编写应用,但使用 SignalR 意味着需要实现的许多额外功能已经为你完成。 最重要的是,这意味着你可以编写应用代码以利用 WebSocket,而无需担心为旧客户端创建单独的代码路径。 SignalR 还可以避免担心 WebSocket 的更新,因为 SignalR 已更新以支持基础传输中的更改,从而为应用程序提供跨 WebSocket 版本的一致接口。

Kratos服务端

首先安装库:

go get -u github.com/tx7do/kratos-transport/transport/signalr

然后实现一个简单的服务端:

package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "strings"
    "syscall"
    "time"

    signalR "github.com/philippseith/signalr"
    "github.com/tx7do/kratos-transport/transport/signalr"
)

type chat struct {
   
    signalR.Hub
}

func (c *chat) OnConnected(connectionID string) {
   
    fmt.Printf("%s connected\n", connectionID)
    c.Groups().AddToGroup("group", connectionID)
}

func (c *chat) OnDisconnected(connectionID string) {
   
    fmt.Printf("%s disconnected\n", connectionID)
    c.Groups().RemoveFromGroup("group", connectionID)
}

func (c *chat) Broadcast(message string) {
   
    // Broadcast to all clients
    c.Clients().Group("group").Send("receive", message)
}

func (c *chat) Echo(message string) {
   
    c.Clients().Caller().Send("receive", message)
}

func (c *chat) Panic() {
   
    panic("Don't panic!")
}

func (c *chat) RequestAsync(message string) <-chan map[string]string {
   
    r := make(chan map[string]string)
    go func() {
   
        defer close(r)
        time.Sleep(4 * time.Second)
        m := make(map[string]string)
        m["ToUpper"] = strings.ToUpper(message)
        m["ToLower"] = strings.ToLower(message)
        m["len"] = fmt.Sprint(len(message))
        r <- m
    }()
    return r
}

func (c *chat) RequestTuple(message string) (string, string, int) {
   
    return strings.ToUpper(message), strings.ToLower(message), len(message)
}

func (c *chat) DateStream() <-chan string {
   
    r := make(chan string)
    go func() {
   
        defer close(r)
        for i := 0; i < 50; i++ {
   
            r <- fmt.Sprint(time.Now().Clock())
            time.Sleep(time.Second)
        }
    }()
    return r
}

func (c *chat) UploadStream(upload1 <-chan int, factor float64, upload2 <-chan float64) {
   
    ok1 := true
    ok2 := true
    u1 := 0
    u2 := 0.0
    c.Echo(fmt.Sprintf("f: %v", factor))
    for {
   
        select {
   
        case u1, ok1 = <-upload1:
            if ok1 {
   
                c.Echo(fmt.Sprintf("u1: %v", u1))
            } else if !ok2 {
   
                c.Echo("Finished")
                return
            }
        case u2, ok2 = <-upload2:
            if ok2 {
   
                c.Echo(fmt.Sprintf("u2: %v", u2))
            } else if !ok1 {
   
                c.Echo("Finished")
                return
            }
        }
    }
}

func (c *chat) Abort() {
   
    fmt.Println("Abort")
    c.Hub.Abort()
}

func main() {
   
    interrupt := make(chan os.Signal, 1)
    signal.Notify(interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)

    ctx := context.Background()

    srv := signalr.NewServer(
        signalr.WithAddress(":8100"),
        signalr.WithCodec("json"),
        signalr.WithHub(&chat{
   }),
    )

    srv.MapHTTP("/chat")

    if err := srv.Start(ctx); err != nil {
   
        panic(err)
    }

    defer func() {
   
        if err := srv.Stop(ctx); err != nil {
   
            t.Errorf("expected nil got %v", err)
        }
    }()

    <-interrupt
}

JS客户端

<!DOCTYPE html>
<html lang="en">

<head>
    <title>SignalR Client</title>
    <meta charset='utf-8'>
    <meta http-equiv='X-UA-Compatible' content='IE=edge'>
    <meta name='viewport' content='width=device-width, initial-scale=1'>
</head>

<body>
<input type="text" id="message"/>
<input type="button" value="Broadcast" id="broadcast"/>
<input type="button" value="Echo" id="echo"/>
<input type="button" value="Panic" id="panic"/>
<input type="button" value="RequestTuple" id="requesttuple"/>
<input type="button" value="RequestAsync" id="requestasync"/>
<input type="button" value="Stream" id="stream"/>
<input type="button" value="Stop Stream" id="stopstream"/>
<input type="button" value="Upstream" id="upstream"/>
<input type="button" value="Stop/Start Client Connection" id="stop"/>
<input type="button" value="Abort Server Connection" id="abort"/>
<ul id="messages">
</ul>

<script src="https://cdnjs.cloudflare.com/ajax/libs/microsoft-signalr/7.0.7/signalr.js"></script>

<script>
    let subscription;
    const connection = new signalR.HubConnectionBuilder()
        .withUrl('http://localhost:8100/chat')
        .withAutomaticReconnect()
        .build();

    document.getElementById('broadcast').addEventListener('click', () => {
    
        let val = document.getElementById('message').value;
        if (val) {
    
            connection.invoke('broadcast', val);
        }
    });
    document.getElementById('echo').addEventListener('click', () => {
    
        let val = document.getElementById('message').value;
        if (val) {
    
            connection.invoke('echo', val);
        }
    });
    document.getElementById('panic').addEventListener('click', () => {
    
        connection.invoke('panic').catch((err) => {
    
            let li = document.createElement('li');
            li.innerText = err;
            document.getElementById('messages').appendChild(li);
        });
    });

    document.getElementById('requestasync').addEventListener('click', () => {
    
        let val = document.getElementById('message').value;
        if (val) {
    
            connection.invoke('requestasync', val).then(val => {
    
                let li = document.createElement('li');
                li.innerText = 'received finally ' + JSON.stringify(val);
                document.getElementById('messages').appendChild(li);
            })
        }
    });

    document.getElementById('requesttuple').addEventListener('click', () => {
    
        let val = document.getElementById('message').value;
        if (val) {
    
            connection.invoke('requesttuple', val).then(val => {
    
                let li = document.createElement('li');
                li.innerText = 'received ' + JSON.stringify(val);
                document.getElementById('messages').appendChild(li);
            })
        }
    });
    document.getElementById('stream').addEventListener('click', () => {
    
        subscription = connection.stream('datestream').subscribe({
    
            next: (item) => {
    
                let li = document.createElement('li');
                li.innerText = 'item ' + item;
                document.getElementById('messages').appendChild(li);
            },
            complete: () => {
    
                let li = document.createElement('li');
                li.innerText = 'complete';
                document.getElementById('messages').appendChild(li);
            }
        })
    });
    document.getElementById('stopstream').addEventListener('click', () => {
    
        if (subscription) {
    
            subscription.dispose()
        }
    });
    document.getElementById('upstream').addEventListener('click', () => {
    
        const subject1 = new signalR.Subject();
        const subject2 = new signalR.Subject();
        connection.send("uploadstream", subject1, 3, subject2);
        let iteration1 = 0;
        const intervalHandle1 = setInterval(() => {
    
            iteration1++;
            subject1.next(iteration1);
            if (iteration1 === 5) {
    
                clearInterval(intervalHandle1);
                subject1.complete();
            }
        }, 500);
        let iteration2 = 0;
        const intervalHandle2 = setInterval(() => {
    
            iteration2++;
            subject2.next(iteration2);
            if (iteration2 === 10) {
    
                clearInterval(intervalHandle2);
                subject2.complete();
            }
        }, 100);
    });
    document.getElementById('stop').addEventListener('click', () => {
    
        connection.stop().then(() => {
    
            connection.start();
        });
    });
    document.getElementById('abort').addEventListener('click', () => {
    
        connection.send('abort')
    });

    connection.on('receive', message => {
    
        let li = document.createElement('li');
        li.innerText = 'sent ' + message;
        document.getElementById('messages').appendChild(li);
    });

    connection.onclose(error => {
    
        console.assert(connection.state === signalR.HubConnectionState.Disconnected);
        console.log('Connection closed due to error. Try refreshing this page to restart the connection', error);
    });
    connection.onreconnecting(error => {
    
        console.assert(connection.state === signalR.HubConnectionState.Reconnecting);
        console.log('Connection lost due to error. Reconnecting.', error);
    });
    connection.onreconnected(connectionId => {
    
        console.assert(connection.state === signalR.HubConnectionState.Connected);
        console.log('Connection reestablished. Connected with connectionId', connectionId);
    });

    async function start() {
    
        try {
    
            await connection.start();
            console.assert(connection.state === signalR.HubConnectionState.Connected);
            console.log('SignalR connection established');
        } catch (err) {
    
            console.assert(connection.state === signalR.HubConnectionState.Disconnected);
            console.error('SignalR Connection Error: ', err);
            setTimeout(() => this.start(), 5000);
        }
    }

    start();

</script>
</body>

</html>

参考资料 (Reference)

目录
相关文章
|
9天前
|
Java API 微服务
【Spring Boot系列】通过OpenAPI规范构建微服务服务接口
【4月更文挑战第5天】通过OpenAPI接口构建Spring Boot服务RestAPI接口
|
1月前
|
监控 网络协议 Go
应用监控 eBPF 版:实现 Golang 微服务的无侵入应用监控
应用监控 eBPF 版:实现 Golang 微服务的无侵入应用监控
109647 118
|
2月前
|
传感器 监控 物联网
golang开源的可嵌入应用程序高性能的MQTT服务
golang开源的可嵌入应用程序高性能的MQTT服务
201 3
|
1月前
|
监控 持续交付 开发者
深入探讨后端服务的微服务架构设计与实践
【2月更文挑战第10天】随着互联网应用的不断发展,微服务架构作为一种灵活、高效的解决方案在后端服务开发中备受关注。本文将深入探讨微服务架构设计与实践,从服务拆分、通信机制到部署管理等方面进行详细剖析,旨在帮助开发者更好地理解和运用微服务架构。
|
2月前
|
网络协议 Go 数据安全/隐私保护
golang开源的可嵌入应用程序高性能的MQTT服务
golang开源的可嵌入应用程序高性能的MQTT服务
253 2
|
1月前
|
SpringCloudAlibaba Java 网络架构
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(七)Spring Cloud Gateway服务网关
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(七)Spring Cloud Gateway服务网关
105 0
|
3天前
|
Java 数据安全/隐私保护 Sentinel
微服务学习 | Spring Cloud 中使用 Sentinel 实现服务限流
微服务学习 | Spring Cloud 中使用 Sentinel 实现服务限流
|
16天前
|
监控 Sentinel 微服务
微服务的防御之道:服务雪崩、服务熔断、服务降级
微服务的防御之道:服务雪崩、服务熔断、服务降级
21 1
|
30天前
|
Java fastjson 数据安全/隐私保护
【Dubbo3技术专题】「云原生微服务开发实战」 一同探索和分析研究RPC服务的底层原理和实现
【Dubbo3技术专题】「云原生微服务开发实战」 一同探索和分析研究RPC服务的底层原理和实现
40 0
|
1月前
|
消息中间件 SpringCloudAlibaba Java
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(八)Config服务配置+bus消息总线+stream消息驱动+Sleuth链路追踪
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(八)Config服务配置+bus消息总线+stream消息驱动+Sleuth链路追踪
785 0

热门文章

最新文章