import { RESULT_CODE_ERROR } from '@/constants/result-constants';
import { OptionsType, WS } from '@/plugins/ws-websocket';
import { onBeforeUnmount, onMounted } from 'vue';
import {
  eventListParse,
  eventItemParseAndPush,
  userActivityReset,
} from './useUserActivity';
import {
  upfTotalFlow,
  upfTFParse,
  upfFlowParse,
  upfTotalFlowReset,
} from './useUPFTotalFlow';
import { topologyReset, neStateParse } from './useTopology';
import PQueue from 'p-queue';

/**
 * WebSocket connection composable.
 *
 * Opens the `/ws` connection when the host component is mounted, dispatches
 * incoming messages to the user-activity, UPF-flow and topology parsers, and
 * closes the connection and resets those stores before the component unmounts.
 *
 * @returns `wsSend` (send an arbitrary message), `userActivitySend` (request
 * the initial user-activity event lists) and `upfTFSend` (request the UPF
 * total flow for a day range).
 */
export default function useWS() {
  /** Event channels handled here; the same ids are used as request ids and as group ids. */
  type UserEventType = 'amf_ue' | 'mme_ue' | 'ims_cdr';

  const ws = new WS();
  // concurrency: 1 => pushed events are processed strictly one at a time, in arrival order.
  const queue = new PQueue({ concurrency: 1, autoStart: true });

  /** UPF total-flow request ids mapped to their index in `upfTotalFlow.value`. */
  const upfSlotByRequestId = new Map<string, number>([
    ['1030_0', 0],
    ['1030_7', 1],
    ['1030_30', 2],
  ]);

  /** Maps a request/group id to its event channel, or `undefined` for any other id. */
  function eventTypeOf(id: unknown): UserEventType | undefined {
    switch (id) {
      case '1010': // AMF_UE session events
        return 'amf_ue';
      case '1011': // MME_UE session events
        return 'mme_ue';
      case '1005': // IMS_CDR session events
        return 'ims_cdr';
      default:
        return undefined;
    }
  }

  /** Returns the `upfTotalFlow.value` index for a UPF total-flow request id, if it is one. */
  function upfSlotIndexOf(requestId: unknown): number | undefined {
    return typeof requestId === 'string'
      ? upfSlotByRequestId.get(requestId)
      : undefined;
  }

  /** Send a message over the socket. */
  function wsSend(data: Record<string, any>) {
    ws.send(data);
  }

  /** Callback invoked when the socket reports an error. */
  function wsError(ev: any) {
    console.error(ev);
  }

  /** Callback invoked for every message received from the socket. */
  function wsMessage(res: Record<string, any>) {
    const { code, requestId, data } = res;

    if (code === RESULT_CODE_ERROR) {
      console.warn(res.msg);
      // A failed UPF total-flow request must release its in-flight flag,
      // otherwise upfTFSend would refuse every later request for that range.
      const failedIndex = upfSlotIndexOf(requestId);
      if (failedIndex !== undefined) {
        const failedSlot = upfTotalFlow.value[failedIndex];
        if (failedSlot) {
          failedSlot.requestFlag = false;
        }
      }
      return;
    }

    // Network-element state: requestId looks like `neState_<neType>...`.
    if (typeof requestId === 'string' && requestId.startsWith('neState')) {
      const neType = requestId.split('_')[1];
      neStateParse(neType, data);
      return;
    }

    // Plain request/response messages.
    const listType = eventTypeOf(requestId);
    if (listType !== undefined) {
      // `data` may be absent, hence the optional chaining.
      if (Array.isArray(data?.rows)) {
        eventListParse(listType, data);
      }
    } else {
      // UPF total flow for one of the day ranges.
      const slotIndex = upfSlotIndexOf(requestId);
      if (slotIndex !== undefined) {
        const flow = upfTFParse(data);
        const slot = upfTotalFlow.value[slotIndex];
        if (slot) {
          slot.up = flow.up;
          slot.down = flow.down;
          slot.requestFlag = false;
        }
      }
    }

    // Subscription (group) pushes.
    if (!data?.groupId) {
      return;
    }
    const payload = data.data;
    if (!payload) {
      return;
    }

    // kpiEvent: UPF metrics.
    if (data.groupId === '12_001') {
      upfFlowParse(payload);
      return;
    }

    const pushType = eventTypeOf(data.groupId);
    if (pushType !== undefined) {
      // Log failures instead of leaving the returned promise unhandled.
      queue
        .add(() => eventItemParseAndPush(pushType, payload))
        .catch((err: unknown) => console.error(err));
    }
  }

  /** Request the UPF total flow for the given day range (0, 7 or 30). */
  function upfTFSend(day: 0 | 7 | 30) {
    const requestId = `1030_${day}`;
    // Unknown values fall back to the first slot, as before.
    const slot = upfTotalFlow.value[upfSlotByRequestId.get(requestId) ?? 0];
    // Skip when a request for this range is already in flight.
    if (!slot || slot.requestFlag) {
      return;
    }
    slot.requestFlag = true;
    ws.send({
      requestId,
      type: 'upf_tf',
      data: {
        neType: 'UPF',
        neId: '001',
        day: day,
      },
    });
  }

  /** Request the initial user-activity event lists (first 5 rows per channel, sorted by timestamp descending). */
  function userActivitySend() {
    // AMF_UE session events
    ws.send({
      requestId: '1010',
      type: 'amf_ue',
      data: {
        neType: 'AMF',
        neId: '001',
        sortField: 'timestamp',
        sortOrder: 'desc',
        pageNum: 1,
        pageSize: 5,
      },
    });
    // MME_UE session events
    ws.send({
      requestId: '1011',
      type: 'mme_ue',
      data: {
        neType: 'MME',
        neId: '001',
        sortField: 'timestamp',
        sortOrder: 'desc',
        pageNum: 1,
        pageSize: 5,
      },
    });
    // IMS_CDR session events
    ws.send({
      requestId: '1005',
      type: 'ims_cdr',
      data: {
        neType: 'IMS',
        neId: '001',
        recordType: 'MOC',
        sortField: 'timestamp',
        sortOrder: 'desc',
        pageNum: 1,
        pageSize: 5,
      },
    });
  }

  onMounted(() => {
    const options: OptionsType = {
      url: '/ws',
      params: {
        /**
         * Subscribed channel groups:
         *
         * UPF metrics (GroupID: 12_neId)
         * AMF_UE session events (GroupID: 1010)
         * MME_UE session events (GroupID: 1011)
         * IMS_CDR session events (GroupID: 1005)
         */
        subGroupID: '12_001,1010,1011,1005',
      },
      onmessage: wsMessage,
      onerror: wsError,
    };
    ws.connect(options);
  });

  onBeforeUnmount(() => {
    ws.close();
    // Drop events that are still waiting so they cannot repopulate the
    // stores right after they have been reset.
    queue.clear();
    userActivityReset();
    upfTotalFlowReset();
    topologyReset();
  });

  return {
    wsSend,
    userActivitySend,
    upfTFSend,
  };
}