#include "rest_proxy.h"
#include "selfcare_proxy.h"
#include "log.h"
#include "rest_proxy_stat.h"
#include "rest_proxy_print.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>

extern int get_proxy_run_mode();
extern int rest_query_recharge_card_proc(struct rest_msg_s *rest_ptr);
extern int rest_query_tariff_proc(struct rest_msg_s *rest_ptr);
extern int rest_update_plan_info_proc(struct rest_msg_s *rest_ptr);
extern int rest_update_subs_info_proc(struct rest_msg_s *rest_ptr);
extern int encode_rest_query_tariff_res(_rest_msg_s *ptr, u8 *buf);
extern int selfcare_udp_send(char *buf, short len);
extern int dba_crm_get_vc_validity_days(int face_value);
extern int myLOG_D(const char *fmt, ...);

/*
 * Run `doSomething` when `appID` is not a valid index into
 * g_rest_proxy_env.threads[].  Both bounds are checked: the original macro
 * only rejected values >= MAX_THREADS, so a negative index slipped through
 * and was used to address the thread table.
 * Note: `appID` is evaluated twice; pass a side-effect-free expression.
 */
#define REST_PROXY_CHECK_REST_APIID(appID, doSomething) \
    do { \
        if ((long long)(appID) < 0 || \
            (long long)(appID) >= (long long)(MAX_THREADS)) \
        { \
            doSomething; \
        } \
    } while (0)

/* Process-wide proxy state: configuration, server fd and the thread table. */
s_rest_proxy_env g_rest_proxy_env;

/* Store the "work as proxy emulator" flag and return the value stored. */
int rest_conf_set_emulator_flag(int flag)
{
    g_rest_proxy_env.work_as_proxy_emulator = flag;
    return flag;
}

/*
 * Reserve a free slot in the thread table.
 *
 * The scan starts at the slot handed out last time and wraps around.
 * All MAX_THREADS slots are probed (the original loop probed only
 * MAX_THREADS-1, so one free slot could never be found).
 *
 * Returns the slot index, or THREAD_INVALID_INDEX when the table is full.
 */
int rest_proxy_thread_assign()
{
    static int last_start_pos = 0;
    int index = THREAD_INVALID_INDEX;
    int slot = last_start_pos;
    int probes;

    for (probes = 0; probes < MAX_THREADS; probes++, slot++)
    {
        if (slot >= MAX_THREADS)
        {
            slot = slot % MAX_THREADS;
        }
        if (g_rest_proxy_env.threads[slot].used == 0)
        {
            g_rest_proxy_env.threads[slot].used = 1;
            index = slot;
            last_start_pos = slot;
            LOG_D("restproxy assign restID:%d\n", slot);
            break;
        }
    }

    return index;
}

/*
 * Release a slot by zeroing it (which also clears `used`).
 * Returns 1 on success, 0 when `index` is out of range.
 */
int rest_proxy_thread_release(int index)
{
    REST_PROXY_CHECK_REST_APIID(index, return 0);

    memset(&g_rest_proxy_env.threads[index], 0x00, sizeof(_rest_thread));
    LOG_D("restproxy relese restID:%d.\n", index);

    return 1;
}

/* Return the slot for `index`, or NULL when `index` is out of range. */
_rest_thread* rest_proxy_thread_get(int index)
{
    REST_PROXY_CHECK_REST_APIID(index, return NULL);

    return &g_rest_proxy_env.threads[index];
}

/* Return the UDP server socket stored in the configuration. */
int rest_proxy_get_server_fd()
{
    return g_rest_proxy_env.cnf.as_server_fd;
}

/*
 * Translate the outcome of a REST call into a response message, send it
 * over UDP to the pps peer recorded in the slot, then release the slot.
 *
 * retCode    - HTTP status code of the REST call.
 * srcRestFul - slot whose curAdaptor holds the URL type and result fields.
 *
 * URL types without a response mapping produce no datagram, but the slot
 * is still released.
 */
void rest_proxy_sendto_pps(int retCode, const _rest_thread *srcRestFul)
{
    int fd;
    int len = 0;
    char buffer[MAX_BUFFER];
    struct rest_msg_s dstRestProxy;

    /* Zero the message and fill the header once; every case shares it. */
    memset(&dstRestProxy, 0x00, sizeof(dstRestProxy));
    dstRestProxy.header.src_ref = srcRestFul->curAdaptor.header.restproxyAppID;
    dstRestProxy.header.dst_ref = srcRestFul->curAdaptor.header.ppsAppID;

    switch (srcRestFul->curAdaptor.urlType)
    {
    case URL_FETCH_UPBalance:
    case URL_FETCH_InfoBalance:
    case URL_FETCH_WalletBalance:
        dstRestProxy.msg_type = REST_QUERY;
        switch (retCode)
        {
        case HTTP_OK:
            dstRestProxy.msg.query_res.result = RES_SUCCEED;
            dstRestProxy.msg.query_res.error_code = 0;
            dstRestProxy.msg.query_res.optional_flag = srcRestFul->curAdaptor.msg.query_res.optional_flag;
            dstRestProxy.msg.query_res.balance = srcRestFul->curAdaptor.msg.query_res.balance;
            dstRestProxy.msg.query_res.mo_expiry = srcRestFul->curAdaptor.msg.query_res.mo_expiry;
            dstRestProxy.msg.query_res.mt_expiry = srcRestFul->curAdaptor.msg.query_res.mt_expiry;
            break;
        case 400:
            dstRestProxy.msg.query_res.result = RES_FAILED;
            dstRestProxy.msg.query_res.error_code = ERR_UNKNOWN_SUBS;
            break;
        case 401:
            dstRestProxy.msg.query_res.result = RES_FAILED;
            dstRestProxy.msg.query_res.error_code = ERR_WRONG_PWD;
            break;
        case 403:
        case 404:
        case 500:
        default:
            dstRestProxy.msg.query_res.result = RES_FAILED;
            dstRestProxy.msg.query_res.error_code = ERR_TIMEROUT;
            break;
        }
        len = encode_rest_query_res(&dstRestProxy, (u8*)buffer);
        rest_proxy_stat_count(STAT_QUERY_RES);
        break;

    case URL_RECHAGRGE:
        dstRestProxy.msg_type = REST_TOPUP;
        switch (retCode)
        {
        case HTTP_OK:
        case 201:
            dstRestProxy.msg.topup_res.result = RES_SUCCEED;
            dstRestProxy.msg.topup_res.error_code = 0;
            dstRestProxy.msg.topup_res.optional_flag = srcRestFul->curAdaptor.msg.topup_res.optional_flag;
            dstRestProxy.msg.topup_res.balance = srcRestFul->curAdaptor.msg.topup_res.balance;
            dstRestProxy.msg.topup_res.mo_expiry = srcRestFul->curAdaptor.msg.topup_res.mo_expiry;
            dstRestProxy.msg.topup_res.mt_expiry = srcRestFul->curAdaptor.msg.topup_res.mt_expiry;
            break;
        case 206:
        case 400:
        case 404:
            dstRestProxy.msg.topup_res.result = RES_FAILED;
            dstRestProxy.msg.topup_res.error_code = ERR_UNKNOWN_SUBS;
            break;
        case 401:
            dstRestProxy.msg.topup_res.result = RES_FAILED;
            dstRestProxy.msg.topup_res.error_code = ERR_WRONG_PWD;
            break;
        default:
            dstRestProxy.msg.topup_res.result = RES_FAILED;
            dstRestProxy.msg.topup_res.error_code = ERR_TIMEROUT;
            break;
        }
        len = encode_rest_topup_res(&dstRestProxy, (u8*)buffer);
        rest_proxy_stat_count(STAT_TOPUP_RES);
        break;

    case URL_TRANS:
        dstRestProxy.msg_type = REST_TRANSFER;
        switch (retCode)
        {
        case HTTP_OK:
        case 201:
            dstRestProxy.msg.transfer_res.result = RES_SUCCEED;
            dstRestProxy.msg.transfer_res.error_code = 0;
            dstRestProxy.msg.transfer_res.optional_flag = srcRestFul->curAdaptor.msg.transfer_res.optional_flag;
            break;
        /*
         * The failure branches fill transfer_res, the member that
         * encode_rest_transfer_res() is handed below.  The original wrote
         * to topup_res here (copy-paste from the recharge case).
         */
        case 206:
        case 400:
        case 404:
            dstRestProxy.msg.transfer_res.result = RES_FAILED;
            dstRestProxy.msg.transfer_res.error_code = ERR_UNKNOWN_SUBS;
            break;
        case 401:
            dstRestProxy.msg.transfer_res.result = RES_FAILED;
            dstRestProxy.msg.transfer_res.error_code = ERR_WRONG_PWD;
            break;
        default:
            dstRestProxy.msg.transfer_res.result = RES_FAILED;
            dstRestProxy.msg.transfer_res.error_code = ERR_TIMEROUT;
            break;
        }
        len = encode_rest_transfer_res(&dstRestProxy, (u8*)buffer);
        rest_proxy_stat_count(STAT_TRANSFER_RES);
        break;

    case URL_PPC_OfferGetAllV2_1:
        break;

    default:
        break;
    }

    fd = rest_proxy_get_server_fd();
    if (len > 0)
    {
        udp_send(fd, srcRestFul->pps_ip, srcRestFul->pps_port, buffer, len);
        rest_proxy_print_msg(&dstRestProxy, 0, "restproxy recfrom restful and sendto pps response.");
    }

    /* The transaction is finished: give the slot back. */
    rest_proxy_thread_release(srcRestFul->curAdaptor.header.restproxyAppID);
}

/*
 * Handle one UDP datagram received from pps.
 *
 * buf/len - raw datagram.
 * ip/port - sender address, stored in the slot so the response can be
 *           returned to it.
 *
 * Query/topup/transfer requests park their parameters in a freshly assigned
 * slot and set the state that rest_proxy_fsm_main() acts on.  CRM requests
 * are processed synchronously and the slot is released immediately.
 *
 * Returns 1 when the message was accepted, 0 on decode failure or when no
 * slot is available.
 */
int rest_proxy_handlemsg_from_pps(char *buf, int len, int ip, short port)
{
    struct rest_msg_s rest_msg;
    struct rest_msg_s *rest_ptr = &rest_msg;
    _rest_thread *thread_ptr;
    int thread_id = THREAD_INVALID_INDEX;

    if (decode_rest_api_msg((u8*)buf, len, rest_ptr) == 0)
    {
        LOG_D("decode err.\n");
        return 0;
    }

    thread_id = rest_proxy_thread_assign();
    REST_PROXY_CHECK_REST_APIID(thread_id, return 0);

    thread_ptr = rest_proxy_thread_get(thread_id);
    thread_ptr->restproxyAppID = thread_id;
    thread_ptr->ppsAppID = rest_ptr->header.src_ref;
    thread_ptr->pps_ip = ip;
    thread_ptr->pps_port = port;

    switch (rest_ptr->msg_type)
    {
    case REST_QUERY:
        /* NOTE(review): unbounded copy; destination size is not visible here. */
        strcpy(thread_ptr->msisdn, rest_ptr->msg.query.msisdn);
        if (REST_PROXY_SUBSCRIBID_FLAG)
        {
            /* Configured so query/topup flows first fetch the subscriber-id. */
            thread_ptr->state = ST_SEND_QUERY_SUBSID;
        }
        else
        {
            /* Configured so query/topup flows skip the subscriber-id lookup
             * and go straight to self-http. */
            thread_ptr->state = ST_SEND_QUERY;
        }
        rest_proxy_stat_count(STAT_QUERY_REQ);
        break;

    case REST_TOPUP:
        if (get_proxy_run_mode() == 4) /* OCS+CRM mode */
        {
            rest_query_recharge_card_proc(rest_ptr);
            rest_proxy_thread_release(thread_id);
        }
        else
        {
            strcpy(thread_ptr->msisdn, rest_ptr->msg.topup.msisdn);
            /*
             * Note: in the alpo_ocs <-> restproxy <-> pps setup the topup
             * request from pps carries msisdn + username + password, while
             * the alpo_ocs topup interface takes msisdn + pin, so the
             * password is used as the pin here.
             */
            strcpy(thread_ptr->pin, rest_ptr->msg.topup.password);
            if (REST_PROXY_SUBSCRIBID_FLAG)
            {
                /* Configured so query/topup flows first fetch the subscriber-id. */
                thread_ptr->state = ST_SEND_TOPUP_SUBSCRIBID;
            }
            else
            {
                /* Configured so query/topup flows skip the subscriber-id
                 * lookup and go straight to self-http. */
                thread_ptr->state = ST_SEND_TOPUP;
            }
            rest_proxy_stat_count(STAT_TOPUP_REQ);
        }
        break;

    case REST_TRANSFER:
        strcpy(thread_ptr->msisdn_out, rest_ptr->msg.transfer.transferOutMsisdn);
        strcpy(thread_ptr->msisdn_in, rest_ptr->msg.transfer.transferInMsisdn);
        thread_ptr->money = rest_ptr->msg.transfer.amount;
        thread_ptr->state = ST_SEND_TRANS; /* was: ST_SEND_TRANS_SUBSID_SENDER */
        rest_proxy_stat_count(STAT_TRANSFER_REQ);
        break;

    case REST_QUERY_RECHARGE_CARD_REQ:
        /* rest_query_recharge_card_proc(rest_ptr); -- disabled in the original */
        rest_proxy_thread_release(thread_id);
        break;

    case REST_UPDATE_RECHARGE_CARD_REQ:
        /* rest_update_recharge_card_proc(rest_ptr); -- disabled in the original */
        rest_proxy_thread_release(thread_id);
        break;

    case REST_CRM_QUERY_TARIFF_REQ:
        rest_query_tariff_proc(rest_ptr);
        rest_proxy_thread_release(thread_id);
        break;

    case REST_CRM_UPDATE_PLAN_INFO_REQ:
        rest_update_plan_info_proc(rest_ptr);
        rest_proxy_thread_release(thread_id);
        break;

    case REST_CRM_UPDATE_SUBS_REQ:
        rest_update_subs_info_proc(rest_ptr);
        rest_proxy_thread_release(thread_id);
        break;

    default:
        /*
         * Unknown message type: nothing will ever drive this slot, so give
         * it back now instead of leaving it marked used with a zero state.
         */
        rest_proxy_thread_release(thread_id);
        break;
    }

    /* Harmless on a released slot: release zeroes the whole entry. */
    thread_ptr->timer = 0;

#ifdef TEST_RESTPROXY
    rest_proxy_print_msg(rest_ptr, 1, "restproxy recfrom pps");
#endif

    return 1;
}

/*
 * Encode a CRM-originated message and send it through selfcare_udp_send().
 * The encoder is selected by msg_type; nothing is sent when encoding
 * yields a non-positive length.
 */
void rest_proxy_crm_sendto_pps(struct rest_msg_s *rest_msg_ptr)
{
    int len = 0;
    char buffer[MAX_BUFFER];

    if (rest_msg_ptr == NULL)
    {
        return;
    }

    switch (rest_msg_ptr->msg_type)
    {
    case REST_TOPUP:
        len = encode_rest_topup_res(rest_msg_ptr, (u8*)buffer);
        rest_proxy_stat_count(STAT_TOPUP_RES);
        break;
    case REST_CRM_QUERY_TARIFF_RES:
        len = encode_rest_query_tariff_res(rest_msg_ptr, (u8*)buffer);
        break;
    case REST_SEND_AUTHCODE_REQ:
    case REST_CRM_RENT_CHARGE:
    default:
        len = encode_rest(rest_msg_ptr, (u8 *)buffer);
        break;
    }

    if (len > 0)
    {
        selfcare_udp_send(buffer, len);
        rest_proxy_print_msg(rest_msg_ptr, 0, "restproxy recfrom restful and sendto pps response.");
    }
}

/* Return 1 when the current wall-clock second is past `expired_ts`, else 0. */
int rest_crm_vc_expired(long expired_ts)
{
    struct timeval tv_now;

    gettimeofday(&tv_now, NULL);

    return (tv_now.tv_sec > expired_ts) ? 1 : 0;
}

extern int dba_verify_vc_with_pwd(const char *vc_pwd, char card_info[][1024], char *enc_password);
extern int dba_crm_update_vc_info_as_used_with_enc_pwd(const char *msisdn, const char *enc_vc_pwd, const char *status);
extern int dba_crm_insert_recharge_record_in_db(long account_id, const char *msisdn, int pay_amount, int balance);

/*
 * Topup by voucher card, handled locally against the CRM database.
 *
 * The password of the topup request is verified with
 * dba_verify_vc_with_pwd(); the card must have status "20C" and must not be
 * expired.  On success the card is marked "20X", a recharge record is
 * inserted, and a topup response carrying the face value and validity days
 * is returned to pps with src_ref/dst_ref swapped.
 *
 * Returns 0 when rest_ptr is NULL, otherwise 1 (a response is always sent).
 */
int rest_query_recharge_card_proc(struct rest_msg_s *rest_ptr)
{
    char card_status_of_fresh[] = {"20C"};
    char card_status_of_used[] = {"20X"};
    char password[128];
    char enc_password[128];
    char card_info[6][1024];
    int rc = 0;
    _rest_topup_res *tr_ptr;
    struct rest_msg_s rest_msg;
    u_short tmp_ref;

    if (rest_ptr == NULL)
    {
        return rc;
    }

    /* Start from defined contents in case the DB layer fills only part. */
    memset(card_info, 0x00, sizeof(card_info));
    memset(enc_password, 0x00, sizeof(enc_password));

    memcpy(&rest_msg, rest_ptr, sizeof(struct rest_msg_s));
    rest_msg.msg_type = REST_TOPUP;
    tr_ptr = &rest_msg.msg.topup_res;

    /* Bounded copy: the password comes from a decoded network message. */
    snprintf(password, sizeof(password), "%s", rest_ptr->msg.topup.password);

    rc = dba_verify_vc_with_pwd(password, card_info, enc_password);
    if (rc != 0)
    {
        if (strcmp(card_info[1], card_status_of_fresh) != 0) /* already used */
        {
            rc = 0;
            tr_ptr->error_code = ERR_WRONG_CARD_STATUS;
        }
        if (rest_crm_vc_expired(atol(card_info[2])))
        {
            rc = 0;
            tr_ptr->error_code = ERR_WRONG_CARD_EXPIRED;
        }
    }
    else
    {
        tr_ptr->error_code = ERR_WRONG_PWD;
    }

    if (rc == 0) /* failed */
    {
        tr_ptr->result = RES_FAILED;
    }
    else
    {
        int face_amount = atoi(card_info[0]);
        int valid_day = dba_crm_get_vc_validity_days(face_amount);

        if (valid_day < 1)
        {
            valid_day = 7;
        }

        tr_ptr->result = RES_SUCCEED;
        /* The response was copied from the request, so clear this
         * explicitly rather than leaving request bytes in it. */
        tr_ptr->error_code = 0;
        tr_ptr->balance = face_amount;
        tr_ptr->mo_expiry = valid_day;
        tr_ptr->mt_expiry = tr_ptr->mo_expiry;

        dba_crm_update_vc_info_as_used_with_enc_pwd(rest_ptr->msg.topup.msisdn, enc_password, card_status_of_used);
        dba_crm_insert_recharge_record_in_db(rest_ptr->msg.topup.account_id, rest_ptr->msg.topup.msisdn, face_amount, rest_ptr->msg.topup.balance);
    }

    /* Swap the references so the response is addressed back to the sender. */
    tmp_ref = rest_msg.header.src_ref;
    rest_msg.header.src_ref = rest_msg.header.dst_ref;
    rest_msg.header.dst_ref = tmp_ref;
    rest_proxy_crm_sendto_pps(&rest_msg);

    return 1;
}

/* Placeholder: recharge-card updates are not implemented; always returns 1. */
int rest_update_recharge_card_proc(struct rest_msg_s *rest_ptr)
{
    (void)rest_ptr;
    return 1;
}
static int test_mode_without_crm=0; static u64 test_total_plan_value=0x100000000, test_used_plan_value=0; void crm_set_test_mode_flag(int flag) { test_mode_without_crm = flag; test_used_plan_value = 0; } int crm_get_test_mode_flag() { return test_mode_without_crm; } int rest_query_tariff_proc_test_mode(struct rest_msg_s *rest_ptr) { char msisdn[128]={""}, called_number[128]={""}; int call_type; _rest_query_tariff_res *qt_ptr; struct rest_msg_s rest_msg; u_short tmp_ref = 0; if(rest_ptr == NULL) return 0; memcpy(&rest_msg, rest_ptr, sizeof(struct rest_msg_s )); qt_ptr = &rest_msg.msg.query_tariff_res; rest_msg.msg_type = REST_CRM_QUERY_TARIFF_RES; strcpy(msisdn, rest_ptr->msg.query_tariff.msisdn); strcpy(called_number, rest_ptr->msg.query_tariff.called_number); call_type = rest_ptr->msg.query_tariff.service_type; if(1) { qt_ptr->result = RES_SUCCEED; strcpy(qt_ptr->prefix, called_number); qt_ptr->discount = 100; qt_ptr->plan_id = 0XCC; qt_ptr->total_plan_value = 0x00;//test_total_plan_value; qt_ptr->used_plan_value = 0x00; //test_used_plan_value; if(call_type == 2) { qt_ptr->unit_charge = 48; qt_ptr->unit_time = 1024000; } else { qt_ptr->unit_charge = 55; qt_ptr->unit_time = 60; } qt_ptr->bundle_plan_id = 0xEE; tmp_ref = rest_msg.header.src_ref; rest_msg.header.src_ref = rest_msg.header.dst_ref; rest_msg.header.dst_ref = tmp_ref; rest_proxy_crm_sendto_pps(&rest_msg); } return 1; } int crm_update_plan_data_info_test_mode(int plan_id, long long volume_used_value, int op_flag) { if(op_flag == 0) test_used_plan_value += volume_used_value; else test_used_plan_value -= volume_used_value; myLOG_D("<%s>: OP=%d, used_change=%lld, total_used=%lld\n", __FUNCTION__, op_flag, volume_used_value, test_used_plan_value); return 1; } extern int dba_query_tariff_record(char *msisdn, int call_type, char *called_number, char tariff[][1024]); int rest_query_tariff_proc(struct rest_msg_s *rest_ptr) { char msisdn[128]={""}, called_number[128]={""}; int call_type; char tariff[20][1024]; /* 
+------------------+-----------+-------------+-------------+------+--------------+-------------+--------------+------+----------+ | holiday_discount | TARIFF_ID | PRD_INST_ID | OFR_INST_ID | | ratableValue | usedRatable | acctItemType | fee | rateUnit | +------------------+-----------+-------------+-------------+------+--------------+-------------+--------------+------+----------+ | NULL | 10 | 69 | 143 | 580 | 600 | 0 | 900012 | 0 | 60 | +------------------+-----------+-------------+-------------+------+--------------+-------------+--------------+------+----------+ */ int rc = 0; _rest_query_tariff_res *qt_ptr; struct rest_msg_s rest_msg; if(rest_ptr == NULL) return rc; if(crm_get_test_mode_flag()) return rest_query_tariff_proc_test_mode(rest_ptr); memcpy(&rest_msg, rest_ptr, sizeof(struct rest_msg_s )); qt_ptr = &rest_msg.msg.query_tariff_res; rest_msg.msg_type = REST_CRM_QUERY_TARIFF_RES; strcpy(msisdn, rest_ptr->msg.query_tariff.msisdn); strcpy(called_number, rest_ptr->msg.query_tariff.called_number); call_type = rest_ptr->msg.query_tariff.service_type; rc = dba_query_tariff_record(msisdn, call_type, called_number, tariff); if(rc != 0) { qt_ptr->result = RES_SUCCEED; strcpy(qt_ptr->prefix, called_number); if(tariff[0][0] == 0) //NULL { qt_ptr->discount = 100; } else { qt_ptr->discount = atoi(tariff[0]); } if(tariff[3][0] == 0) //NULL { qt_ptr->plan_id = 0; } else { qt_ptr->plan_id = atoi(tariff[3]); } if(tariff[4][0] == 0) //NULL { qt_ptr->total_plan_value = 0; } else { qt_ptr->total_plan_value = atoll(tariff[4]); } if(tariff[5][0] == 0) //NULL { qt_ptr->used_plan_value = 0; } else { qt_ptr->used_plan_value = atoll(tariff[5]); } qt_ptr->unit_charge = atoi(tariff[7]); qt_ptr->unit_time = atoi(tariff[8]); if(tariff[9][0] == 0) //NULL { qt_ptr->bundle_plan_id = 0; } else { qt_ptr->bundle_plan_id = atoi(tariff[9]); } if(call_type == 2 && qt_ptr->total_plan_value == 0 && qt_ptr->unit_charge == 0) { qt_ptr->result = RES_FAILED; qt_ptr->error_code = ERR_NO_PREFIX; } } 
else { qt_ptr->result = RES_FAILED; qt_ptr->error_code = ERR_NO_PREFIX; } if(1) { u_short tmp_ref = rest_msg.header.src_ref; rest_msg.header.src_ref = rest_msg.header.dst_ref; rest_msg.header.dst_ref = tmp_ref; rest_proxy_crm_sendto_pps(&rest_msg); } return 1; } static int crm_notification_flag = 1; void crm_set_notificaton_flag(int flag) { crm_notification_flag = flag; } extern int dba_update_plan_data_info(int plan_id, long long volume_used_value, int op_flag); extern int dba_update_plan_info(int service_type, int plan_id, long long plan_used_value, int update_expiry_date, long long plan_total_value); extern int crm_pxy_check_plan_record(u32 plan_id); extern int dba_send_plan_notification_info(int account_id, int plan_id, int service_type, int update_expiry_date, int used_rate); extern int crm_pxy_add_plan_record(u32 plan_id); int rest_update_plan_info_proc(struct rest_msg_s *rest_ptr) { int rc; int account_id, plan_id, service_type, bundle_id; long long plan_total_value, plan_used_value, this_time_add_value = 0; int update_expiry_date = 0; int rate_used = 0, old_rate_used=0; int send_notification_flag = 0; int op_flag=0; service_type = rest_ptr->msg.update_plan_info.service_type; account_id = rest_ptr->msg.update_plan_info.account_id; bundle_id = rest_ptr->msg.update_plan_info.bundle_id; plan_id = rest_ptr->msg.update_plan_info.plan_id; plan_total_value = rest_ptr->msg.update_plan_info.plan_total_value; plan_used_value = rest_ptr->msg.update_plan_info.plan_used_value; this_time_add_value = rest_ptr->msg.update_plan_info.this_time_add_value; if((this_time_add_value & 0x8000000000000000) == 0x8000000000000000) op_flag = 1; this_time_add_value &= 0x7FFFFFFFFFFFFFFF; if(plan_total_value<=0) return 0; if((service_type>0) && ((2<<(service_type-1)) & selfcare_config_get()->sms_notify_type) != 0)//(service_type == 2)//data only; 1: voice; 2: data; 3: sms; { rate_used = plan_used_value*100 / plan_total_value; if(rate_used > 75) { old_rate_used = (plan_used_value - 
this_time_add_value)*100 / plan_total_value; if(old_rate_used <= 75) { send_notification_flag = 1; rate_used = 75; } } } if(plan_used_value>=plan_total_value) { plan_used_value = plan_total_value; update_expiry_date = 1; if(0) send_notification_flag = 1; } if(service_type == 2) { if(crm_get_test_mode_flag()) { rc = crm_update_plan_data_info_test_mode( bundle_id, this_time_add_value, op_flag); } else rc = dba_update_plan_data_info( bundle_id, this_time_add_value, op_flag); } else rc = dba_update_plan_info(service_type, bundle_id, plan_used_value, update_expiry_date, plan_total_value); if(crm_notification_flag) { if(send_notification_flag) { if(crm_pxy_check_plan_record(plan_id) == 0) { //if(service_type == 2) //only for data... dba_send_plan_notification_info(account_id, plan_id, service_type, update_expiry_date, rate_used); crm_pxy_add_plan_record(plan_id); } } } return 1; } /* typedef enum account_status { T_FRESH=0, T_NORMAL, T_SUSPEND, T_BLACKLIST, T_RELEASED, T_OPRTRIAL, T_DISABLE, MAX_USER_TYPE, T_EXTERNAL=10, } _account_status; */ extern int dba_send_expired_notification_info(int account_id, int plan_id, long expired_date); extern int dba_update_user_status_in_crm(int account_id, int account_status); extern int dba_send_update_hlr_tele_service(char *msisdn, int account_status); int rest_update_subs_info_proc(struct rest_msg_s *rest_ptr) { //int rc; char msisdn[32]; int account_id=0, plan_id=0; int expired_notification_flag = 0; int account_status = 0; long expired_date = 0; strcpy(msisdn, rest_ptr->msg.update_subs.msisdn); account_id = rest_ptr->msg.update_subs.account_id; plan_id = rest_ptr->msg.update_subs.basic_plan_id; if((rest_ptr->msg.update_subs.optional_flag & 0x04 ) == 0x04) { expired_notification_flag = 1; expired_date = rest_ptr->msg.update_subs.balance_expiry_date; } if(expired_notification_flag) { dba_send_expired_notification_info(account_id, plan_id, expired_date); } else { account_status = rest_ptr->msg.update_subs.status; 
dba_update_user_status_in_crm(account_id, account_status); if(0)//OCS will control all service dba_send_update_hlr_tele_service(msisdn, account_status); } return 1; } int rest_crm_send_sms_notification(char sms_data[][1024]) { struct rest_msg_s rest_msg; _rest_send_authcode *rest_sa_ptr=NULL; rest_msg.msg_type = REST_SEND_AUTHCODE_REQ; rest_msg.header.dst_ref = 0xFFFF; rest_msg.header.src_ref = 0xFFFF; rest_sa_ptr = &rest_msg.msg.send_authcode; strcpy(rest_sa_ptr->msisdn, sms_data[1]); strcpy(rest_sa_ptr->sms_content, sms_data[2]); rest_proxy_crm_sendto_pps(&rest_msg); return 1; } int rest_update_ocs_del_result(struct rest_msg_s *rest_ptr) { //should update operation result here.... return 1; } extern int dba_get_user_rent(char *msisdn); int rest_rent_charge_behalf_on_ocs(struct rest_msg_s *rest_ptr) { int rent = 0; int balance = 0; struct rest_msg_s rest_msg; if(rest_ptr == NULL) return 0; rent = dba_get_user_rent(rest_ptr->msg.rent_charge.msisdn); if(rent>0) { balance = rest_ptr->msg.rent_charge.balance; if(rent <= balance) { rest_msg.header.src_ref = 0xffff; rest_msg.header.dst_ref = 0xffff; rest_msg.msg_type = REST_CRM_RENT_CHARGE; strcpy(rest_msg.msg.rent_charge.msisdn, rest_ptr->msg.rent_charge.msisdn); rest_msg.msg.rent_charge.optional_flag = 0x02; rest_msg.msg.rent_charge.rent_charge = rent; } else { rest_msg.header.src_ref = 0xffff; rest_msg.header.dst_ref = 0xffff; rest_msg.msg_type = REST_CRM_UPDATE_SUBS_REQ; strcpy(rest_msg.msg.update_subs.msisdn, rest_ptr->msg.rent_charge.msisdn); rest_msg.msg.update_subs.optional_flag = 0x20; rest_msg.msg.update_subs.vas_cug_status=1;//suspended } rest_proxy_crm_sendto_pps(&rest_msg); } return 1; } /* -------------------------------------------------------------*/ int rest_pps_server_emulator(char *buf, int len, int ip, short port) { int fd, length=0; char buffer[256]; struct rest_msg_s rest_msg, *rest_ptr=NULL, rest_msg_res; rest_ptr = &rest_msg; if(decode_rest_api_msg((u8*)buf, len, rest_ptr) == 0) { printf("%s, 
decode err.\n", __FUNCTION__); return 0; } memset(&rest_msg_res, 0x00, sizeof(struct rest_msg_s)); if(rest_ptr->msg_type == REST_QUERY) { rest_msg_res.msg_type = REST_QUERY; rest_msg_res.header.src_ref = 1000; rest_msg_res.header.dst_ref = rest_ptr->header.src_ref; rest_msg_res.msg.query_res.optional_flag = 0x1d; rest_msg_res.msg.query_res.result = RES_SUCCEED; rest_msg_res.msg.query_res.balance = rest_ptr->header.src_ref+1000; rest_msg_res.msg.query_res.mo_expiry = 1888335337; rest_msg_res.msg.query_res.mt_expiry = 1888335337; length = encode_rest_query_res(&rest_msg_res, (u8*)buffer); rest_proxy_stat_count(STAT_QUERY_REQ); } else if(rest_ptr->msg_type == REST_TOPUP) { rest_msg_res.msg_type = REST_QUERY; rest_msg_res.header.src_ref = 1000; rest_msg_res.header.dst_ref = rest_ptr->header.src_ref; rest_msg_res.msg.topup_res.optional_flag = 0x1d; rest_msg_res.msg.topup_res.result = RES_SUCCEED; rest_msg_res.msg.topup_res.balance = rest_ptr->header.src_ref+1000; rest_msg_res.msg.topup_res.mo_expiry = 1888335337; rest_msg_res.msg.topup_res.mt_expiry = 1888335337; length = encode_rest_topup_res(&rest_msg_res, (u8*)buffer); rest_proxy_stat_count(STAT_TOPUP_REQ); } fd = rest_proxy_get_server_fd(); if(len > 0) { udp_send(fd, ip, port, buffer, length); } return 1; } int rest_proxy_recfrom_pps() { int loop; char buf[MAX_BUFFER]; int len; int fd; int ip; short port; int b_has_data = 0; fd = rest_proxy_get_server_fd(); if(fd <= 0) { return b_has_data; } for(loop = 0; loop < 10; loop++) { len = udp_recv_with_ip_info(fd, buf, MAX_BUFFER, &ip, &port); if(len > 0) { if(g_rest_proxy_env.work_as_proxy_emulator) { rest_pps_server_emulator(buf, len, ip, port); } else { rest_proxy_handlemsg_from_pps(buf, len, ip, port); b_has_data = 1; } } } return b_has_data; } int rest_proxy_sendto_rest_query_subsid(int thread_id) { _rest_thread *thread_ptr = NULL; REST_PROXY_CHECK_REST_APIID(thread_id, return 0); thread_ptr = rest_proxy_thread_get(thread_id); /* 
将pps-udp请求的参数转存到self-http中,并调用curl接口发送http请求到rest-http */ thread_ptr->curAdaptor.urlType = URL_FETCH_WalletBalance_Getsubsid; thread_ptr->curAdaptor.header.ppsAppID = thread_ptr->ppsAppID; thread_ptr->curAdaptor.header.restproxyAppID = thread_ptr->restproxyAppID; strcpy(thread_ptr->curAdaptor.msg.query.msisdn, thread_ptr->msisdn); curl_adaptor_add(&thread_ptr->curAdaptor); rest_proxy_stat_count(STAT_QUERY_REST_SUBSID_REQ); return 1; } int rest_proxy_sendto_rest_query(int thread_id) { _rest_thread *thread_ptr = NULL; REST_PROXY_CHECK_REST_APIID(thread_id, return 0); thread_ptr = rest_proxy_thread_get(thread_id); /* 将pps-udp请求的参数转存到self-http中,并调用curl接口发送http请求到rest-http */ thread_ptr->curAdaptor.urlType = URL_FETCH_WalletBalance; thread_ptr->curAdaptor.header.ppsAppID = thread_ptr->ppsAppID; thread_ptr->curAdaptor.header.restproxyAppID = thread_ptr->restproxyAppID; strcpy(thread_ptr->curAdaptor.msg.query.msisdn, thread_ptr->msisdn); /* thread_ptr->curAdaptor.msg.query.subsid已经在curl-api中收消息的时候填进去了,这里不再赋值 */ curl_adaptor_add(&thread_ptr->curAdaptor); rest_proxy_stat_count(STAT_QUERY_REST_REQ); return 1; } int rest_proxy_sendto_rest_topup_subsid(int thread_id) { _rest_thread *thread_ptr = NULL; REST_PROXY_CHECK_REST_APIID(thread_id, return 0); thread_ptr = rest_proxy_thread_get(thread_id); /* 将pps-udp请求的参数转存到self-http中,并调用curl接口发送http请求到rest-http */ thread_ptr->curAdaptor.urlType = URL_RECHAGRGE_Getsubsid; thread_ptr->curAdaptor.header.ppsAppID = thread_ptr->ppsAppID; thread_ptr->curAdaptor.header.restproxyAppID = thread_ptr->restproxyAppID; strcpy(thread_ptr->curAdaptor.msg.topup.msisdn, thread_ptr->msisdn); strcpy(thread_ptr->curAdaptor.msg.topup.pin, thread_ptr->pin); curl_adaptor_add(&thread_ptr->curAdaptor); rest_proxy_stat_count(STAT_TOPUP_REST_SUBSID_REQ); return 1; } int rest_proxy_sendto_rest_topup(int thread_id) { _rest_thread *thread_ptr = NULL; REST_PROXY_CHECK_REST_APIID(thread_id, return 0); thread_ptr = rest_proxy_thread_get(thread_id); /* 
将pps-udp请求的参数转存到self-http中,并调用curl接口发送http请求到rest-http */ thread_ptr->curAdaptor.urlType = URL_RECHAGRGE; thread_ptr->curAdaptor.header.ppsAppID = thread_ptr->ppsAppID; thread_ptr->curAdaptor.header.restproxyAppID = thread_ptr->restproxyAppID; strcpy(thread_ptr->curAdaptor.msg.topup.msisdn, thread_ptr->msisdn); strcpy(thread_ptr->curAdaptor.msg.topup.pin, thread_ptr->pin); /* thread_ptr->curAdaptor.msg.topup.subsid已经在curl-api中收消息的时候填进去了,这里不再赋值 */ curl_adaptor_add(&thread_ptr->curAdaptor); rest_proxy_stat_count(STAT_TOPUP_REST_REQ); return 1; } int rest_proxy_sendto_rest_trans_subsid_sender(int thread_id) { _rest_thread *thread_ptr = NULL; REST_PROXY_CHECK_REST_APIID(thread_id, return 0); thread_ptr = rest_proxy_thread_get(thread_id); /* 将pps-udp请求的参数转存到self-http中,并调用curl接口发送http请求到rest-http */ thread_ptr->curAdaptor.urlType = URL_TRANS_GetsubsidSender; thread_ptr->curAdaptor.header.ppsAppID = thread_ptr->ppsAppID; thread_ptr->curAdaptor.header.restproxyAppID = thread_ptr->restproxyAppID; strcpy(thread_ptr->curAdaptor.msg.transfer.msisdn_sender, thread_ptr->msisdn_out); curl_adaptor_add(&thread_ptr->curAdaptor); rest_proxy_stat_count(STAT_TRANSFER_REST_SUBSID_REQ_SENDER); return 1; } int rest_proxy_sendto_rest_trans_subsid_receiver(int thread_id) { _rest_thread *thread_ptr = NULL; REST_PROXY_CHECK_REST_APIID(thread_id, return 0); thread_ptr = rest_proxy_thread_get(thread_id); /* 将pps-udp请求的参数转存到self-http中,并调用curl接口发送http请求到rest-http */ thread_ptr->curAdaptor.urlType = URL_TRANS_GetsubsidReceiver; thread_ptr->curAdaptor.header.ppsAppID = thread_ptr->ppsAppID; thread_ptr->curAdaptor.header.restproxyAppID = thread_ptr->restproxyAppID; strcpy(thread_ptr->curAdaptor.msg.transfer.msisdn_receiver, thread_ptr->msisdn_in); curl_adaptor_add(&thread_ptr->curAdaptor); rest_proxy_stat_count(STAT_TRANSFER_REST_SUBSID_REQ_RECEIVER); return 1; } int rest_proxy_sendto_rest_trans(int thread_id) { _rest_thread *thread_ptr = NULL; REST_PROXY_CHECK_REST_APIID(thread_id, return 0); 
thread_ptr = rest_proxy_thread_get(thread_id); /* 将pps-udp请求的参数转存到self-http中,并调用curl接口发送http请求到rest-http */ thread_ptr->curAdaptor.urlType = URL_TRANS; thread_ptr->curAdaptor.header.ppsAppID = thread_ptr->ppsAppID; thread_ptr->curAdaptor.header.restproxyAppID = thread_ptr->restproxyAppID; thread_ptr->curAdaptor.msg.transfer.money = thread_ptr->money; memcpy(thread_ptr->curAdaptor.msg.transfer.msisdn_sender, thread_ptr->msisdn_out, sizeof(thread_ptr->msisdn_out)); memcpy(thread_ptr->curAdaptor.msg.transfer.msisdn_receiver, thread_ptr->msisdn_in, sizeof(thread_ptr->msisdn_in)); curl_adaptor_add(&thread_ptr->curAdaptor); rest_proxy_stat_count(STAT_TRANSFER_REST_REQ); return 1; } int rest_proxy_recfrom_rest(SCurlAdaptor *curAdaptor, int retCode) { int rc = SUCCESS; _rest_thread *thread_ptr = NULL; REST_PROXY_CHECK_REST_APIID(curAdaptor->header.restproxyAppID, goto END); thread_ptr = &g_rest_proxy_env.threads[curAdaptor->header.restproxyAppID]; if(thread_ptr->used == 0) { rc = FAILURE; goto END; } if(thread_ptr->timer++ > WAIT_TIMER) { rc = FAILURE; goto END; } switch(thread_ptr->state) { case ST_WAIT_RESTAPI_QUERY_SUBSID: LOG_D("restproxy recv-from rest.query-subsid restID:%d,subsid[%s]\n", curAdaptor->header.restproxyAppID, curAdaptor->msg.query.subsid); thread_ptr->state = ST_SEND_QUERY; break; case ST_WAIT_RESTAPI_TOPUP_SUBID: LOG_D("restproxy recv-from rest.topup-subsid restID:%d,subsid[%s]\n", curAdaptor->header.restproxyAppID, curAdaptor->msg.topup.subsid); thread_ptr->state = ST_SEND_TOPUP; break; case ST_WAIT_RESTAPI_TRANS_SUBSID_SENDER: LOG_D("restproxy recv-from rest.trans-subsid_sender restID:%d,subsid[%s]\n", curAdaptor->header.restproxyAppID, curAdaptor->msg.transfer.subsid_sender); thread_ptr->state = ST_SEND_TRANS_SUBSID_RECEIVER; break; case ST_WAIT_RESTAPI_TRANS_SUBID_RECEIVER: LOG_D("restproxy recv-from rest.trans-subsid_receiver restID:%d,subsid[%s]\n", curAdaptor->header.restproxyAppID, curAdaptor->msg.transfer.subsid_receiver); thread_ptr->state = 
ST_SEND_TRANS; break; case ST_WAIT_RESTAPI_TRANS: rest_proxy_sendto_pps(retCode, thread_ptr); break; case ST_SEND_QUERY: case ST_SEND_TOPUP: // rc = FAILURE; break; case ST_WAIT_RESTAPI: rest_proxy_sendto_pps(retCode, thread_ptr); break; } /* 统计rest-http 回复过来的消息 */ switch(curAdaptor->urlType) { case URL_FETCH_WalletBalance_Getsubsid: rest_proxy_stat_count(STAT_QUERY_REST_SUBSID_RES); break; case URL_FETCH_WalletBalance: rest_proxy_stat_count(STAT_QUERY_REST_RES); break; case URL_RECHAGRGE_Getsubsid: rest_proxy_stat_count(STAT_TOPUP_REST_SUBSID_RES); break; case URL_RECHAGRGE: rest_proxy_stat_count(STAT_TOPUP_REST_RES); break; case URL_TRANS_GetsubsidSender: rest_proxy_stat_count(STAT_TRANSFER_REST_SUBSID_RES_SENDER); break; case URL_TRANS_GetsubsidReceiver: rest_proxy_stat_count(STAT_TRANSFER_REST_SUBSID_RES_RECEIVER); break; case URL_TRANS: rest_proxy_stat_count(STAT_TRANSFER_REST_RES); break; default: break; } END: return rc; } int rest_proxy_fsm_main() { register int loop; _rest_thread *thread_ptr = NULL; if(rest_proxy_recfrom_pps() == 0) { if(REST_PROXY_SUBSCRIBID_FLAG == 0) { /* 配置为query/topup流程不用先获取subscriber-id,而是直接query/topup到self-http */ /* 状态机由pps的消息驱动 */ return 0; } } for(loop = 0; loop < MAX_THREADS; loop++) { thread_ptr = &g_rest_proxy_env.threads[loop]; if(thread_ptr->used == 0) continue; if(thread_ptr->timer ++>WAIT_TIMER) { rest_proxy_thread_release(loop); continue; } switch(thread_ptr->state) { case ST_SEND_QUERY_SUBSID: rest_proxy_sendto_rest_query_subsid(loop); LOG_D("restproxy sendto rest.query-getsubsid restID:%d\n", loop); thread_ptr->state = ST_WAIT_RESTAPI_QUERY_SUBSID; break; case ST_SEND_TRANS_SUBSID_SENDER: rest_proxy_sendto_rest_trans_subsid_sender(loop); LOG_D("restproxy sendto rest.trans-getsubsid_sender restID:%d\n", loop); thread_ptr->state = ST_WAIT_RESTAPI_TRANS_SUBSID_SENDER; break; case ST_SEND_TRANS_SUBSID_RECEIVER: rest_proxy_sendto_rest_trans_subsid_receiver(loop); LOG_D("restproxy sendto rest.trans-getsubsidr_receiver 
restID:%d\n", loop); thread_ptr->state = ST_WAIT_RESTAPI_TRANS_SUBID_RECEIVER; break; case ST_SEND_TRANS: rest_proxy_sendto_rest_trans(loop); LOG_D("restproxy sendto rest.trans restID:%d\n", loop); thread_ptr->state = ST_WAIT_RESTAPI_TRANS; break; break; case ST_SEND_QUERY: rest_proxy_sendto_rest_query(loop); LOG_D("restproxy sendto rest.query restID:%d\n", loop); thread_ptr->state = ST_WAIT_RESTAPI; break; case ST_SEND_TOPUP_SUBSCRIBID: rest_proxy_sendto_rest_topup_subsid(loop); LOG_D("restproxy sendto rest.topup-getsubsid restID:%d\n", loop); thread_ptr->state = ST_WAIT_RESTAPI_TOPUP_SUBID; break; case ST_SEND_TOPUP: LOG_D("restproxy sendto rest.topup restID:%d\n", loop); rest_proxy_sendto_rest_topup(loop); thread_ptr->state = ST_WAIT_RESTAPI; break; case ST_WAIT_RESTAPI: break; case ST_WAIT_RESTAPI_QUERY_SUBSID: break; case ST_WAIT_RESTAPI_TOPUP_SUBID: break; case ST_WAIT_RESTAPI_TRANS_SUBSID_SENDER: break; case ST_WAIT_RESTAPI_TRANS_SUBID_RECEIVER: break; case ST_WAIT_RESTAPI_TRANS: break; } } return 1; } #ifdef TEST_RESTPROXY int g_pps_sendquery_to_restproxy_num = 0; int g_pps_sendtopup_to_restproxy_num = 0; int g_pps_sendtrans_to_restproxy_num = 0; void rest_proxy_simulate_pps(void *arg) { prctl(PR_SET_NAME, "simu_pps"); int fd; char buffer[MAX_BUFFER]; int len; int ip; short port; s_rest_proxy_env *env = (s_rest_proxy_env*)arg; while( (fd = init_socket(0, env->cnf.local_port + 10, 0, 1) ) <= 0 ) { LOG_D("rest_proxy_simulate_pps err. 
create fd err.\n"); usleep(1000000); } LOG_D("restproxy simulate pps suc.fd:%d\n", fd); while(1) { while(g_pps_sendquery_to_restproxy_num > 0) { struct rest_msg_s query_msg; memset(&query_msg, 0x00, sizeof(query_msg)); query_msg.msg_type = REST_QUERY; query_msg.header.src_ref = 10000 + g_pps_sendquery_to_restproxy_num; query_msg.header.dst_ref = 0; strcpy(query_msg.msg.query.msisdn, g_rest_proxy_env.cnf.msisdn); /* encode query message */ len = encode_rest_query(&query_msg, (u8*)buffer); /* pps send to restproxy */ udp_send(fd, env->cnf.local_ip, env->cnf.local_port, buffer, len); /* print the message */ rest_proxy_print_msg(&query_msg, 1, "pps sendto restproxy.query"); g_pps_sendquery_to_restproxy_num--; } while(g_pps_sendtopup_to_restproxy_num > 0) { struct rest_msg_s topup_msg; memset(&topup_msg, 0x00, sizeof(topup_msg)); topup_msg.msg_type = REST_TOPUP; topup_msg.header.src_ref = 1000 + g_pps_sendtopup_to_restproxy_num; topup_msg.header.dst_ref = 0; strcpy(topup_msg.msg.topup.msisdn, g_rest_proxy_env.cnf.msisdn); strcpy(topup_msg.msg.topup.password, g_rest_proxy_env.cnf.pin); /* encode query message */ len = encode_rest_topup(&topup_msg, (u8*)buffer); /* pps send to rest */ udp_send(fd, env->cnf.local_ip, env->cnf.local_port, buffer, len); /* print the message */ rest_proxy_print_msg(&topup_msg, 1, "pps sendto restproxy.topup"); g_pps_sendtopup_to_restproxy_num--; } while(g_pps_sendtrans_to_restproxy_num > 0) { struct rest_msg_s transfer_msg; memset(&transfer_msg, 0x00, sizeof(transfer_msg)); transfer_msg.msg_type = REST_TRANSFER; transfer_msg.header.src_ref = 10000 + g_pps_sendtrans_to_restproxy_num; transfer_msg.header.dst_ref = 0; strcpy(transfer_msg.msg.transfer.transferOutMsisdn, g_rest_proxy_env.cnf.msisdn); strcpy(transfer_msg.msg.transfer.transferInMsisdn, g_rest_proxy_env.cnf.msisdn_in); transfer_msg.msg.transfer.amount = 100; /* encode query message */ len = encode_rest_transfer(&transfer_msg, (u8*)buffer); /* pps send to restproxy */ udp_send(fd, 
env->cnf.local_ip, env->cnf.local_port, buffer, len); /* print the message */ rest_proxy_print_msg(&transfer_msg, 1, "pps sendto restproxy.trans"); g_pps_sendtrans_to_restproxy_num--; } while( (len = udp_recv_with_ip_info(fd, buffer, MAX_BUFFER, &ip, &port) ) > 0) { struct rest_msg_s response_msg; memset(&response_msg, 0x00, sizeof(response_msg)); /* pps decode response message from restproxy */ if(decode_rest_api_msg((u8*)buffer, len, &response_msg) == 0) { continue; } /* we just print the message */ rest_proxy_print_msg(&response_msg, 0, "pps recfrom restproxy"); } usleep(5000); } } #endif int rest_proxy_initudp(s_rest_proxy_env* rest_proxy_env) { int fd; fd = init_socket(rest_proxy_env->cnf.local_ip, rest_proxy_env->cnf.local_port, 0, 1); if(fd <= 0) { fprintf(stderr, "restproxy init err. localIPPort[%u:%d]\n", rest_proxy_env->cnf.local_ip, rest_proxy_env->cnf.local_port); exit(-1); } g_rest_proxy_env.cnf.as_server_fd = fd; return 1; } int rest_proxy_init() { pthread_t tid; rest_proxy_load_cnf(&(g_rest_proxy_env.cnf)); rest_proxy_initudp(&g_rest_proxy_env); curl_adaptor_init(rest_proxy_recfrom_rest, g_rest_proxy_env.cnf.query_ip_str, g_rest_proxy_env.cnf.query_port, g_rest_proxy_env.cnf.recharge_ip_str, g_rest_proxy_env.cnf.recharge_port); // pthread_create(&tid, NULL, (void *)debug_monitor, NULL); #ifdef TEST_RESTPROXY pthread_create(&tid, NULL, (void *)rest_proxy_simulate_pps, &g_rest_proxy_env); #endif return SUCCESS; } void rest_proxy_uninit() { }