pool/stratum/rpc.cpp
2016-02-12 22:37:40 +01:00

292 lines
5.7 KiB
C++

#include "stratum.h"
//#define RPC_DEBUGLOG_
bool rpc_connected(YAAMP_RPC *rpc)
{
return rpc->sock > 0;
}
bool rpc_connect(YAAMP_RPC *rpc)
{
rpc_close(rpc);
struct hostent *ent = gethostbyname(rpc->host);
if(!ent) return false;
struct sockaddr_in serv;
serv.sin_family = AF_INET;
serv.sin_port = htons(rpc->port);
bcopy((char *)ent->h_addr, (char *)&serv.sin_addr.s_addr, ent->h_length);
rpc->sock = socket(AF_INET, SOCK_STREAM, 0);
if(rpc->sock <= 0) return false;
int res = connect(rpc->sock, (struct sockaddr *)&serv, sizeof(serv));
if(res < 0)
{
rpc_close(rpc);
return false;
}
yaamp_create_mutex(&rpc->mutex);
rpc->id = 0;
rpc->bufpos = 0;
#ifdef RPC_DEBUGLOG_
debuglog("connected to %s:%d\n", rpc->host, rpc->port);
#endif
return true;
}
void rpc_close(YAAMP_RPC *rpc)
{
if(!rpc_connected(rpc)) return;
pthread_mutex_destroy(&rpc->mutex);
close(rpc->sock);
rpc->sock = 0;
#ifdef RPC_DEBUGLOG_
debuglog("disconnected from %s:%d\n", rpc->host, rpc->port);
#endif
}
///////////////////////////////////////////////////////////////////
int rpc_send_raw(YAAMP_RPC *rpc, const char *buffer, int bytes)
{
if(!rpc_connected(rpc)) return -1;
int res = send(rpc->sock, buffer, bytes, MSG_NOSIGNAL);
if(res <= 0) return res;
#ifdef RPC_DEBUGLOG_
debuglog("sending >%s<\n", buffer);
#endif
return res;
}
int rpc_flush_soft(YAAMP_RPC *rpc)
{
if(!rpc_connected(rpc)) return -1;
int res = send(rpc->sock, rpc->buffer, rpc->bufpos, MSG_MORE);
rpc->bufpos = 0;
return res;
}
int rpc_flush(YAAMP_RPC *rpc)
{
if(!rpc_connected(rpc)) return -1;
int res = rpc_send_raw(rpc, rpc->buffer, rpc->bufpos);
rpc->bufpos = 0;
return res;
}
int rpc_send(YAAMP_RPC *rpc, const char *format, ...)
{
if(!rpc_connected(rpc)) return -1;
char buffer[YAAMP_SMALLBUFSIZE] = { 0 };
va_list args;
va_start(args, format);
vsprintf(buffer, format, args);
va_end(args);
int bytes = strlen(buffer);
if(bytes + rpc->bufpos > YAAMP_SMALLBUFSIZE)
return -1;
memcpy(rpc->buffer + rpc->bufpos, buffer, bytes);
rpc->bufpos += bytes;
return bytes;
}
/////////////////////////////////////////////////////////////////////////////////
char *rpc_do_call(YAAMP_RPC *rpc, char const *data)
{
CommonLock(&rpc->mutex);
// HTTP 1.1 accepts chunked data, and keep the connection
rpc_send(rpc, "POST / HTTP/1.1\r\n");
rpc_send(rpc, "Authorization: Basic %s\r\n", rpc->credential);
rpc_send(rpc, "Host: %s:%d\n", rpc->host, rpc->port);
rpc_send(rpc, "Accept: */*\r\n");
rpc_send(rpc, "Content-Type: application/json\r\n");
rpc_send(rpc, "Content-Length: %d\r\n\r\n", strlen(data));
int res = rpc_flush(rpc);
if(res <= 0)
{
CommonUnlock(&rpc->mutex);
return NULL;
}
res = rpc_send_raw(rpc, data, strlen(data));
if(res <= 0)
{
CommonUnlock(&rpc->mutex);
return NULL;
}
int bufpos = 0;
char buffer[YAAMP_SMALLBUFSIZE] = { 0 };
while(1)
{
int bytes = recv(rpc->sock, buffer+bufpos, YAAMP_SMALLBUFSIZE-bufpos-1, 0);
#ifdef RPC_DEBUGLOG_
debuglog("got %s\n", buffer+bufpos);
#endif
if(bytes <= 0)
{
debuglog("ERROR: recv1, %d, %d, %s, %s\n", bytes, errno, data, buffer);
CommonUnlock(&rpc->mutex);
return NULL;
}
bufpos += bytes;
buffer[bufpos] = 0;
if(strstr(buffer, "\r\n\r\n")) break;
}
///////////////////////////////////////////////////
const char *p = strchr(buffer, ' ');
if(!p)
{
CommonUnlock(&rpc->mutex);
return NULL;
}
int status = atoi(p+1);
if(status != 200)
debuglog("ERROR: rpc_do_call: %s:%d %d\n", rpc->host, rpc->port, status);
char tmp[1024];
header_value(buffer, "Transfer-Encoding:", tmp);
if (!strcmp(tmp, "chunked")) {
#ifdef HAVE_CURL
if (!rpc->curl) debuglog("%s chunked transfer detected, switching to curl!\n",
rpc->coind->symbol);
rpc->curl = 1;
#endif
CommonUnlock(&rpc->mutex);
rpc_connect(rpc);
return NULL;
}
int datalen = atoi(header_value(buffer, "Content-Length:", tmp));
if(!datalen)
{
debuglog("ERROR: rpc No Content-Length header!\n");
CommonUnlock(&rpc->mutex);
return NULL;
}
p = strstr(buffer, "\r\n\r\n");
bufpos = strlen(p+4);
char *databuf = (char *)malloc(datalen+1);
if(!databuf)
{
CommonUnlock(&rpc->mutex);
return NULL;
}
memcpy(databuf, p+4, bufpos+1);
while(bufpos < datalen)
{
int bytes = recv(rpc->sock, databuf+bufpos, datalen-bufpos, 0);
if(bytes <= 0)
{
debuglog("ERROR: recv2, %d, %d, %s\n", bytes, errno, data);
rpc_connect(rpc);
free(databuf);
CommonUnlock(&rpc->mutex);
return NULL;
}
bufpos += bytes;
databuf[bufpos] = 0;
}
CommonUnlock(&rpc->mutex);
header_value(buffer, "Connection:", tmp);
if(strcmp(tmp, "close") == 0)
{
// debuglog("closing connection from %s:%d\n", rpc->host, rpc->port);
rpc_connect(rpc);
}
return databuf;
}
json_value *rpc_call(YAAMP_RPC *rpc, char const *method, char const *params)
{
// debuglog("rpc_call :%d %s\n", rpc->port, method);
#ifdef HAVE_CURL
if (rpc->ssl || rpc->curl)
return rpc_curl_call(rpc, method, params);
#endif
int s1 = current_timestamp();
if(!rpc_connected(rpc)) return NULL;
int paramlen = params? strlen(params): 0;
char *message = (char *)malloc(paramlen+1024);
if(!message) return NULL;
if(params)
sprintf(message, "{\"method\":\"%s\",\"params\":%s,\"id\":\"%d\"}", method, params, ++rpc->id);
else
sprintf(message, "{\"method\":\"%s\",\"id\":\"%d\"}", method, ++rpc->id);
char *buffer = rpc_do_call(rpc, message);
free(message);
if(!buffer) return NULL;
json_value *json = json_parse(buffer, strlen(buffer));
if(!json) {
debuglog("invalid json: %s", buffer);
free(buffer);
return NULL;
}
free(buffer);
int s2 = current_timestamp();
if(s2-s1 > 2000)
debuglog("delay rpc_call %s:%d %s in %d ms\n", rpc->host, rpc->port, method, s2-s1);
if(json->type != json_object)
{
json_value_free(json);
return NULL;
}
return json;
}