用Jetty 9.1运行Java WebSockets微服务

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
简介:   Jetty 9.1的发布将Java WebSockets (JSR-356) 带入了非Java EE环境,从而开启了微服务时代。我们可以将Jetty的容器包含在java应用程序中(注意,不是Java代码运行在容器中,而是相反),这种微服务轻量概念开始得到提倡推广,为模块化开启新的探索方向。

  Jetty 9.1的发布将Java WebSockets (JSR-356) 带入了非Java EE环境,从而开启了微服务时代。我们可以将Jetty的容器包含在java应用程序中(注意,不是Java代码运行在容器中,而是相反),这种微服务轻量概念开始得到提倡推广,为模块化开启新的探索方向。

  该案例目标是要建设一个从客户端程序接受消息并广播到当前连接的所有其他客户端WebSocket服务器。假设有一个消息模型:

package com.example.services;

public class Message {
    private String username;
    private String message;

    public Message() {
    }

    public Message( final String username, final String message ) {
        this.username = username;
        this.message = message;
    }

    public String getMessage() {
        return message;
    }

    public String getUsername() {
        return username;
    }

    public void setMessage( final String message ) {
        this.message = message;
    }

    public void setUsername( final String username ) {
        this.username = username;
    }
}

为了分离服务器端和客户端,JSR-356规定了两个元注解@ServerEndpoint 和@ClientEndpoit

客户端代码:

@ClientEndpoint
public class BroadcastClientEndpoint {
    private static final Logger log = Logger.getLogger( 
        BroadcastClientEndpoint.class.getName() );

    @OnOpen
    public void onOpen( final Session session ) throws IOException, EncodeException  {
        session.getBasicRemote().sendObject( new Message( "Client", "Hello!" ) );
    }

    @OnMessage
    public void onMessage( final Message message ) {
        log.info( String.format( "Received message '%s' from '%s'",
            message.getMessage(), message.getUsername() ) );
    }
}

@OnOpen 是当客户端连接到服务器开始调用,@OnMessage是每次服务器向客户端发送消息时调用。

消息传递使用Json,这里使用JSR-353规范的Json类将对象进行序列化。我们需要在Message里面加上一下Json反序列化,也就是将Json转为Message对象:

public class Message {
    public static class MessageDecoder implements Decoder.Text< Message > {
        private JsonReaderFactory factory = Json.createReaderFactory( Collections.< String, Object >emptyMap() );

        @Override
        public void init( final EndpointConfig config ) {
        }

        @Override
        public Message decode( final String str ) throws DecodeException {
            final Message message = new Message();

            try( final JsonReader reader = factory.createReader( new StringReader( str ) ) ) {
                final JsonObject json = reader.readObject();
                message.setUsername( json.getString( "username" ) );
                message.setMessage( json.getString( "message" ) );
            }

            return message;
        }

        @Override
        public boolean willDecode( final String str ) {
            return true;
        }

        @Override
        public void destroy() {
        }
    }
}

我们需要告诉客户端,我们有一个Json编码器和解码器,在BroadcastClientEndpoint类上加入:

@ClientEndpoint( encoders = { MessageEncoder.class }, decoders = { MessageDecoder.class } )
public class BroadcastClientEndpoint {
}

下面是调用运行代码:

public class ClientStarter {
    public static void main( final String[] args ) throws Exception {
        final String client = UUID.randomUUID().toString().substring( 0, 8 );

        final WebSocketContainer container = ContainerProvider.getWebSocketContainer();    
        final String uri = "ws://localhost:8080/broadcast"; 

        try( Session session = container.connectToServer( BroadcastClientEndpoint.class, URI.create( uri ) ) ) {
            for( int i = 1; i <= 10; ++i ) {
                session.getBasicRemote().sendObject( new Message( client, "Message #" + i ) );
                Thread.sleep( 1000 );
            }
        }

        // Application doesn't exit if container's threads are still running
        ( ( ClientContainer )container ).stop();
    }
}

这是连接URL ws://localhost:8080/broadcast,随机挑选一些客户端名称(从UUID),每1秒的延迟产生10条信息,(只是为了确保我们有时间去接收他们都回来了)。

下面是服务器端的代码:

@ServerEndpoint( 
    value = "/broadcast", 
    encoders = { MessageEncoder.class }, 
    decoders = { MessageDecoder.class } 

public class BroadcastServerEndpoint {
    private static final Set< Session > sessions = 
        Collections.synchronizedSet( new HashSet< Session >() );

    @OnOpen
    public void onOpen( final Session session ) {
        sessions.add( session );
    }

    @OnClose
    public void onClose( final Session session ) {
        sessions.remove( session );
    }

    @OnMessage
    public void onMessage( final Message message, final Session client ) 
            throws IOException, EncodeException {
        for( final Session session: sessions ) {
            session.getBasicRemote().sendObject( message );
        }
    }
}

为了使这个服务器端点能够运行,我们将其注册入Jetty服务器,Jetty9.能够在嵌入下运行:

public class ServerStarter  {
    public static void main( String[] args ) throws Exception {
        Server server = new Server( 8080 );

        // Create the 'root' Spring application context
        final ServletHolder servletHolder = new ServletHolder( new DefaultServlet() );
        final ServletContextHandler context = new ServletContextHandler();

        context.setContextPath( "/" );
        context.addServlet( servletHolder, "/*" );
        context.addEventListener( new ContextLoaderListener() );   
        context.setInitParameter( "contextClass", AnnotationConfigWebApplicationContext.class.getName() );
        context.setInitParameter( "contextConfigLocation", AppConfig.class.getName() );

        server.setHandler( context );
        WebSocketServerContainerInitializer.configureContext( context );       

        server.start();
        server.join(); 
    }
}

最重要的是WebSocketServerContainerInitializer.configureContext:,它是创建一个Websockets的容器,目前容器内什么也没有,我们没有注册进入我们的服务器端点。

Spring的AppConfig 能够帮助我们做到这点:

@Configuration
public class AppConfig  {
    @Inject private WebApplicationContext context;
    private ServerContainer container;

    public class SpringServerEndpointConfigurator extends ServerEndpointConfig.Configurator {
        @Override
        public < T > T getEndpointInstance( Class< T > endpointClass ) 
                throws InstantiationException {
            return context.getAutowireCapableBeanFactory().createBean( endpointClass );   
        }
    }

    @Bean
    public ServerEndpointConfig.Configurator configurator() {
        return new SpringServerEndpointConfigurator();
    }

    @PostConstruct
    public void init() throws DeploymentException {
        container = ( ServerContainer )context.getServletContext().
            getAttribute( javax.websocket.server.ServerContainer.class.getName() );

        container.addEndpoint( 
            new AnnotatedServerEndpointConfig( 
                BroadcastServerEndpoint.class, 
                BroadcastServerEndpoint.class.getAnnotation( ServerEndpoint.class )  
            ) {
                @Override
                public Configurator getConfigurator() {
                    return configurator();
                }
            }
        );
    }  
}

容器通过调用构造函数将创建container,然后每一次新的客户端连接创建一个服务器端点的新实例。

我们检索的WebSockets容器的方法是Jetty专用规范:查询来自名为“javax.websocket.server.ServerContainer”上下文的属性。

最后运行:

mvn clean package
java -jar target\jetty-web-sockets-jsr356-0.0.1-SNAPSHOT-server.jar // run server
java -jar target/jetty-web-sockets-jsr356-0.0.1-SNAPSHOT-client.jar // run yet another client

输出结果部分:

Nov 29, 2013 9:21:29 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Hello!' from 'Client'
Nov 29, 2013 9:21:29 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #1' from '392f68ef'
Nov 29, 2013 9:21:29 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #2' from '8e3a869d'
Nov 29, 2013 9:21:29 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #7' from 'ca3a06d0'
Nov 29, 2013 9:21:30 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #4' from '6cb82119'
Nov 29, 2013 9:21:30 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #2' from '392f68ef'
Nov 29, 2013 9:21:30 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #3' from '8e3a869d'
Nov 29, 2013 9:21:30 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #8' from 'ca3a06d0'
Nov 29, 2013 9:21:31 PM com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message 'Message #5' from '6cb82119'
Nov 29, 2013 9:21:31 PM com.example.services.BroadcastClientEndpoint onMessage

该项目源码下载: GitHub

目录
相关文章
|
27天前
|
Java
Java关键字 —— super 详细解释!一看就懂 有代码实例运行!
文章详细解释了Java关键字`super`的用途,包括访问父类的成员变量、调用父类的构造方法和方法,并提供了相应的代码实例。
70 5
Java关键字 —— super 详细解释!一看就懂 有代码实例运行!
|
22天前
|
分布式计算 大数据 Java
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
15 1
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
|
12天前
|
IDE Java 编译器
Java:如何确定编译和运行时类路径是否一致
类路径(Classpath)是JVM用于查找类文件的路径列表,对编译和运行Java程序至关重要。编译时通过`javac -classpath`指定,运行时通过`java -classpath`指定。IDE如Eclipse和IntelliJ IDEA也提供界面管理类路径。确保编译和运行时类路径一致,特别是外部库和项目内部类的路径设置。
|
22天前
|
存储 Java 数据库
使用 AuraDB 免费版构建 Java 微服务
使用 AuraDB 免费版构建 Java 微服务
29 11
|
22天前
|
缓存 Java 数据库连接
使用 NCache 将 Java 微服务扩展到极致性能
使用 NCache 将 Java 微服务扩展到极致性能
23 8
|
27天前
|
Java
Java关键字 —— super 与 this 详细解释!一看就懂 有代码实例运行!
本文介绍了Java中this和super关键字的用法,包括在构造方法中使用this来区分参数和成员变量、使用super调用父类构造方法和方法,以及它们在同一个方法中同时使用的场景。
92 0
Java关键字 —— super 与 this 详细解释!一看就懂 有代码实例运行!
|
27天前
|
Java
Java关键字 —— static 与 final 详细解释!一看就懂 有代码实例运行!
这篇文章详细解释了Java中static和final关键字的用法,包括它们修饰类、方法、变量和代码块时的行为,并通过代码示例展示了它们的具体应用。
116 0
Java关键字 —— static 与 final 详细解释!一看就懂 有代码实例运行!
|
26天前
|
Java Maven Spring
用Spring导致的无法运行Java文件的问题的解决方案
本文提供了解决在IntelliJ IDEA社区版中使用Spring Initializr插件创建Spring项目后,Java文件无法运行的问题的方法,主要是通过加载Maven项目来解决。
51 0
|
2月前
|
安全 应用服务中间件 API
微服务分布式系统架构之zookeeper与dubbo-2
微服务分布式系统架构之zookeeper与dubbo-2
|
2月前
|
负载均衡 Java 应用服务中间件
微服务分布式系统架构之zookeeper与dubbor-1
微服务分布式系统架构之zookeeper与dubbor-1