#include #include #include "curl_adaptor.h" #include "utils_queue.h" // #include "json_adaptor.h" #include "rest_proxy.h" #include "log.h" SRestFulInfo g_restful; typedef struct _STablCurl { ECurlType curlType; char* curlFormat; }STablCurl; /* curl table */ STablCurl g_tab_url[] = { // {URL_FETCH_SUB, URLFetchSub}, {URL_FETCH_UPBalance, URLFetchUPBalance}, {URL_FETCH_InfoBalance, URLFetchInfoBalance}, {URL_FETCH_WalletBalance_Getsubsid, URLFetchWalletBalanceGetsubsid}, {URL_FETCH_WalletBalance, URLFetchWalletBalance}, {URL_RECHAGRGE_Getsubsid, URLRechargeGetsubsid}, {URL_RECHAGRGE, URLRecharge}, {URL_TRANS_GetsubsidSender, URLTransGetsubidSender}, {URL_TRANS_GetsubsidReceiver, URLTransGetsubidReceiver}, {URL_TRANS, URLTrans}, }; /* curl cache */ static const int CURL_MAX_CONN = 200; static const int CURL_WAIT_TIME_US = 30*1000; static const int CURL_CB_TIME_OUT_MS = 500; static CURL_ADAPTOR_APP_CB g_curl_adaptor_app_cb; static queueADT g_curl_queue; static int g_curl_thread_running; char *get_val_by_name(const char *str, const char *name, char dst_buf[], const int len) { char *p = strstr(str, name); if(p) { char val[2][64] = {{0}}; sscanf(p, "%64[^:]: \"%64[^\"]", val[0], val[1]); snprintf(dst_buf, len, "%s", val[1]); LOG_D("[curl][recv] [%s]=>[%s],[%s]\n", str, val[0], val[1]); return dst_buf; } return NULL; } int curl_adaptor_resolve_cb_walletbalance(const char *str, unsigned int *balance) { char *p_1 = NULL; char *p_2 = NULL; char *p_3 = NULL; if( (p_1 = strstr(str, "balance") ) != NULL && (p_2 = strstr(p_1, ":") ) != NULL && (p_3 = strstr(p_2, "\"") ) != NULL) { char *p_4 = p_3 + 1; char *p = p_4; while(*p_4 != '"') { p_4++; } *p_4 = '\0'; *balance = atof(p) * 100; return 1; } return 0; } int curl_adaptor_resolve_cb_validitydate_mon(const char* str, int *mon) { static char *str_mon[] = {"Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"}; int mon_num = sizeof(str_mon)/sizeof(str_mon[0]); int i = 0; for( ; i < mon_num; ++i) { if(strcmp(str, str_mon[i]) == 0) { *mon = i; return 1; } } return 0; } int curl_adaptor_resolve_cb_validitydate(const char *str, unsigned int *validitydate) { char data[48] = {0}; if(get_val_by_name(str, "validitydate", data, sizeof(data)) == NULL) { return 0; } char str_dd[8] = {0}; char str_mon[8] = {0}; char str_year[8]= {0}; char str_h[8] = {0}; char str_m[8] = {0}; char str_s[8] = {0}; int ret = sscanf(data, "%7[^-]-%7[^-]-%7[^ ] %7[^:]:%7[^:]:%7s", str_dd, str_mon, str_year, str_h, str_m, str_s); if(ret == 6 || ret == 3) { struct tm tm; time_t time; tm.tm_year = atoi(str_year) - 1900; curl_adaptor_resolve_cb_validitydate_mon(str_mon, &tm.tm_mon); tm.tm_mday = atoi(str_dd); tm.tm_hour = atoi(str_h); tm.tm_min = atoi(str_m); tm.tm_sec = atoi(str_s); time = mktime(&tm); *validitydate = time; LOG_D("[curl][recv] [%s-%s-%s %s:%s:%s]=>[%u]\n", str_year, str_mon, str_dd, str_h, str_m, str_s, *validitydate); return 1; } return 0; } /* * function: handle receive http response and resovle the raw data */ static size_t curl_adaptor_resolve_cb(char *d, size_t n, size_t l, void *p) { char *p_find, *p_start; SCurlAdaptor *curAdaptor = (SCurlAdaptor*)p; switch(curAdaptor->urlType) { case URL_FETCH_UPBalance: break; case URL_FETCH_InfoBalance: p_find = strstr(d, ":"); if(p_find == NULL) { curAdaptor->msg.query_res.result = FAILURE; curAdaptor->msg.query_res.error_code = 0; curAdaptor->msg.query_res.optional_flag = 0; break; } p_find = p_start = strstr(p_find, "\""); p_find++; while((*p_find) != '\"') { p_find++; } *p_find = '\0'; curAdaptor->msg.query_res.balance = (unsigned int)(atoi(p_start+1)); curAdaptor->msg.query_res.result = SUCCESS; curAdaptor->msg.query_res.error_code = 0; curAdaptor->msg.query_res.optional_flag = 0; break; case URL_FETCH_WalletBalance_Getsubsid: get_val_by_name(d, "subscriberId", curAdaptor->msg.query.subsid, RESTFUL_SUBSID_LEN); break; case URL_FETCH_WalletBalance: { int ret = 0; if(d != NULL) { char *tmp = (char*)malloc(strlen(d) + 1); // char tmp[512] = {0}; strcpy(tmp, d); ret = curl_adaptor_resolve_cb_walletbalance(tmp, &curAdaptor->msg.query_res.balance); if(ret != 0) { strcpy(tmp, d); ret = curl_adaptor_resolve_cb_validitydate(tmp, &curAdaptor->msg.query_res.mo_expiry); } free(tmp); } if(ret != 0) { curAdaptor->msg.query_res.result = SUCCESS; curAdaptor->msg.query_res.mt_expiry = curAdaptor->msg.query_res.mo_expiry; } else { curAdaptor->msg.query_res.result = FAILURE; } curAdaptor->msg.query_res.error_code = 0; curAdaptor->msg.query_res.optional_flag = 0; } break; case URL_RECHAGRGE_Getsubsid: get_val_by_name(d, "subscriberId", curAdaptor->msg.topup.subsid, RESTFUL_SUBSID_LEN); break; case URL_RECHAGRGE: { curAdaptor->msg.topup_res.result = SUCCESS; curAdaptor->msg.topup_res.error_code = 0; curAdaptor->msg.topup_res.balance = 0; curAdaptor->msg.topup_res.mo_expiry = 0; curAdaptor->msg.topup_res.mt_expiry = 0; curAdaptor->msg.topup_res.optional_flag = 0; } break; case URL_TRANS_GetsubsidSender: get_val_by_name(d, "subscriberId", curAdaptor->msg.transfer.subsid_sender, RESTFUL_SUBSID_LEN); break; case URL_TRANS_GetsubsidReceiver: get_val_by_name(d, "subscriberId", curAdaptor->msg.transfer.subsid_receiver, RESTFUL_SUBSID_LEN); break; case URL_TRANS: { curAdaptor->msg.transfer_res.result = SUCCESS; curAdaptor->msg.transfer_res.error_code = 0; curAdaptor->msg.transfer_res.optional_flag = 0; } break; case URL_PPC_OfferGetAllV2_1: break; default: break; } return n*l; } static void curl_adaptor_easy_init(CURLM *cm, const SCurlAdaptor* curAdaptor) { CURL *eh = curl_easy_init(); char userNamePasswordBuf[64] = {0}; /* 对于query、get-subscriberid来说,都是http-request操作,不用带参数 */ if(curAdaptor->urlType == URL_FETCH_WalletBalance || curAdaptor->urlType == URL_FETCH_WalletBalance_Getsubsid || curAdaptor->urlType == URL_RECHAGRGE_Getsubsid || curAdaptor->urlType == URL_TRANS_GetsubsidSender || curAdaptor->urlType == URL_TRANS_GetsubsidReceiver) { curl_easy_setopt(eh, CURLOPT_WRITEDATA, (void *)(curAdaptor)); curl_easy_setopt(eh, CURLOPT_WRITEFUNCTION, curl_adaptor_resolve_cb); // curl_easy_setopt(eh, CURLOPT_HEADER, 0L); curl_easy_setopt(eh, CURLOPT_NOSIGNAL, 1); curl_easy_setopt(eh, CURLOPT_URL, curAdaptor->urlBuf); // 设置basic 向服务器提供验证信息 curl_easy_setopt(eh, CURLOPT_HTTPAUTH, CURLAUTH_BASIC); snprintf(userNamePasswordBuf, sizeof(userNamePasswordBuf), "%s:%s", g_rest_proxy_env.cnf.username, g_rest_proxy_env.cnf.password); curl_easy_setopt(eh, CURLOPT_USERPWD, userNamePasswordBuf); curl_easy_setopt(eh, CURLOPT_PRIVATE, curAdaptor); // curl_easy_setopt(eh, CURLOPT_VERBOSE, 0L); curl_easy_setopt(eh, CURLOPT_TIMEOUT_MS, CURL_CB_TIME_OUT_MS); curl_easy_setopt(eh, CURLOPT_FORBID_REUSE, 1); LOG_D(" urlType:%d, curl:%s\n", curAdaptor->urlType, curAdaptor->urlBuf); } else if(curAdaptor->urlType == URL_RECHAGRGE || curAdaptor->urlType == URL_TRANS) { /* 对于topup来说,topup需要是http-post操作,且需要带额外参数.*/ /* 注意: 这里param是全局变量,多线程可能出错. 后续要分析下安全性. 2019/06/23 22:58:30 */ static char param[512] = {0}; // 设置问非0表示本次操作为post curl_easy_setopt(eh, CURLOPT_POST, 1); // url地址 curl_easy_setopt(eh, CURLOPT_URL,curAdaptor->urlBuf); // 设置basic 向服务器提供验证信息 curl_easy_setopt(eh, CURLOPT_HTTPAUTH, CURLAUTH_BASIC); snprintf(userNamePasswordBuf, sizeof(userNamePasswordBuf), "%s:%s", g_rest_proxy_env.cnf.username, g_rest_proxy_env.cnf.password); curl_easy_setopt(eh, CURLOPT_USERPWD, userNamePasswordBuf); if(curAdaptor->urlType == URL_RECHAGRGE) { if(REST_PROXY_SUBSCRIBID_FLAG) { /* 配置为query/topup流程都要先获取subscriber-id的话, topup的参数格式是这样的: */ /* msisdn=90909093&&pin=39977093792343 */ snprintf(param, sizeof(param), "msisdn=%s&pin=%s", curAdaptor->msg.topup.msisdn, curAdaptor->msg.topup.pin); } else { /* 配置为query/topup流程不用先获取subscriber-id,而是直接query/topup到self-http, topup的参数格式是这样的: */ /* char *post_para = "actor=4559936&pin=57160898969922&customerIp=10.60.1.42&voucherId=&subscriberId=4559936&subsystem=Selfcare&rechargeTransactiontId=1503&msisdn=6924559936" */ snprintf(param, sizeof(param), "actor=%s&pin=%s&customerIp=%s&voucherId=%s&subscriberId=%s&subsystem=%s&msisdn=%s", curAdaptor->msg.topup.msisdn, curAdaptor->msg.topup.pin, g_rest_proxy_env.cnf.customer_ip, "", curAdaptor->msg.topup.msisdn, g_rest_proxy_env.cnf.subsystem, curAdaptor->msg.topup.msisdn); } } else { snprintf(param, sizeof(param), "gifteeSubscriptionIdentifier=%s&&gifterSubscriptionIdentifier=%s&&BankTypeToCredit=WalletBank;;%u&&Comments=Gift WalletBank of $%u from %s to %s", curAdaptor->msg.transfer.msisdn_receiver, curAdaptor->msg.transfer.msisdn_sender, curAdaptor->msg.transfer.money, curAdaptor->msg.transfer.money, curAdaptor->msg.transfer.msisdn_sender, curAdaptor->msg.transfer.msisdn_receiver); } // post参数 curl_easy_setopt(eh, CURLOPT_POSTFIELDS, param); curl_easy_setopt(eh, CURLOPT_POSTFIELDSIZE, strlen(param) ); // 对返回的数据进行操作的函数地址 curl_easy_setopt(eh, CURLOPT_WRITEFUNCTION, curl_adaptor_resolve_cb); // 这是write_data的第四个参数值 curl_easy_setopt(eh, CURLOPT_WRITEDATA, curAdaptor); curl_easy_setopt(eh, CURLOPT_PRIVATE, curAdaptor); { struct curl_slist *chunk = NULL; /* Remove a header curl would otherwise add by itself */ chunk = curl_slist_append(chunk, "Accept: text/html, image/gif, image/jpeg, *; q=.2, */*; q=.2"); /* set our custom set of headers */ curl_easy_setopt(eh, CURLOPT_HTTPHEADER, chunk); } // 打印调试信息 // curl_easy_setopt(eh, CURLOPT_VERBOSE, 1); // 将响应头信息和相应体一起传给write_data curl_easy_setopt(eh, CURLOPT_HEADER, 1); // 设置为非0,响应头信息location // curl_easy_setopt(eh, CURLOPT_FOLLOWLOCATION, 1); curl_easy_setopt(eh, CURLOPT_FORBID_REUSE, 1); LOG_D(" urlType:%d, curl:%s param:%s\n", curAdaptor->urlType, curAdaptor->urlBuf, param); } curl_multi_add_handle(cm, eh); } int curl_adaptor_init_cb(CURL_ADAPTOR_APP_CB appCB) { g_curl_adaptor_app_cb = appCB; return SUCCESS; } static int curl_adaptor_init_restfulInfo(const char* queryIP, unsigned int queryPort, const char* rechargeIP, unsigned int rechargePort) { strcpy(g_restful.queryIP, queryIP); g_restful.queryPort = queryPort; strcpy(g_restful.recharegeIP, rechargeIP); g_restful.recharegePort = rechargePort; return SUCCESS; } int curl_adaptor_add(SCurlAdaptor* curAdaptor) { int len = 0; int rc = FAILURE; if(QueueIsFull(g_curl_queue)) { rc = FAILURE; goto END; } int urlType = curAdaptor->urlType; switch(urlType) { case URL_FETCH_UPBalance: case URL_FETCH_InfoBalance: case URL_FETCH_WalletBalance_Getsubsid: { len = snprintf(curAdaptor->urlBuf, RESTFUL_CUL_BUF_MAX, "%s%s:%d", URLPrefix, g_restful.queryIP, g_restful.queryPort); len += snprintf(curAdaptor->urlBuf + len, RESTFUL_CUL_BUF_MAX - len, g_tab_url[urlType].curlFormat, curAdaptor->msg.query.msisdn); } break; case URL_FETCH_WalletBalance: { len = snprintf(curAdaptor->urlBuf, RESTFUL_CUL_BUF_MAX, "%s%s:%d", URLPrefix, g_restful.queryIP, g_restful.queryPort); if(REST_PROXY_SUBSCRIBID_FLAG) { /* 配置为query/topup流程都要先获取subscriber-id. query/topup的时候要将subscriber-id送出去 */ len += snprintf(curAdaptor->urlBuf + len, RESTFUL_CUL_BUF_MAX - len, g_tab_url[urlType].curlFormat, curAdaptor->msg.query.subsid, curAdaptor->msg.query.msisdn); } else { /* 配置为query/topup流程不用先获取subscriber-id,不需要将subscriber-id送出去 */ len += snprintf(curAdaptor->urlBuf + len, RESTFUL_CUL_BUF_MAX - len, g_tab_url[urlType].curlFormat, curAdaptor->msg.query.msisdn); } } break; case URL_RECHAGRGE_Getsubsid: { len = snprintf(curAdaptor->urlBuf, RESTFUL_CUL_BUF_MAX, "%s%s:%d", URLPrefix, g_restful.recharegeIP, g_restful.recharegePort); len += snprintf(curAdaptor->urlBuf + len, RESTFUL_CUL_BUF_MAX - len, g_tab_url[urlType].curlFormat, curAdaptor->msg.topup.msisdn); } break; case URL_RECHAGRGE: { len = snprintf(curAdaptor->urlBuf, RESTFUL_CUL_BUF_MAX, "%s%s:%d", URLPrefix, g_restful.recharegeIP, g_restful.recharegePort); if(REST_PROXY_SUBSCRIBID_FLAG) { /* 配置为query/topup流程都要先获取subscriber-id. query/topup的时候要将subscriber-id送出去 */ len += snprintf(curAdaptor->urlBuf + len, RESTFUL_CUL_BUF_MAX - len, g_tab_url[urlType].curlFormat, curAdaptor->msg.topup.subsid); } else { /* 配置为query/topup流程不用先获取subscriber-id,不需要将subscriber-id送出去 */ len += snprintf(curAdaptor->urlBuf + len, RESTFUL_CUL_BUF_MAX - len, g_tab_url[urlType].curlFormat, curAdaptor->msg.topup.msisdn); } } break; case URL_TRANS_GetsubsidSender: { len = snprintf(curAdaptor->urlBuf, RESTFUL_CUL_BUF_MAX, "%s%s:%d", URLPrefix, g_restful.recharegeIP, g_restful.recharegePort); len += snprintf(curAdaptor->urlBuf + len, RESTFUL_CUL_BUF_MAX - len, g_tab_url[urlType].curlFormat, curAdaptor->msg.transfer.msisdn_sender); } break; case URL_TRANS_GetsubsidReceiver: { len = snprintf(curAdaptor->urlBuf, RESTFUL_CUL_BUF_MAX, "%s%s:%d", URLPrefix, g_restful.recharegeIP, g_restful.recharegePort); len += snprintf(curAdaptor->urlBuf + len, RESTFUL_CUL_BUF_MAX - len, g_tab_url[urlType].curlFormat, curAdaptor->msg.transfer.msisdn_receiver); } break; case URL_TRANS: { len = snprintf(curAdaptor->urlBuf, RESTFUL_CUL_BUF_MAX, "%s%s:%d", URLPrefix, g_restful.recharegeIP, g_restful.recharegePort); len += snprintf(curAdaptor->urlBuf + len, RESTFUL_CUL_BUF_MAX - len, "%s", g_tab_url[urlType].curlFormat/*, curAdaptor->msg.transfer.subsid_sender*/); } break; case URL_PPC_OfferGetAllV2_1: { len = snprintf(curAdaptor->urlBuf, RESTFUL_CUL_BUF_MAX, "%s%s:%d", URLPrefix, g_restful.queryIP, g_restful.queryPort); len += snprintf(curAdaptor->urlBuf + len, RESTFUL_CUL_BUF_MAX - len, g_tab_url[urlType%URL_MAX_NUM].curlFormat, "-weekly,daily"); } break; default: { rc = FAILURE; goto END; } break; } if(SUCCESS_QUEUE == QueueEnter(g_curl_queue, (queueElementT)curAdaptor) ) { rc = SUCCESS; } else { rc = FAILURE; } END: return rc; } /* curl_multi_perform() + curl_multi_wait() */ int curl_adaptor_go(SCurlAdaptor *curlAdaptor) { CURLM *cm = NULL; CURL *eh = NULL; CURLMsg *msg = NULL; CURLcode return_code = 0; int still_running = 0, msgs_left = 0; int http_status_code; cm = curl_multi_init(); if(cm == NULL) return FAILURE; /* we can optionally limit the total amount of connections this multi handle uses */ curl_multi_setopt(cm, CURLMOPT_MAXCONNECTS, (long)CURL_MAX_CONN); curl_adaptor_easy_init(cm, curlAdaptor); curl_multi_perform(cm, &still_running); do { int numfds=0; int res = curl_multi_wait(cm, NULL, 0, CURL_WAIT_TIME_US, &numfds); if(res != CURLM_OK) { LOG_E("[curl][recv] error: curl_multi_wait() returned %d\n", res); return FAILURE; } curl_multi_perform(cm, &still_running); }while(still_running); while((msg = curl_multi_info_read(cm, &msgs_left))) { if (msg->msg == CURLMSG_DONE) { curlAdaptor = NULL; eh = msg->easy_handle; return_code = msg->data.result; curl_easy_getinfo(eh, CURLINFO_PRIVATE, &curlAdaptor); if(return_code != CURLE_OK) { LOG_E("[curl][recv] error: curl_multi_info_read() returned %d\n", return_code); g_curl_adaptor_app_cb(curlAdaptor, msg->data.result); curl_multi_remove_handle(cm, eh); curl_easy_cleanup(eh); continue; } /* Get HTTP status code */ http_status_code = 0; curl_easy_getinfo(eh, CURLINFO_RESPONSE_CODE, &http_status_code); if(http_status_code != 200) { LOG_E("[curl][recv] get of %p returned http status code %d\n", curlAdaptor, http_status_code); } g_curl_adaptor_app_cb(curlAdaptor, http_status_code); curl_multi_remove_handle(cm, eh); curl_easy_cleanup(eh); } else { LOG_E("[curl][recv] error: after curl_multi_info_read(), CURLMsg=%d\n", msg->msg); } } curl_multi_cleanup(cm); return SUCCESS; } int curl_adaptor_init_queue() { g_curl_queue = QueueCreate(3000); return g_curl_queue != NULL; } static void *curl_adaptor_thread(void *data) { prctl(PR_SET_NAME, "curl_queue"); SCurlAdaptor *curAdaptor; while(g_curl_thread_running) { if( !QueueIsEmpty(g_curl_queue) ) { curAdaptor = (SCurlAdaptor *)QueueDelete(g_curl_queue); if(curAdaptor) { curl_adaptor_go(curAdaptor); } } usleep(10); } pthread_exit(NULL); return NULL; } int curl_adaptor_init_thread() { pthread_t handle; g_curl_thread_running = 1; if(pthread_create(&handle, NULL, curl_adaptor_thread, NULL)) { printf("Thread create err !!!\n"); return FAILURE; if ( pthread_detach(handle) ) { printf("Thread detached err !!!\n"); return FAILURE; } } return SUCCESS; } int curl_adaptor_init(CURL_ADAPTOR_APP_CB appCB, const char* queryIP, unsigned int queryPort, const char* rechargeIP, unsigned int rechargePort) { curl_adaptor_init_queue(); curl_adaptor_init_thread(); curl_adaptor_init_cb(appCB); curl_adaptor_init_restfulInfo(queryIP, queryPort, rechargeIP, rechargePort); return SUCCESS; } int curl_adaptor_fini() { g_curl_thread_running = 0; QueueDestroy(g_curl_queue); return SUCCESS; }