mirror of
https://github.com/LBRYFoundation/pool.git
synced 2025-09-30 15:20:36 +00:00
stratum: add a flag indicating we need exit
can be useful to filter real valgrind leaks on exit and trap reboot signal also protect against db queries after close
This commit is contained in:
parent
3552577ad3
commit
6d3dd22082
6 changed files with 48 additions and 15 deletions
|
@ -429,7 +429,7 @@ void *client_thread(void *p)
|
||||||
client->shares_per_minute = YAAMP_SHAREPERSEC;
|
client->shares_per_minute = YAAMP_SHAREPERSEC;
|
||||||
client->last_submit_time = current_timestamp();
|
client->last_submit_time = current_timestamp();
|
||||||
|
|
||||||
while(1)
|
while(!g_exiting)
|
||||||
{
|
{
|
||||||
if(client->submit_bad > 1024)
|
if(client->submit_bad > 1024)
|
||||||
{
|
{
|
||||||
|
|
|
@ -4,6 +4,11 @@
|
||||||
|
|
||||||
void db_reconnect(YAAMP_DB *db)
|
void db_reconnect(YAAMP_DB *db)
|
||||||
{
|
{
|
||||||
|
if (g_exiting) {
|
||||||
|
db_close(db);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
mysql_init(&db->mysql);
|
mysql_init(&db->mysql);
|
||||||
for(int i=0; i<6; i++)
|
for(int i=0; i<6; i++)
|
||||||
{
|
{
|
||||||
|
@ -13,6 +18,7 @@ void db_reconnect(YAAMP_DB *db)
|
||||||
stratumlog("%d, %s\n", i, mysql_error(&db->mysql));
|
stratumlog("%d, %s\n", i, mysql_error(&db->mysql));
|
||||||
sleep(10);
|
sleep(10);
|
||||||
|
|
||||||
|
mysql_close(&db->mysql);
|
||||||
mysql_init(&db->mysql);
|
mysql_init(&db->mysql);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -27,8 +33,11 @@ YAAMP_DB *db_connect()
|
||||||
|
|
||||||
void db_close(YAAMP_DB *db)
|
void db_close(YAAMP_DB *db)
|
||||||
{
|
{
|
||||||
mysql_close(&db->mysql);
|
if (db) {
|
||||||
delete db;
|
mysql_close(&db->mysql);
|
||||||
|
delete db;
|
||||||
|
}
|
||||||
|
db = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
char *db_clean_string(YAAMP_DB *db, char *string)
|
char *db_clean_string(YAAMP_DB *db, char *string)
|
||||||
|
@ -46,6 +55,7 @@ void db_query(YAAMP_DB *db, const char *format, ...)
|
||||||
{
|
{
|
||||||
va_list arglist;
|
va_list arglist;
|
||||||
va_start(arglist, format);
|
va_start(arglist, format);
|
||||||
|
if(!db) return;
|
||||||
|
|
||||||
char *buffer = (char *)malloc(YAAMP_SMALLBUFSIZE+strlen(format));
|
char *buffer = (char *)malloc(YAAMP_SMALLBUFSIZE+strlen(format));
|
||||||
if(!buffer) return;
|
if(!buffer) return;
|
||||||
|
@ -53,7 +63,7 @@ void db_query(YAAMP_DB *db, const char *format, ...)
|
||||||
int len = vsprintf(buffer, format, arglist);
|
int len = vsprintf(buffer, format, arglist);
|
||||||
va_end(arglist);
|
va_end(arglist);
|
||||||
|
|
||||||
while(1)
|
while(!g_exiting)
|
||||||
{
|
{
|
||||||
int res = mysql_query(&db->mysql, buffer);
|
int res = mysql_query(&db->mysql, buffer);
|
||||||
if(!res) break;
|
if(!res) break;
|
||||||
|
@ -62,6 +72,7 @@ void db_query(YAAMP_DB *db, const char *format, ...)
|
||||||
stratumlog("SQL ERROR: %d, %s\n", res, mysql_error(&db->mysql));
|
stratumlog("SQL ERROR: %d, %s\n", res, mysql_error(&db->mysql));
|
||||||
if(res != CR_SERVER_GONE_ERROR && res != CR_SERVER_LOST) exit(1);
|
if(res != CR_SERVER_GONE_ERROR && res != CR_SERVER_LOST) exit(1);
|
||||||
|
|
||||||
|
usleep(100*YAAMP_MS);
|
||||||
db_reconnect(db);
|
db_reconnect(db);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -74,6 +85,7 @@ void db_register_stratum(YAAMP_DB *db)
|
||||||
{
|
{
|
||||||
int pid = getpid();
|
int pid = getpid();
|
||||||
int t = time(NULL);
|
int t = time(NULL);
|
||||||
|
if(!db) return;
|
||||||
|
|
||||||
db_query(db, "insert into stratums (pid, time, algo) values (%d, %d, '%s') on duplicate key update time=%d",
|
db_query(db, "insert into stratums (pid, time, algo) values (%d, %d, '%s') on duplicate key update time=%d",
|
||||||
pid, t, g_current_algo->name, t);
|
pid, t, g_current_algo->name, t);
|
||||||
|
@ -81,6 +93,8 @@ void db_register_stratum(YAAMP_DB *db)
|
||||||
|
|
||||||
void db_update_algos(YAAMP_DB *db)
|
void db_update_algos(YAAMP_DB *db)
|
||||||
{
|
{
|
||||||
|
if(!db) return;
|
||||||
|
|
||||||
if(g_current_algo->overflow)
|
if(g_current_algo->overflow)
|
||||||
{
|
{
|
||||||
debuglog("setting overflow\n");
|
debuglog("setting overflow\n");
|
||||||
|
@ -127,6 +141,8 @@ void db_update_algos(YAAMP_DB *db)
|
||||||
|
|
||||||
void db_update_coinds(YAAMP_DB *db)
|
void db_update_coinds(YAAMP_DB *db)
|
||||||
{
|
{
|
||||||
|
if(!db) return;
|
||||||
|
|
||||||
for(CLI li = g_list_coind.first; li; li = li->next)
|
for(CLI li = g_list_coind.first; li; li = li->next)
|
||||||
{
|
{
|
||||||
YAAMP_COIND *coind = (YAAMP_COIND *)li->data;
|
YAAMP_COIND *coind = (YAAMP_COIND *)li->data;
|
||||||
|
@ -291,6 +307,8 @@ void db_update_coinds(YAAMP_DB *db)
|
||||||
|
|
||||||
void db_update_remotes(YAAMP_DB *db)
|
void db_update_remotes(YAAMP_DB *db)
|
||||||
{
|
{
|
||||||
|
if(!db) return;
|
||||||
|
|
||||||
db_query(db, "select id, speed/1000000, host, port, username, password, time, price, renterid from jobs where active and ready and algo='%s' order by time", g_stratum_algo);
|
db_query(db, "select id, speed/1000000, host, port, username, password, time, price, renterid from jobs where active and ready and algo='%s' order by time", g_stratum_algo);
|
||||||
|
|
||||||
MYSQL_RES *result = mysql_store_result(&db->mysql);
|
MYSQL_RES *result = mysql_store_result(&db->mysql);
|
||||||
|
@ -420,6 +438,8 @@ void db_update_remotes(YAAMP_DB *db)
|
||||||
|
|
||||||
void db_update_renters(YAAMP_DB *db)
|
void db_update_renters(YAAMP_DB *db)
|
||||||
{
|
{
|
||||||
|
if(!db) return;
|
||||||
|
|
||||||
db_query(db, "select id, balance, updated from renters");
|
db_query(db, "select id, balance, updated from renters");
|
||||||
|
|
||||||
MYSQL_RES *result = mysql_store_result(&db->mysql);
|
MYSQL_RES *result = mysql_store_result(&db->mysql);
|
||||||
|
@ -456,12 +476,10 @@ static void _json_str_safe(YAAMP_DB *db, json_value *json, const char *key, size
|
||||||
{
|
{
|
||||||
json_value *val = json_get_val(json, key);
|
json_value *val = json_get_val(json, key);
|
||||||
out[0] = '\0';
|
out[0] = '\0';
|
||||||
if (val && json_is_string(val)) {
|
if (db && val && json_is_string(val)) {
|
||||||
strncpy(out, json_string_value(val), maxlen);
|
strncpy(out, json_string_value(val), maxlen);
|
||||||
out[maxlen-1] = '\0';
|
out[maxlen-1] = '\0';
|
||||||
db_clean_string(db, out);
|
db_clean_string(db, out);
|
||||||
} else {
|
|
||||||
//debuglog("stats: invalid string for field '%s'\n", key);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#define json_str_safe(stats, k, out) _json_str_safe(db, stats, k, sizeof(out), out)
|
#define json_str_safe(stats, k, out) _json_str_safe(db, stats, k, sizeof(out), out)
|
||||||
|
@ -487,6 +505,8 @@ void db_store_stats(YAAMP_DB *db, YAAMP_CLIENT *client, json_value *stats)
|
||||||
double khashes, intensity, throughput;
|
double khashes, intensity, throughput;
|
||||||
int power, freq, memf;
|
int power, freq, memf;
|
||||||
|
|
||||||
|
if (!db) return;
|
||||||
|
|
||||||
json_str_safe(stats, "algo", salgo);
|
json_str_safe(stats, "algo", salgo);
|
||||||
if (strcasecmp(g_current_algo->name, salgo) && client->submit_bad) {
|
if (strcasecmp(g_current_algo->name, salgo) && client->submit_bad) {
|
||||||
// debuglog("stats: wrong algo used %s != %s", salgo, g_current_algo->name);
|
// debuglog("stats: wrong algo used %s != %s", salgo, g_current_algo->name);
|
||||||
|
|
|
@ -233,7 +233,7 @@ pthread_cond_t g_job_cond;
|
||||||
void *job_thread(void *p)
|
void *job_thread(void *p)
|
||||||
{
|
{
|
||||||
CommonLock(&g_job_mutex);
|
CommonLock(&g_job_mutex);
|
||||||
while(1)
|
while(!g_exiting)
|
||||||
{
|
{
|
||||||
job_update();
|
job_update();
|
||||||
pthread_cond_wait(&g_job_cond, &g_job_mutex);
|
pthread_cond_wait(&g_job_cond, &g_job_mutex);
|
||||||
|
|
|
@ -11,6 +11,7 @@ bool rpc_connected(YAAMP_RPC *rpc)
|
||||||
bool rpc_connect(YAAMP_RPC *rpc)
|
bool rpc_connect(YAAMP_RPC *rpc)
|
||||||
{
|
{
|
||||||
rpc_close(rpc);
|
rpc_close(rpc);
|
||||||
|
if(g_exiting) return false;
|
||||||
|
|
||||||
struct hostent *ent = gethostbyname(rpc->host);
|
struct hostent *ent = gethostbyname(rpc->host);
|
||||||
if(!ent) return false;
|
if(!ent) return false;
|
||||||
|
@ -144,7 +145,7 @@ char *rpc_do_call(YAAMP_RPC *rpc, char const *data)
|
||||||
int bufpos = 0;
|
int bufpos = 0;
|
||||||
char buffer[YAAMP_SMALLBUFSIZE] = { 0 };
|
char buffer[YAAMP_SMALLBUFSIZE] = { 0 };
|
||||||
|
|
||||||
while(1)
|
while(!g_exiting)
|
||||||
{
|
{
|
||||||
int bytes = recv(rpc->sock, buffer+bufpos, YAAMP_SMALLBUFSIZE-bufpos-1, 0);
|
int bytes = recv(rpc->sock, buffer+bufpos, YAAMP_SMALLBUFSIZE-bufpos-1, 0);
|
||||||
#ifdef RPC_DEBUGLOG_
|
#ifdef RPC_DEBUGLOG_
|
||||||
|
|
|
@ -41,6 +41,8 @@ pthread_mutex_t g_job_create_mutex;
|
||||||
|
|
||||||
struct ifaddrs *g_ifaddr;
|
struct ifaddrs *g_ifaddr;
|
||||||
|
|
||||||
|
volatile bool g_exiting = false;
|
||||||
|
|
||||||
void *stratum_thread(void *p);
|
void *stratum_thread(void *p);
|
||||||
void *monitor_thread(void *p);
|
void *monitor_thread(void *p);
|
||||||
|
|
||||||
|
@ -238,10 +240,10 @@ int main(int argc, char **argv)
|
||||||
pthread_t thread2;
|
pthread_t thread2;
|
||||||
pthread_create(&thread2, NULL, stratum_thread, NULL);
|
pthread_create(&thread2, NULL, stratum_thread, NULL);
|
||||||
|
|
||||||
while(1)
|
sleep(20);
|
||||||
{
|
|
||||||
sleep(20);
|
|
||||||
|
|
||||||
|
while(!g_exiting)
|
||||||
|
{
|
||||||
db_register_stratum(db);
|
db_register_stratum(db);
|
||||||
db_update_workers(db);
|
db_update_workers(db);
|
||||||
db_update_algos(db);
|
db_update_algos(db);
|
||||||
|
@ -274,9 +276,16 @@ int main(int argc, char **argv)
|
||||||
object_prune(&g_list_worker, worker_delete);
|
object_prune(&g_list_worker, worker_delete);
|
||||||
object_prune(&g_list_share, share_delete);
|
object_prune(&g_list_share, share_delete);
|
||||||
object_prune(&g_list_submit, submit_delete);
|
object_prune(&g_list_submit, submit_delete);
|
||||||
|
|
||||||
|
if (!g_exiting) sleep(20);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
stratumlog("closing database...\n");
|
||||||
db_close(db);
|
db_close(db);
|
||||||
|
|
||||||
|
pthread_join(thread2, NULL);
|
||||||
|
db_close(g_db); // client threads (called by stratum one)
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -284,14 +293,15 @@ int main(int argc, char **argv)
|
||||||
|
|
||||||
void *monitor_thread(void *p)
|
void *monitor_thread(void *p)
|
||||||
{
|
{
|
||||||
while(1)
|
while(!g_exiting)
|
||||||
{
|
{
|
||||||
sleep(120);
|
sleep(120);
|
||||||
|
|
||||||
if(g_last_broadcasted + YAAMP_MAXJOBDELAY < time(NULL))
|
if(g_last_broadcasted + YAAMP_MAXJOBDELAY < time(NULL))
|
||||||
{
|
{
|
||||||
|
g_exiting = true;
|
||||||
stratumlog("%s dead lock, exiting...\n", g_current_algo->name);
|
stratumlog("%s dead lock, exiting...\n", g_current_algo->name);
|
||||||
exit(1);
|
// exit(1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -320,7 +330,7 @@ void *stratum_thread(void *p)
|
||||||
|
|
||||||
/////////////////////////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
while(1)
|
while(!g_exiting)
|
||||||
{
|
{
|
||||||
int sock = accept(listen_sock, NULL, NULL);
|
int sock = accept(listen_sock, NULL, NULL);
|
||||||
if(sock <= 0)
|
if(sock <= 0)
|
||||||
|
|
|
@ -90,6 +90,8 @@ extern pthread_mutex_t g_db_mutex;
|
||||||
extern pthread_mutex_t g_nonce1_mutex;
|
extern pthread_mutex_t g_nonce1_mutex;
|
||||||
extern pthread_mutex_t g_job_create_mutex;
|
extern pthread_mutex_t g_job_create_mutex;
|
||||||
|
|
||||||
|
extern volatile bool g_exiting;
|
||||||
|
|
||||||
#include "db.h"
|
#include "db.h"
|
||||||
#include "object.h"
|
#include "object.h"
|
||||||
#include "socket.h"
|
#include "socket.h"
|
||||||
|
|
Loading…
Add table
Reference in a new issue