程序员社区

重学Springboot系列之服务器推送技术

重学Springboot系列之服务器推送技术

  • 主流服务器推送技术说明
    • 需求与背景
    • 服务端推送常用技术
      • 全双工通信:WebSocket
      • 服务端主动推送:SSE (Server Send Event)
      • websocket与SSE比较
  • 服务端推送事件SSE
    • 模拟网络支付场景
    • 应用场景
    • sse 规范
    • 模拟实现
      • 服务端实现
      • SseEmitter api介绍
      • 访问测试
    • 对连接超时异常进行全局处理
    • SSE技术推荐参考文章
  • 双向实时通信websocket
    • 整合websocket
    • 兼容HTTPS协议
    • WebSocket编程基础
      • 连接的建立
      • 全双工数据交互
      • 数据发送
        • 浏览器与服务器交换数据
        • 一个用户向其他用户群发
    • websocket实现聊天软件
      • 测试
    • websocke深入学习资料
  • 即时通信 IM
  • Goeasy
  • 详解消息系统
  • 新手入门:史上最全Web端即时通讯技术原理详解

主流服务器推送技术说明

需求与背景

若干年前,所有的请求都是由浏览器端发起,浏览器本身并没有接受请求的能力。所以一些特殊需求都是用ajax轮询的方式来实现的。比如:

  • 股价展示页面实时的获取股价更新
  • 赛事的文字直播,实时更新赛况
  • 通过页面启动一个任务,前端想知道任务后台的实时运行状态

通常的做法就是需要以较小的间隔,频繁的向服务器建立http连接询问任务状态的更新,然后刷新页面显示状态。但这样做的后果就是浪费大量流量,对服务端造成了非常大的压力。


服务端推送常用技术

在html5被广泛推广之后,我们可以使用服务端主动推送数据,浏览器接收数据的方式来解决上面提到的问题。下面我们就为大家介绍两种服务端数据推送技术

全双工通信:WebSocket

全双工的,全双工就是双向通信。如果说http协议是“对讲机”之间的通话(你一句我一句,有来有回),那我们的websocket就是移动电话(可以随时发送信息与接收信息,就是全双工)。

在这里插入图片描述
本质上是一个额外的tcp连接,建立和关闭时握手使用http协议,其他数据传输不使用http协议 ,更加复杂一些,比较适用于需要进行复杂双向实时数据通讯的场景。在web网页上面的客服、聊天室一般都是使用WebSocket 协议来开发的。


服务端主动推送:SSE (Server Send Event)

html5新标准,用来从服务端实时推送数据到浏览器端, 直接建立在当前http连接上,本质上是保持一个http长连接,轻量协议 。客户端发送一个请求到服务端 ,服务端保持这个请求连接直到一个新的消息准备好,将消息返回至客户端。除非主动关闭,这个连接会一直保持。

  • 建立连接
  • 服务端 -> 浏览器(连接保持)
  • 关闭连接

SSE的一大特色就是重复利用1个连接来接收服务端发送的消息(又称event),从而避免不断轮询请求建立连接,造成服务资源紧张。


websocket与SSE比较

在这里插入图片描述
但是IE和Edge浏览器不支持SSE,所以SSE目前的应用场景比较少。 虽然websocket在很多比较旧的版本浏览器上面也不兼容,但是总体上比SSE要好不少。另外还有一些开源的JS前端产品,如 SockJSSocket.IO,在浏览器端提供了更好的websocket前端js编程体验,与浏览器有更好的兼容性。

在这里插入图片描述


服务端推送事件SSE

模拟网络支付场景

大家应该都用过支付系统,比如淘宝买一个产品之后进行扫码支付。我们来看看如果结合SSE,该如何实现这个过程。

在这里插入图片描述

  • 用户扫码向支付系统(支付宝)进行支付
  • 支付完成之后,告知商户系统(淘宝卖家系统)我已经发起支付了(建立SSE连接)
  • 支付系统(支付宝)告诉商户系统(淘宝卖家系统),这个用户确实支付成功了
  • 商户系统(淘宝卖家系统)向用户发送消息:你已经支付成功,跳转到支付成功页面。(通过SSE连接,由服务器端告知用户客户端浏览器)

注意:在返回最终支付结果的操作,实现了服务端向客户端的事件推送,可以使用SSE来实现


应用场景

从 sse 的特点出发,我们可以大致的判断出它的应用场景,需要轮询获取服务端最新数据的 case 下,多半是可以用它的

比如显示当前网站在线的实时人数,法币汇率显示当前实时汇率,电商大促的实时成交额等等…


sse 规范

在 html5 的定义中,服务端 sse,一般需要遵循以下要求

请求头

开启长连接 + 流方式传递

Content-Type: text/event-stream;charset=UTF-8
Cache-Control: no-cache
Connection: keep-alive

数据格式

服务端发送的消息,由 message 组成,其格式如下:

field:value\n\n

其中 field 有五种可能

: 即以:开头,表示注释,可以理解为服务端向客户端发送的心跳,确保连接不中断
data:数据
event: 事件,默认值
id: 数据标识符用 id 字段表示,相当于每一条数据的编号
retry: 重连时间

模拟实现

如果下面的代码理解不了的时候,回头看看这张图
在这里插入图片描述

我们写代码来模拟上面时序图中的2、3、4四个步骤的实现。

浏览器前端实现

对于服务器端向浏览器发送的数据,浏览器端需要在 JavaScript 中使用 EventSource 对象来进行处理。EventSource 使用的是标准的事件监听器方式,只需要在对象上添加相应的事件处理方法即可。EventSource 提供了三个标准事件

在这里插入图片描述
除了使用标准的事件处理方法,还可以使用addEventListener 方法对事件进行监听。

var es = new EventSource('事件源名称') ;  //与事件源建立连接
//标准事件处理方法,还有onopen、onerror
es.onmessage = function(e) {
};
//可以监听自定义的事件名称
es.addEventListener('自定义事件名称', function(e) {
});

ssetest.html(商户系统的用户支付页面)

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>SSE</title>
</head>
<body>
<div id = "message">

</div>
<script>
    if (window.EventSource) { //判断浏览器是否支持SSE
        //第2步,主动进行建立长连接,表明用户已经发起支付
        let source = new EventSource(
            'http://localhost/dhy/orderpay?payid=1');
        let innerHTML = '';

        //监听服务器端发来的事件:open
        source.onopen = function(e) {
            console.log("连接建立")
            innerHTML += "onopen:准备就绪,可以开始接收服务器数据" + "<br/>"; //支付结果
            document.getElementById("message").innerHTML = innerHTML;
        };
        //监听服务器端发来的事件:message
        source.onmessage = function(e) {
            console.log("服务器发送的消息为: "+e)
            innerHTML += "onmessage:" + e.data + "<br/>"; //支付结果
            document.getElementById("message").innerHTML = innerHTML;
        };
        //自定义finish事件,主动关闭EventSource
        source.addEventListener('finish', function(e) {
            console.log("服务器发送的事件: "+e)
            source.close();
            innerHTML += "支付结果接收完毕,通知服务端关闭EventSource" +  "<br/>";
            document.getElementById("message").innerHTML = innerHTML;
        }, false);
        //监听服务器端发来的事件:error
        source.onerror = function(e) {
            console.log("服务器出现的异常: "+e)
            if (e.readyState === EventSource.CLOSED) {
                innerHTML += "sse连接已关闭" +  "<br/>";
            } else {
                console.log(e);
            }
        };
    } else {
        console.log("你的浏览器不支持SSE");
    }
</script>

</body>
</html>

服务端实现

Controller代码(商户系统服务端代码)

@RestController
public class SSEControler {
    //建立之后根据订单id,将SseEmitter存到ConcurrentHashMap
    //正常应该存到数据库里面,生成数据库订单,这里我们只是模拟一下
    public static final ConcurrentHashMap<Long, SseEmitter> sseEmitters
            = new ConcurrentHashMap<>();

    //第2步:接受用户建立长连接,表示该用户已支付,已支付就可以生成订单(未确认状态)
    @GetMapping("/orderpay")
    public SseEmitter orderpay(@RequestParam Long payid) throws IOException {
        System.out.println("=======orderpay方法执行========");
        //设置默认的超时时间3秒,超时之后服务端主动关闭连接。
        //超时时间指的是服务器不发送数据给客户端的时间间隔
        SseEmitter emitter = new SseEmitter(3 * 1000L);
        sseEmitters.put(payid,emitter);
        emitter.onTimeout(() -> sseEmitters.remove(payid));
        emitter.send(SseEmitter.event().reconnectTime(1000).data("连接成功"));
        //执行完毕后的回调接口触发
        emitter.onCompletion(() -> System.out.println("完成!!!"));
        return emitter;
    }

    //第3步:接受支付系统的支付结果告知,表明用户支付成功
    @GetMapping("/payback")
    public void payback (@RequestParam Long payid){
        System.out.println("=======payback方法执行========");
        //把SSE连接取出来
        SseEmitter emitter = sseEmitters.get(payid);
        try {
            //第4步:由服务端告知浏览器端:该用户支付成功了
            emitter.send("用户支付成功"); //触发前端message事件。
            //触发前端自定义的finish事件
            emitter.send(SseEmitter.event().name("finish").id("6666").data("哈哈"));
        } catch (IOException e) {
            emitter.completeWithError(e);   //出发前端onerror事件
        }
    }
}

SseEmitter api介绍

  • send(): 发送数据,如果传入的是一个非SseEventBuilder对象,那么传递参数会被封装到 data 中
  • complete(): 表示执行完毕,会断开连接
  • onTimeout(): 超时回调触发
  • onCompletion(): 结束之后的回调触发

访问测试

模拟测试第2步

用户访问 http://localhost:8888/ssetest.html 页面。将自动执行ssetest.html页面的js代码,

let source = new EventSource( 'https://localhost:8888/orderpay?payid=1');

从而模拟用户在浏览器发起支付之后告知“商户系统”:用户已经发起支付。

在这里插入图片描述
模拟测试第3步

用PostMan模拟支付系统(支付宝),向商户系统接口 https://localhost:8888/payback?payid=1 发送请求,模拟“支付系统”向我们自己开发的商户系统请求,告知:该用户支付成功。

在这里插入图片描述

模拟测试第4步

商户系统告知用户所在的浏览器,你支付成功了(服务器数据推送)。自动在浏览器上将“支付成功”的信息打印出来。

在这里插入图片描述
因为是第一次接收服务器端的数据推送,所以打印了图中的第一行文字onopen

因为是接收了服务端的send message,所以打印了图中的第2行文字onmessage

服务端在数据send之后触发了自定义的finish事件,所以打印了图中的第3行文字


对连接超时异常进行全局处理

@ExceptionHandler(AsyncRequestTimeoutException.class)
@ResponseBody
public String handleAsyncRequestTimeoutException(AsyncRequestTimeoutException e) {
    return SseEmitter.event().data("timeout!!").build().stream()
            .map(d -> d.getData().toString())
            .collect(Collectors.joining());
}


SSE技术推荐参考文章

【SringBoot WEB 系列】SSE 服务器发送事件详解

【SpringBoot WEB 系列】SSE 服务器发送事件详解

SSE技术详解:一种全新的HTML5服务器推送事件技术


双向实时通信websocket

整合websocket

<!-- 引入websocket依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

开启websocket功能

@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

兼容HTTPS协议

  • WebSocket的ws协议是基于HTTP协议实现的
  • WebSocket的wss协议是基于HTTPS协议实现的

一旦你的项目里面使用了https协议,你的websocket就要使用wss协议才可以。怎么让Spring Boot项目支持WSS协议?

参考我之前的文章 为Web容器配置HTTPS,在那一节TomcatCustomizer 配置的基础之上加上如下的代码,就可以支持wss协议。

@Bean
public TomcatContextCustomizer tomcatContextCustomizer() {

    return new TomcatContextCustomizer() {
        @Override
        public void customize(Context context) {
            context.addServletContainerInitializer(new WsSci(), null);
        }

    };
}

WebSocket编程基础

连接的建立

前端js向后端发送wss连接建立请求

如果使用http协议,改为ws即可

socket = new WebSocket("wss://localhost:8888/ws/asset");

SpringBoot服务端WebSocket服务接收类定义如下:

@Component
@Slf4j
@ServerEndpoint(value = "/ws/asset")
public class WebSocketServer {  

全双工数据交互

前端后端都有

  • onopen事件监听,处理连接建立事件
  • onmessage事件监听,处理对方发过来的消息数据
  • onclose事件监听,处理连接关闭
  • onerror事件监听,处理交互过程中的异常

在这里插入图片描述


数据发送

浏览器与服务器交换数据

在这里插入图片描述
前端JS

socket.send(message);

后端Java,向某一个javax.websocket.Session用户发送消息。

/** 
 * 发送消息,实践表明,每次浏览器刷新,session会发生变化。 
 * @param session  session
 * @param message  消息
 */  
private static void sendMessage(Session session, String message) throws IOException{
    session.getBasicRemote().sendText(String.format("%s (From Server,Session ID=%s)",message,session.getId()));
}  

一个用户向其他用户群发

在这里插入图片描述
服务器向所有在线的javax.websocket.Session用户发送消息。


/** 
 * 群发消息 
 * @param message  消息
 */  
public static void broadCastInfo(String message) throws IOException {
    for (Session session : SessionSet) {  
        if(session.isOpen()){  
            sendMessage(session, message);  
        }  
    }  
}

websocket实现聊天软件

WebSocketServer本节内容的核心代码,websocket服务端代码

  • @ServerEndpoint(value = “/ws/asset”)表示websocket的接口服务地址
  • @OnOpen注解的方法,为连接建立成功时调用的方法
  • @OnClose注解的方法,为连接关闭调用的方法
  • @OnMessage注解的方法,为收到客户端消息后调用的方法
  • @OnError注解的方法,为出现异常时调用的方法
@Component
@Slf4j
@ServerEndpoint(value = "/ws/asset")
public class WebSocketServer {  

    //用来统计连接客户端的数量
    private static final AtomicInteger OnlineCount = new AtomicInteger(0);
    // concurrent包的线程安全Set,用来存放每个客户端对应的Session对象。  
    private static CopyOnWriteArraySet<Session> SessionSet = new CopyOnWriteArraySet<>();
    
    /** 
     * 连接建立成功调用的方法 
     */  
    @OnOpen
    public void onOpen(Session session) throws IOException {
        SessionSet.add(session);   
        int cnt = OnlineCount.incrementAndGet(); // 在线数加1  
        log.info("有连接加入,当前连接数为:{}", cnt);
    }

    /**
     * 收到客户端消息后调用的方法
     * @param message 客户端发送过来的消息
     */
    @OnMessage
    public void onMessage(String message, Session session) throws IOException {
        log.info("来自客户端的消息:{}",message);
        sendMessage(session, "Echo消息内容:"+message);
        // broadCastInfo(message); 群发消息
    }


    /** 
     * 连接关闭调用的方法 
     */  
    @OnClose
    public void onClose(Session session) {  
        SessionSet.remove(session);  
        int cnt = OnlineCount.decrementAndGet();  
        log.info("有连接关闭,当前连接数为:{}", cnt);  
    }  

    /** 
     * 出现错误
     */  
    @OnError
    public void onError(Session session, Throwable error) {  
        log.error("发生错误:{},Session ID: {}",error.getMessage(),session.getId());
    }  
  
    /** 
     * 发送消息,实践表明,每次浏览器刷新,session会发生变化。 
     * @param session  session
     * @param message  消息
     */  
    private static void sendMessage(Session session, String message) throws IOException {

        session.getBasicRemote().sendText(String.format("%s (From Server,Session ID=%s)",message,session.getId()));

    }  
  
    /** 
     * 群发消息 
     * @param message  消息
     */  
    public static void broadCastInfo(String message) throws IOException {
        for (Session session : SessionSet) {  
            if(session.isOpen()){  
                sendMessage(session, message);
            }  
        }  
    }
      
} 

客户端代码,做几次实验,自然明了代码的意思。先不要看代码,先通过浏览器看实验的效果才能更好的理解代码的作用。
public/wstest.html

<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>websocket测试</title>
    <style type="text/css">
        h3,h4{
            text-align:center;
        }
    </style>
</head>
<body>

<h3>请输入要发送给服务器端的消息:</h3><br/>

<label for="text">输入发送信息</label><input id="text" type="text" />
<button onclick="sendToServer()">发送服务器消息</button>
<button onclick="closeWebSocket()">关闭连接</button>
<br>
信息:
<span id="message">

</span>
<script type="text/javascript">
    var socket;
    if (typeof (WebSocket) == "undefined") {
        console.log("遗憾:您的浏览器不支持WebSocket");
    } else {
        socket = new WebSocket("wss://localhost:8888/ws/asset");
        //连接打开事件
        socket.onopen = function() {
            console.log("Socket已打开");
        };
        //收到消息事件
        socket.onmessage = function(msg) {
            document.getElementById('message').innerHTML += msg.data + '<br/>';
        };
        //连接关闭事件
        socket.onclose = function() {
            console.log("Socket已关闭");
        };
        //发生了错误事件
        socket.onerror = function() {
            alert("Socket发生了错误");
        };

        //窗口关闭时,关闭连接
        window.unload=function() {
            socket.close();
        };
    }

    //关闭连接
    function closeWebSocket(){
        socket.close();
    }

    //发送消息给服务器
    function sendToServer(){
        var message = document.getElementById('text').value;
        socket.send(message);
    }
</script>

</body>
</html>

测试

在这里插入图片描述
在这里插入图片描述
连接一旦关闭,再发送消息,也不会生效了

刷新浏览器会导致当前的长连接关闭


websocke深入学习资料

WebSocket从入门到精通,半小时就够!

新手快速入门:WebSocket简明教程

网络编程懒人入门(一):快速理解网络通信协议(上篇)

Springboot +WebSocket学习


即时通信 IM

即时通信 IM


Goeasy

goeasy


详解消息系统

消息系统详解


新手入门:史上最全Web端即时通讯技术原理详解

新手入门:史上最全Web端即时通讯技术原理详解

赞(0) 打赏
未经允许不得转载:IDEA激活码 » 重学Springboot系列之服务器推送技术

相关推荐

  • 暂无文章

一个分享Java & Python知识的社区