编程教育资源分享平台

网站首页 > 后端开发 正文

springboot+websocket+vue网站实时在线人数(02) 实现步骤

luoriw 2024-01-31 12:30:35 后端开发 11 ℃ 0 评论

需求

  1. 捕捉用户登录和关闭页签的动作,用于统计实时在线人数
  2. 系统管理员可以查看到当前实时在线人数列表

效果

实时人数监控

实现模型

实现模型


  • 实现步骤

    用户前端登录后发起socket连接

    /** 获取当前用户信息 **/
          getUserInfo () {
            this.$http({
              url: this.$http.adornUrl('/sys/user/info'),
              method: 'get',
              params: this.$http.adornParams()
            }).then(({data}) => {
              if (data && data.code === 0) {
                this.loading = false
                this.userId = data.user.userId
                this.userName = data.user.username
                this.name = data.user.name
                this.$store.commit('user/updateDept',  data.user.dept)
                // 注册websocket事件
                this.startSocketBeats(this.userId,data.user.onlineUrl)
              }
            })
          },
          
          /** 创建websocket连接 **/
          startSocketBeats(token,url){
            if (!window['SocketNotice']) {
            // 创建socket实例
              window['SocketNotice'] = new SocketNotice(url, token, 5)
              // window['SocketNotice'] = new SocketNotice("ws://127.0.0.1:8099/saleapi/notice.ws", '1290904685997350914', 5)
            }
            if (!window['SocketNotice'].connected) {
            // 发起socket连接
              window['SocketNotice'].connect()
            }
          }

    socketNotice.js (webSocket相关工具类代码)

    注意processMessage方法,这里用于区分接收到的消息类型,根据不同类型去处理不同的页面监听器

    import UUID from 'uuid/v4'
    
    const socketType = {
      // 心跳
      HEART: {
        msgType: 98
      }
    }
    
    export default class SocketNotice {
      /**
       * @param url
       * @param userId
       * @param pingIntervalInSeconds 心跳间隔,单位秒
       * @param maxConnectionBrokenTimes      最大的断开链接失败次数,超过就重连
       */
      constructor(url, userId, pingIntervalInSeconds = 5, maxConnectionBrokenTimes = 100) {
        this.socket = null
        this.connected = false// 是否已经联通了服务器
        this.heartHeatTimer = -1// 心跳定时器
        this.maxConnBrokenTimes = maxConnectionBrokenTimes// 默认的判断断线的丢包次数
        this.connBrokenTimes = 0// 默认的判断断线的丢包次数
        this.componentsBinded = new Map() // 存放页面组件的容器
        this.heartBeatData = JSON.stringify(socketType.HEART)
        this.connectUrl = url
        this.pingInterval = pingIntervalInSeconds * 1000
        this.wsTokenId = userId
        this.stopFlag = false// 是否停止socket
        this.defaultEventProcessor = {
          onclose: this.onClose.bind(this),
          onopen: this.onOpen.bind(this),
          onmessage: this.onMessage.bind(this),
          onerror: this.onError.bind(this)
        }
      }
    
      connect() {
        if ('WebSocket' in window) {
          this.socket = new WebSocket(this.connectUrl + '?onlineTokenId=' + this.wsTokenId)
          this.setUpEvents()
          this.connected = true
        } else {
          this.connected = false
          console.log('当前浏览器 不支持 websocket')
        }
      }
    
      isConnected() {
        return this.connected
      }
    
      /***
       * 关闭当前socket
       */
      closeSocket(msg = '未指明', stop = false) {
        this.stopFlag = stop
        this.socket && this.socket.readyState === 1 && (this.socket.close() || (this.socket = null))
        clearInterval(this.heartHeatTimer) || (this.heartHeatTimer = -1)// 清除定时器
        if (stop) {
          console.log(`由于 < ${msg} > 的原因客户端主动关闭远程连接,请联系管理员`)
        }
      }
    
      stop() {
        this.stopFlag = true
      }
    
      /**
       * 注册事件消费者
       * UUID(receiver) 生成一个理论上不重复的128位16进制表示的数字
       * @param receiver
       */
      registerReceiver(receiver, copmName) {
        console.log(receiver, copmName)
        //
        if (!receiver || !UUID(receiver)) {
          throw new Error('Illegal arguments')
        }
        if (this.componentsBinded.has(copmName)) {
          console.error(`key <${copmName}> 组件已经存在! `)
        }
        this.componentsBinded.set(copmName, receiver)
      }
    
      unRegisterReceiver(copmName) {
        copmName && this.componentsBinded.delete(copmName)
      }
    
      // 发送消息
      send(message) {
        if (this.stopFlag) return
        this.socket.send(message)
      }
    
      processMessage(data) {
        // console.log(`接收到 WS 消息... ${JSON.stringify(data)}`)
        if (data.msgType === 1) {
          this.online(data)
        }
      }
    
      online(data) {
        const comp = this.componentsBinded.get('ONLINE')
        if (comp && !comp.compDestory && comp.onSocketMessage) {
          comp.onSocketMessage(data)
        }
      }
    
      beatHeart() {
        if (!this.stopFlag) this.send(this.heartBeatData)
      }
    
      /**
       * 设置心跳时间间隔 同时开启心跳
       * @param onConnectionBroken    处理 无回应的次数超过限制的 函数
       */
      startPing() {
        if (this.heartHeatTimer > -1) {
          clearInterval(this.heartHeatTimer) || (this.heartHeatTimer = -1)
        }
        if (this.pingInterval === -1 || !this.connected) {
          return
        }
        this.heartHeatTimer = setInterval(this.beatHeart.bind(this), this.pingInterval)
        // console.log('开始发送心跳 , 定时器id -->> ' + this.heartHeatTimer)
      }
    
      setUpEvents() {
        Object.assign(this.socket, this.defaultEventProcessor)
      }
    
      // 连接发生错误的回调方法
      onError(error) {
        console.log('WebSocket连接发生错误')
        console.log(error)
      };
    
      // 连接成功建立的回调方法
      onOpen() {
        this.stopFlag = false
        console.log('WebSocket连接成功 ')
        this.startPing()
      }
    
      // 接收到消息的回调方法
      onMessage(event) {
        // console.log(`接收到socket消息 --> ${event.data} `)
        this.processMessage(JSON.parse(event.data))
      }
    
      /**
       * 连接关闭的回调方法
       * 需要在关闭的时候就重连.
       */
      onClose() {
        this.connected = false
        if (this.stopFlag) {
          return
        }
        console.log('WebSocket连接关闭')
    
        // 停止心跳
        clearInterval(this.heartHeatTimer) || (this.heartHeatTimer = -1)
        // 开始重连
        this.socket = null
        this.connBrokenTimes = this.connBrokenTimes + 1
    
        setTimeout(function () {
          console.log('socket断开链接 一秒钟之后,准备重试链接  ,已经尝试重连的次数:  -->> ' + this.connBrokenTimes + ', 最大次数: -> ' + this.maxConnBrokenTimes)
          if (this.connBrokenTimes > this.maxConnBrokenTimes) {
            console.log(`多次链接失败,可能是服务器出现故障`)
            return
          }
          if (this.connect()) {
            this.startPing()
          }
        }.bind(this), 1000)// 重连间隔时间是 一秒
      }
    }

    后端处理用户发起的socket连接

    1. 监听onOpen事件,根据token获取当前登录用户详情
    2. 记录发起连接的是哪个用户,放入缓存
    # 这部分根据具体业务自由发挥
    @Slf4j
    @Component
    @ServerEndpoint("/online.ws")
    public class OnlineWSServer {
        private SysUserService userService;
        private SaleDeptService deptService;
        private DeptUserRelService deptUserRelService;
    
        private static final String ONLINE_TOKEN_ID = "onlineTokenId";
        /**
         * 用来记录当前在线连接数。
         */
        private static AtomicInteger onlineCount = new AtomicInteger(0);
        private static NutMap onlineUserPool = new NutMap();
    
    
        @OnOpen
        public synchronized void onOpen(Session session) {
            this.userService = WsApplicationContextAware.getApplicationContext().getBean(SysUserService.class);
            this.deptService = WsApplicationContextAware.getApplicationContext().getBean(SaleDeptService.class);
            this.deptUserRelService = WsApplicationContextAware.getApplicationContext().getBean(DeptUserRelService.class);
            addOnlineCount();
            addOnlineUser(session);
        }
    
        @OnClose
        public synchronized void onClose(Session session) {
            log.error(JSONUtil.toJsonStr(session));
    
            String userId = getSessionToken(session);
            SysUserEntity user = userService.getById(userId);
            SysUserEntity onlineUser = onlineUserPool.getAs(userId, SysUserEntity.class);
            if (onlineUser != null) {
                AtomicInteger onlineCount = onlineUser.getOnlineCount();
                onlineCount.getAndDecrement();
                if (onlineCount.get() == 0) {
                    onlineUserPool.remove(userId);
                    WebSocketSessionManager.remove(userId, session);
                }
            }
            reduceOnlineCount();
        }
    
        @OnMessage
        public void onMessage(String message, Session session) {
    //        log.info("接收到ws=[{}]的消息:{}", session, message);
            JSONObject jsonObject = JSONObject.parseObject(message);
            int msgType = jsonObject.getIntValue("msgType");
            if (msgType == 98) {
                NutMap pong = new NutMap();
                pong.put("time", new Date());
                pong.put("onlineCount", onlineCount.intValue());
                pong.put("msgType", msgType);
                try {
                    // 发送实时人数到当前登录用户
                    Set<Map.Entry<String, Object>> entries = onlineUserPool.entrySet();
                    List<SysUserEntity> sysUserEntityList = new ArrayList<>();
                    for (Map.Entry<String, Object> entry : entries) {
                        SysUserEntity user = (SysUserEntity) entry.getValue();
                        sysUserEntityList.add(user);
                    }
                    pong.put("userList", sysUserEntityList);
                    pong.put("onlineUserPool", onlineUserPool);
                    pong.put("msgType", 1);
                    session.getBasicRemote().sendText(JSON.toJSONString(pong));
    
                } catch (Exception e) {
                    log.error(e.getMessage(), e);
                }
            }
    
        }
    
        @OnError
        public void onError(Session session, Throwable error) {
            log.error("ws= < " + session.getId() + " > 内部错误", error);
        }
    
        private String getSessionToken(Session session) {
            List<String> id = session.getRequestParameterMap().get(ONLINE_TOKEN_ID);
    
            return id.stream().findFirst().orElse(StrUtil.EMPTY);
        }
    
    
        private String getDeptFullName(String itcode){
            List<SaleDeptEntity> deptList = deptUserRelService.getDeptListByUsernameNotExpired(itcode);
            if (CollectionUtil.isNotEmpty(deptList)){
                List<String> deptNames = deptList.stream().map(SaleDeptEntity::getDeptFullName).filter(Objects::nonNull).collect(Collectors.toList());
                return Joiner.on(",").join(deptNames);
            }else{
                return "";
            }
        }
    
        public void addOnlineUser(Session session) {
            String userId = getSessionToken(session);
            SysUserEntity user = userService.getById(userId);
            if(user==null){
                return;
            }
            String deptFullName = getDeptFullName(user.getUsername());
            if (BeanUtil.isNotEmpty(onlineUserPool.get(userId))) {
                SysUserEntity userEntity = (SysUserEntity) onlineUserPool.get(userId);
                userEntity.setDeptFullNames(deptFullName);
                userEntity.getOnlineCount().incrementAndGet();
                onlineUserPool.put(userId, userEntity);
            } else {
                user.setDeptFullNames(deptFullName);
                user.getOnlineCount().incrementAndGet();
                onlineUserPool.put(userId, user);
            }
        }
    
        /**
         * 原子性操作,在线连接数加一
         */
        public void addOnlineCount() {
            onlineCount.getAndIncrement();
        }
    
        /**
         * 原子性操作,在线连接数减一
         */
        public static void reduceOnlineCount() {
            onlineCount.getAndDecrement();
        }
    
    }

    演示连接效果

    接收到msgType=98的心跳请求,反馈msgType=1的在线用户信息


    下面被划掉的地方其实是不存在的,我测试时多发送了一次


    目前已经能实时地拿到用户信息,但是这些数据目前是没有被任何页面处理的,我们的需求是当管理员打开实时监控页面时,才去处理并展示这些数据,接下来要做的就是,在打开实时监控页面时,把该页面的实例添加到socket容器中,也就是注册,在页面关闭时将该实例在容器中移除.


    前端注册socket监听

    <template>
      <div>
        <el-table
          :data="tableData"
          border
          style="width: 100%">
    
          <el-table-column
            prop=""
            align="center"
            label="当前登录用户"
            width="auto">
    
            <el-table-column
              align="center"
              type="index"
              width="auto">
            </el-table-column>
    
            <el-table-column
              fixed
              align="center"
              prop="username"
              label="ITCODE"
              width="auto">
    
            </el-table-column>
            <el-table-column
              fixed
              align="center"
              prop="name"
              label="姓名"
              width="auto">
    
            </el-table-column>
    
            <el-table-column
              fixed
              align="center"
              prop="deptFullNames"
              label="部门"
              width="auto">
    
            </el-table-column>
    
            <el-table-column
              fixed
              align="center"
              prop="onlineCount"
              label="会话个数"
              width="auto">
    
            </el-table-column>
          </el-table-column>
    
        </el-table>
      </div>
    </template>
    
    <script>
      const COMP_NAME = 'ONLINE'
      export default {
        name: 'index',
        created() {
          this.registerWebSocket(true)
        },
        destroyed() {
          this.registerWebSocket(false)
        },
        data() {
          return {
            tableData: []
          }
        },
        methods: {
          registerWebSocket(open) {
            if (open) {
              if (window['SocketNotice'] && window['SocketNotice'].registerReceiver) {
                window['SocketNotice'].registerReceiver(this, COMP_NAME)
              }
            } else {
              if (window['SocketNotice'] && window['SocketNotice'].unRegisterReceiver) {
                window['SocketNotice'].unRegisterReceiver(COMP_NAME)
              }
            }
          },
          onSocketMessage(data) {
            this.tableData = data.userList;
          }
        }
      }
    </script>
    
    <style scoped>
    </style>
    

    最终效果

    总结

    总的流程大概就是以下几点

    1. 客户端在用户登录后发起websocket连接
    2. 后端监听到有连接加入时就对用户信息进行记录,做相应的处理并进行反馈
    1. 客户端对指定页面和指定的消息类型进行监听和处理

    Tags:

    本文暂时没有评论,来添加一个吧(●'◡'●)

    欢迎 发表评论:

    最近发表
    标签列表
    最新留言