import { RESULT_CODE_ERROR } from '@/constants/result-constants'; import { OptionsType, WS } from '@/plugins/ws-websocket'; import { onBeforeUnmount, ref } from 'vue'; import { eventData, eventListParse, eventItemParseAndPush, userActivityReset, } from './useUserActivity'; import { upfTotalFlow, upfTFParse, upfFlowParse, upfTotalFlowReset, } from './useUPFTotalFlow'; import { topologyReset, neStateParse, neStateRequestMap } from './useTopology'; import PQueue from 'p-queue'; /**UPF-的Id */ export const upfWhoId = ref(''); /**UPF-的RmUid */ export const upfWhoRmUid = ref(''); /**websocket连接 */ export default function useWS() { const ws = new WS(); const queue = new PQueue({ concurrency: 1, autoStart: true }); /**发消息 */ function wsSend(data: Record) { ws.send(data); } /**接收数据后回调 */ function wsMessage(res: Record) { //console.log(res); const { code, requestId, data } = res; if (code === RESULT_CODE_ERROR) { console.warn(res.msg); return; } // 网元状态 if (requestId && requestId.startsWith('neState')) { const neType = requestId.split('_')[1]; neStateParse(neType, data); return; } // 普通信息 switch (requestId) { // AMF_UE会话事件 case 'amf_1010_001': if (Array.isArray(data.rows)) { eventListParse('amf_ue', data); eventData.value.sort((a, b) => b.eTime - a.eTime); } break; // MME_UE会话事件 case 'mme_1011_001': if (Array.isArray(data.rows)) { eventListParse('mme_ue', data); eventData.value.sort((a, b) => b.eTime - a.eTime); } break; // IMS_CDR会话事件 case 'ims_1005_001': if (Array.isArray(data.rows)) { eventListParse('ims_cdr', data); eventData.value.sort((a, b) => b.eTime - a.eTime); } break; //UPF-总流量数 case 'upf_001_0': upfTFParse('0', data); break; case 'upf_001_7': upfTFParse('7', data); break; case 'upf_001_30': upfTFParse('30', data); break; } // 订阅组信息 if (!data?.groupId) { return; } switch (data.groupId) { // kpiEvent 指标UPF case '10_UPF_' + upfWhoId.value: if (data.data) { upfFlowParse(data.data); } break; // AMF_UE会话事件 case '1010_001': if (data.data) { queue.add(() => eventItemParseAndPush('amf_ue', data.data)); } break; // MME_UE会话事件 case '1011_001': if (data.data) { queue.add(() => eventItemParseAndPush('mme_ue', data.data)); } break; // IMS_CDR会话事件 case '1005_001': if (data.data) { queue.add(() => eventItemParseAndPush('ims_cdr', data.data)); } break; } } /**UPF-总流量数 发消息*/ function upfTFSend(day: '0' | '7' | '30') { // 请求标记检查避免重复发送 if (upfTotalFlow.value[day].requestFlag) { return; } upfTotalFlow.value[day].requestFlag = true; ws.send({ requestId: `upf_001_${day}`, type: 'upf_tf', data: { neType: 'UPF', neId: '001', day: Number(day), }, }); } /**userActivitySend 用户行为事件基础列表数据 发消息*/ function userActivitySend() { // AMF_UE会话事件 ws.send({ requestId: 'amf_1010_001', type: 'amf_ue', data: { neType: 'AMF', neId: '001', sortField: 'timestamp', sortOrder: 'desc', pageNum: 1, pageSize: 20, }, }); // MME_UE会话事件 ws.send({ requestId: 'mme_1011_001', type: 'mme_ue', data: { neType: 'MME', neId: '001', sortField: 'timestamp', sortOrder: 'desc', pageNum: 1, pageSize: 20, }, }); // IMS_CDR会话事件 ws.send({ requestId: 'ims_1005_001', type: 'ims_cdr', data: { neType: 'IMS', neId: '001', recordType: 'MOC', sortField: 'timestamp', sortOrder: 'desc', pageNum: 1, pageSize: 20, }, }); } /**重新发送至UPF 10_UPF_neId */ function reSendUPF(neId: string) { upfWhoId.value = neId; //初始时时无需还原全部属性以及关闭 if (ws.state() === WebSocket.OPEN) { ws.close(); // userActivityReset(); upfTotalFlowReset(); neStateRequestMap.value = new Map(); //topologyReset(); } const options: OptionsType = { url: '/ws', params: { /**订阅通道组 * * 指标UPF (GroupID:10_neType_neId) * AMF_UE会话事件(GroupID:1010_neId) * MME_UE会话事件(GroupID:1011_neId) * IMS_CDR会话事件(GroupID:1005_neId) */ subGroupID: '10_UPF_' + neId + ',1010_001,1011_001,1005_001', }, onmessage: wsMessage, onerror: (ev: any) => { console.error(ev); }, }; ws.connect(options); } onBeforeUnmount(() => { ws.close(); userActivityReset(); upfTotalFlowReset(); topologyReset(); upfWhoRmUid.value = ''; }); return { wsSend, userActivitySend, upfTFSend, reSendUPF, }; }