Skip to content

Commit

Permalink
async publish
Browse files Browse the repository at this point in the history
  • Loading branch information
arun-babu committed Feb 5, 2019
1 parent 28d9a02 commit 6494a67
Showing 1 changed file with 7 additions and 149 deletions.
156 changes: 7 additions & 149 deletions kore-publisher/src/async.c
Original file line number Diff line number Diff line change
@@ -1,134 +1,13 @@
#include "kore-publisher.h"
#include "async.h"

#include <libpq-fe.h>

static PGconn *conn = NULL;
static struct kore_buf *query = NULL;
static struct kore_buf *response = NULL;

static char exchange [MAX_LEN_RESOURCE_ID + 1];

static char string_to_be_hashed [MAX_LEN_APIKEY + MAX_LEN_SALT + MAX_LEN_ENTITY_ID + 1];
static uint8_t binary_hash [SHA256_DIGEST_LENGTH];
static char hash_string [2*SHA256_DIGEST_LENGTH + 1];

static ht connection_ht;

#define MAX_ASYNC_THREADS (5)

static int queue_index = 0;

static pthread_t thread [MAX_ASYNC_THREADS];
static Q thread_q[MAX_ASYNC_THREADS];

static
bool
async_login_success (const char *id, const char *apikey, bool *is_autonomous)
{
char *salt;
char *password_hash;
char *str_is_autonomous;

bool login_success = false;

PGresult *res = NULL;

if (id == NULL || apikey == NULL || *id == '\0' || *apikey == '\0')
goto done;

if (id[0] < 'a' || id[0] > 'z')
goto done;

if (! is_string_safe(id))
goto done;

CREATE_STRING (query,
"SELECT salt,password_hash,is_autonomous FROM users "
"WHERE id='%s' AND blocked='f'",
id
);

debug_printf("async login query = {%s}\n",query->data);

res = PQexec(conn, query->data);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
goto done;

PQgetvalue(res, 0, 0);
if (PQntuples(res) == 0)
goto done;

salt = PQgetvalue(res,0,0);
password_hash = PQgetvalue(res,0,1);
str_is_autonomous = PQgetvalue(res,0,2);

if (is_autonomous)
*is_autonomous = false;

// there is no salt or password hash in db ?
if (salt[0] == '\0' || password_hash[0] == '\0')
goto done;

if (is_autonomous)
*is_autonomous = str_is_autonomous[0] == 't';

snprintf (string_to_be_hashed,
MAX_LEN_HASH_INPUT + 1,
"%s%s%s",
apikey, salt, id);

SHA256 (
(const uint8_t*)string_to_be_hashed,
strnlen (string_to_be_hashed,MAX_LEN_HASH_INPUT),
binary_hash
);

debug_printf("login success STRING TO BE HASHED = {%s}\n",
string_to_be_hashed);
snprintf
(
hash_string,
1 + 2*SHA256_DIGEST_LENGTH,
"%02x%02x%02x%02x"
"%02x%02x%02x%02x"
"%02x%02x%02x%02x"
"%02x%02x%02x%02x"
"%02x%02x%02x%02x"
"%02x%02x%02x%02x"
"%02x%02x%02x%02x"
"%02x%02x%02x%02x",
binary_hash[ 0],binary_hash[ 1],binary_hash[ 2],binary_hash[ 3],
binary_hash[ 4],binary_hash[ 5],binary_hash[ 6],binary_hash[ 7],
binary_hash[ 8],binary_hash[ 9],binary_hash[10],binary_hash[11],
binary_hash[12],binary_hash[13],binary_hash[14],binary_hash[15],
binary_hash[16],binary_hash[17],binary_hash[18],binary_hash[19],
binary_hash[20],binary_hash[21],binary_hash[22],binary_hash[23],
binary_hash[24],binary_hash[25],binary_hash[26],binary_hash[27],
binary_hash[28],binary_hash[29],binary_hash[30],binary_hash[31]
);

hash_string[2*SHA256_DIGEST_LENGTH] = '\0';

debug_printf("Expecting it to be {%s} got {%s}\n",
password_hash,
hash_string
);

if (strncmp(hash_string,password_hash,64) == 0) {
login_success = true;
debug_printf("Login OK\n");
}

done:
if (res)
PQclear(res);

kore_buf_reset(query);

return login_success;
}

static
void*
async_publish_function (void *v)
Expand All @@ -148,6 +27,8 @@ async_publish_function (void *v)
const char *message_type;
const char *content_type;

char exchange [MAX_LEN_RESOURCE_ID + 1];

char subject_to_publish[MAX_LEN_TOPIC + 1];

amqp_connection_state_t *cached_conn = NULL;
Expand All @@ -159,6 +40,7 @@ async_publish_function (void *v)
memset(&props, 0, sizeof props);
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_USER_ID_FLAG;

ht connection_ht;
ht_init (&connection_ht);

while (1)
Expand Down Expand Up @@ -191,10 +73,6 @@ async_publish_function (void *v)

snprintf(exchange,MAX_LEN_RESOURCE_ID + 1,"%s.%s",id,message_type);
strlcpy(subject_to_publish,subject,MAX_LEN_TOPIC);

printf("==> exchange = %s\n",exchange);
printf("==> topic = %s\n",subject_to_publish);
printf("==> message = %s\n",message);
}
else
{
Expand Down Expand Up @@ -222,9 +100,6 @@ async_publish_function (void *v)
if (! looks_like_a_valid_entity(id))
goto done;

if (! async_login_success(id,apikey,NULL))
goto done;

/////////////////////////////////////////////////

cached_conn = malloc(sizeof(amqp_connection_state_t));
Expand All @@ -239,7 +114,7 @@ async_publish_function (void *v)
goto done;

if (amqp_socket_open(socket, "broker", 5672))
goto done;
goto done;

login_reply = amqp_login(
*cached_conn,
Expand Down Expand Up @@ -283,10 +158,8 @@ async_publish_function (void *v)
{
goto done;
}

printf("Published to {%s} {%s} {%s}\n",exchange,message,subject);

done:

free (data);
}

Expand All @@ -305,6 +178,8 @@ publish_async (struct http_request *req)
const char *message_type;
const char *content_type;

struct kore_buf *response = kore_buf_alloc(128);

req->status = 403;

BAD_REQUEST_if
Expand Down Expand Up @@ -401,23 +276,6 @@ int async_init (char *connection_str)
{
int i;

if (conn == NULL)
conn = PQconnectdb(connection_str);

if (PQstatus(conn) == CONNECTION_BAD)
{
fprintf(stderr, "Connection to database failed: %s\n",PQerrorMessage(conn));
PQfinish(conn);

return KORE_RESULT_ERROR;
}

if (query == NULL)
query = kore_buf_alloc(512);

if (response == NULL)
response = kore_buf_alloc(1024*1024);

for (i = 0; i < MAX_ASYNC_THREADS; ++i)
{
q_init(&thread_q[i]);
Expand Down

0 comments on commit 6494a67

Please sign in to comment.