diff --git a/src/core.h b/src/core.h index 1a10bb3..2dcc698 100644 --- a/src/core.h +++ b/src/core.h @@ -30,6 +30,7 @@ #include "trie.h" #include "list.h" +#include "network.h" #include "hashtable.h" @@ -62,6 +63,7 @@ struct sol_client { char *client_id; int fd; struct session session; + struct buffer buf; }; diff --git a/src/network.c b/src/network.c index 5a8033e..a49f9f0 100644 --- a/src/network.c +++ b/src/network.c @@ -49,6 +49,41 @@ #define EVLOOP_INITIAL_SIZE 4 +static const int INITIAL_BUFLEN = 4; + + +void buffer_create(struct buffer *buf) { + buf->bytes = sol_malloc(INITIAL_BUFLEN); + buf->size = INITIAL_BUFLEN; + buf->start = buf->end = 0; +} + + +void buffer_release(struct buffer *buf) { + sol_free(buf->bytes); + buf->bytes = NULL; + buf->start = buf->end = 0; +} + + +void buffer_push_back(struct buffer *buf, unsigned char *bytes, size_t len) { + + // Re-size buffer in case of len surpassing buffer size + if (len > buf->size / 2) { + buf->size *= 2; + buf->bytes = sol_realloc(buf->bytes, buf->size); + } + + memcpy(buf->bytes + buf->end, bytes, len); + buf->end += len; + +} + + +int buffer_empty(const struct buffer *buf) { + return buf->end == buf->start; +} + /* Set non-blocking socket */ int set_nonblocking(int fd) { int flags, result; @@ -122,7 +157,7 @@ static int create_and_bind_tcp(const char *host, const char *port) { if (sfd == -1) continue; /* set SO_REUSEADDR so the socket will be reusable after process kill */ - if (setsockopt(sfd, SOL_SOCKET, (SO_REUSEPORT | SO_REUSEADDR), + if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &(int) { 1 }, sizeof(int)) < 0) perror("SO_REUSEADDR"); diff --git a/src/network.h b/src/network.h index 4988c01..9bbe96d 100644 --- a/src/network.h +++ b/src/network.h @@ -31,6 +31,7 @@ #include #include +#include #include "util.h" @@ -38,6 +39,28 @@ #define UNIX 0 #define INET 1 +/* + * Buffer queue, mainly for remaining bytes to send, for EWOULDBLOCK or EAGAIN + * on non-blocking socket + */ +struct buffer { + size_t size; + size_t start; + size_t end; + unsigned char *bytes; +}; + +// Buffer build functions +void buffer_create(struct buffer *); + +void buffer_release(struct buffer *); + +/* Copy an array of bytes into the buffer */ +void buffer_push_back(struct buffer *, unsigned char *, size_t); + +/* Check if the buffer is empty by comparing start and end cursors */ +int buffer_empty(const struct buffer *); + /* Event loop wrapper structure, define an EPOLL loop and his status. The * EPOLL instance use EPOLLONESHOT for each event and must be re-armed * manually, in order to allow future uses on a multithreaded architecture. @@ -78,7 +101,6 @@ struct closure { callback *call; }; - /* Set non-blocking socket */ int set_nonblocking(int); diff --git a/src/server.c b/src/server.c index 569097e..eff4357 100644 --- a/src/server.c +++ b/src/server.c @@ -256,6 +256,7 @@ static int connect_handler(struct closure *cb, union mqtt_packet *pkt) { new_client->fd = cb->fd; const char *cid = (const char *) pkt->connect.payload.client_id; new_client->client_id = sol_strdup(cid); + buffer_create(&new_client->buf); hashtable_put(sol.clients, cid, new_client); /* Substitute fd on callback with closure */ @@ -653,12 +654,31 @@ static int pingreq_handler(struct closure *cb, union mqtt_packet *pkt) { static void on_write(struct evloop *loop, void *arg) { struct closure *cb = arg; + struct sol_client *c = cb->obj; ssize_t sent; + + if (buffer_empty(&c->buf) == 0) { + + /* Check if there's still some remaning bytes to send out */ + if ((sent = send_bytes(cb->fd, c->buf.bytes + c->buf.start, c->buf.end)) < 0) + sol_error("Error writing on socket to client %s: %s", + ((struct sol_client *) cb->obj)->client_id, strerror(errno)); + + /* Update buffer pointers */ + c->buf.start += sent; + + } + if ((sent = send_bytes(cb->fd, cb->payload->data, cb->payload->size)) < 0) sol_error("Error writing on socket to client %s: %s", ((struct sol_client *) cb->obj)->client_id, strerror(errno)); + /* Update client buffer for remaining bytes to send */ + if (sent < cb->payload->size) + buffer_push_back(&c->buf, cb->payload->data + sent, + cb->payload->size - sent); + // Update information stats info.bytes_sent += sent; bytestring_release(cb->payload); @@ -977,6 +997,8 @@ static int client_destructor(struct hashtable_entry *entry) { if (client->client_id) sol_free(client->client_id); + buffer_release(&client->buf); + sol_free(client); return 0; diff --git a/src/util.h b/src/util.h index ef1006f..da87876 100644 --- a/src/util.h +++ b/src/util.h @@ -59,7 +59,7 @@ void *sol_realloc(void *, size_t); size_t malloc_size(void *); void sol_free(void *); char *sol_strdup(const char *); -char *remove_occur(char *str, char c); +char *remove_occur(char *, char); char *append_string(char *, char *, size_t); size_t memory_used(void);