Skip to content

Commit

Permalink
Add basic queue for not sent bytes
Browse files Browse the repository at this point in the history
  • Loading branch information
codepr committed Mar 10, 2019
1 parent e1e1345 commit eaf5ebc
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 3 deletions.
2 changes: 2 additions & 0 deletions src/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

#include "trie.h"
#include "list.h"
#include "network.h"
#include "hashtable.h"


Expand Down Expand Up @@ -62,6 +63,7 @@ struct sol_client {
char *client_id;
int fd;
struct session session;
struct buffer buf;
};


Expand Down
37 changes: 36 additions & 1 deletion src/network.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,41 @@
#define EVLOOP_INITIAL_SIZE 4


static const int INITIAL_BUFLEN = 4;


void buffer_create(struct buffer *buf) {
buf->bytes = sol_malloc(INITIAL_BUFLEN);
buf->size = INITIAL_BUFLEN;
buf->start = buf->end = 0;
}


void buffer_release(struct buffer *buf) {
sol_free(buf->bytes);
buf->bytes = NULL;
buf->start = buf->end = 0;
}


void buffer_push_back(struct buffer *buf, unsigned char *bytes, size_t len) {

// Re-size buffer in case of len surpassing buffer size
if (len > buf->size / 2) {
buf->size *= 2;
buf->bytes = sol_realloc(buf->bytes, buf->size);
}

memcpy(buf->bytes + buf->end, bytes, len);
buf->end += len;

}


int buffer_empty(const struct buffer *buf) {
return buf->end == buf->start;
}

/* Set non-blocking socket */
int set_nonblocking(int fd) {
int flags, result;
Expand Down Expand Up @@ -122,7 +157,7 @@ static int create_and_bind_tcp(const char *host, const char *port) {
if (sfd == -1) continue;

/* set SO_REUSEADDR so the socket will be reusable after process kill */
if (setsockopt(sfd, SOL_SOCKET, (SO_REUSEPORT | SO_REUSEADDR),
if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR,
&(int) { 1 }, sizeof(int)) < 0)
perror("SO_REUSEADDR");

Expand Down
24 changes: 23 additions & 1 deletion src/network.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,36 @@

#include <stdio.h>
#include <stdint.h>
#include <sys/types.h>
#include "util.h"


// Socket families
#define UNIX 0
#define INET 1

/*
* Buffer queue, mainly for remaining bytes to send, for EWOULDBLOCK or EAGAIN
* on non-blocking socket
*/
struct buffer {
size_t size;
size_t start;
size_t end;
unsigned char *bytes;
};

// Buffer build functions
void buffer_create(struct buffer *);

void buffer_release(struct buffer *);

/* Copy an array of bytes into the buffer */
void buffer_push_back(struct buffer *, unsigned char *, size_t);

/* Check if the buffer is empty by comparing start and end cursors */
int buffer_empty(const struct buffer *);

/* Event loop wrapper structure, define an EPOLL loop and his status. The
* EPOLL instance use EPOLLONESHOT for each event and must be re-armed
* manually, in order to allow future uses on a multithreaded architecture.
Expand Down Expand Up @@ -78,7 +101,6 @@ struct closure {
callback *call;
};


/* Set non-blocking socket */
int set_nonblocking(int);

Expand Down
22 changes: 22 additions & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ static int connect_handler(struct closure *cb, union mqtt_packet *pkt) {
new_client->fd = cb->fd;
const char *cid = (const char *) pkt->connect.payload.client_id;
new_client->client_id = sol_strdup(cid);
buffer_create(&new_client->buf);
hashtable_put(sol.clients, cid, new_client);

/* Substitute fd on callback with closure */
Expand Down Expand Up @@ -653,12 +654,31 @@ static int pingreq_handler(struct closure *cb, union mqtt_packet *pkt) {
static void on_write(struct evloop *loop, void *arg) {

struct closure *cb = arg;
struct sol_client *c = cb->obj;

ssize_t sent;

if (buffer_empty(&c->buf) == 0) {

/* Check if there's still some remaning bytes to send out */
if ((sent = send_bytes(cb->fd, c->buf.bytes + c->buf.start, c->buf.end)) < 0)
sol_error("Error writing on socket to client %s: %s",
((struct sol_client *) cb->obj)->client_id, strerror(errno));

/* Update buffer pointers */
c->buf.start += sent;

}

if ((sent = send_bytes(cb->fd, cb->payload->data, cb->payload->size)) < 0)
sol_error("Error writing on socket to client %s: %s",
((struct sol_client *) cb->obj)->client_id, strerror(errno));

/* Update client buffer for remaining bytes to send */
if (sent < cb->payload->size)
buffer_push_back(&c->buf, cb->payload->data + sent,
cb->payload->size - sent);

// Update information stats
info.bytes_sent += sent;
bytestring_release(cb->payload);
Expand Down Expand Up @@ -977,6 +997,8 @@ static int client_destructor(struct hashtable_entry *entry) {
if (client->client_id)
sol_free(client->client_id);

buffer_release(&client->buf);

sol_free(client);

return 0;
Expand Down
2 changes: 1 addition & 1 deletion src/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ void *sol_realloc(void *, size_t);
size_t malloc_size(void *);
void sol_free(void *);
char *sol_strdup(const char *);
char *remove_occur(char *str, char c);
char *remove_occur(char *, char);
char *append_string(char *, char *, size_t);

size_t memory_used(void);
Expand Down

0 comments on commit eaf5ebc

Please sign in to comment.