Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
jakesmith committed Nov 8, 2024
1 parent 680fc9b commit c08c088
Showing 1 changed file with 46 additions and 56 deletions.
102 changes: 46 additions & 56 deletions system/jlib/jsocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4327,7 +4327,7 @@ struct SelectItem
ISocketSelectNotify *nfy;
T_SOCKET handle;
byte mode;
bool del; // only used in select handler method
bool del;
bool operator == (const SelectItem & other) const { return sock == other.sock; }
};

Expand Down Expand Up @@ -4424,6 +4424,9 @@ class CSocketBaseThread: public Thread

CSocketBaseThread(const char *trc) : Thread("CSocketBaseThread"), tickwait(0)
{
#ifdef _WIN32
inithash();
#endif
}

~CSocketBaseThread()
Expand Down Expand Up @@ -4498,43 +4501,70 @@ class CSocketBaseThread: public Thread
dummysockopen = false;
}


#ifdef _WIN32
#define HASHTABSIZE 256
#define HASHNULL (HASHTABSIZE-1)
#define HASHTABMASK (HASHTABSIZE-1)
byte hashtab[HASHTABSIZE];
#define HASHSOCKET(s) ((((unsigned)s)>>2)&HASHTABMASK) // with some knowledge of windows handles

void inithash()
{
memset(&hashtab,HASHNULL,sizeof(hashtab));
assertex(FD_SETSIZE<255);
}

void reinithash()
{ // done this way because index of items changes and hash table not that big
inithash();
assertex(items.ordinality()<HASHTABSIZE-1);
ForEachItemIn(i,items) {
unsigned h = HASHSOCKET(items.item(i)->handle);
for (;;) {
if (hashtab[h]==HASHNULL) {
hashtab[h] = (byte)i;
break;
}
if (++h==HASHTABSIZE)
h = 0;
}
}
}
#endif

void updateItems()
{
// must be in CriticalBlock block(sect);
unsigned n = items.ordinality();
#ifdef _WIN32
bool hashupdateneeded = (n!=basesize); // additions all come at end
#endif
for (unsigned i=0;i<n;)
while (n)
{
SelectItem &si = *items.element(i);
if (si.del)
SelectItem *si = items.element(n);
if (si->del)
{
// Release/dtors should not throw but leaving try/catch here until all paths checked
try
{
#ifdef SOCKTRACE
PROGLOG("CSocketSelectThread::updateItems release %d",si.handle);
#endif
si.nfy->Release();
si.sock->Release();
si->nfy->Release();
si->sock->Release();
delete si;
}
catch (IException *e)
{
EXCLOG(e,"CSocketSelectThread::updateItems");
e->Release();
}
// NOTE: si is a reference, put last item into remove slot
n--;
if (i<n)
si = *items.item(n);
items.remove(n);
#ifdef _WIN32
hashupdateneeded = true;
#endif
}
else
i++;
}
assertex(n<=XFD_SETSIZE-1);
#ifdef _WIN32
Expand Down Expand Up @@ -4599,8 +4629,6 @@ class CSocketBaseThread: public Thread

bool remove(ISocket *sock)
{
assertex(sock);

if (terminating)
return true; // pretend is, to short-circuit caller
CriticalBlock block(sect);
Expand Down Expand Up @@ -4740,37 +4768,7 @@ class CSocketSelectThread: public CSocketBaseThread
closeDummyPipe();
}


#ifdef _WIN32
#define HASHTABSIZE 256
#define HASHNULL (HASHTABSIZE-1)
#define HASHTABMASK (HASHTABSIZE-1)
byte hashtab[HASHTABSIZE];
#define HASHSOCKET(s) ((((unsigned)s)>>2)&HASHTABMASK) // with some knowledge of windows handles

void inithash()
{
memset(&hashtab,HASHNULL,sizeof(hashtab));
assertex(FD_SETSIZE<255);
}

void reinithash()
{ // done this way because index of items changes and hash table not that big
inithash();
assertex(items.ordinality()<HASHTABSIZE-1);
ForEachItemIn(i,items) {
unsigned h = HASHSOCKET(items.item(i)->handle);
for (;;) {
if (hashtab[h]==HASHNULL) {
hashtab[h] = (byte)i;
break;
}
if (++h==HASHTABSIZE)
h = 0;
}
}
}

inline SelectItem &findhash(T_SOCKET handle)
{
unsigned h = HASHSOCKET(handle);
Expand All @@ -4784,7 +4782,6 @@ class CSocketSelectThread: public CSocketBaseThread
assertex(h!=sh);
}
}

inline void processfds(T_FD_SET &s,byte mode,SelectItemArray &tonotify)
{
for (;;) {
Expand All @@ -4805,8 +4802,8 @@ class CSocketSelectThread: public CSocketBaseThread
}
}
}
#endif

#endif
public:
CSocketSelectThread(const char *trc)
: CSocketBaseThread("CSocketSelectThread")
Expand All @@ -4821,9 +4818,6 @@ class CSocketSelectThread: public CSocketBaseThread
offset = 0;
selecttrace = trc;
basesize = 0;
#ifdef _WIN32
inithash();
#endif
}

~CSocketSelectThread()
Expand Down Expand Up @@ -5263,10 +5257,11 @@ class CSocketEpollThread: public CSocketBaseThread
if (dummysockopen)
return;
openDummyPipe();
// NB: not notified when triggered, because explicitly spotted and handled before tonotify loop
sidummy = new SelectItem;
sidummy->sock = nullptr;
sidummy->nfy = nullptr;
sidummy->del = true; // so its not added to tonotify ...
sidummy->del = false;
sidummy->mode = 0;
#ifdef _USE_PIPE_FOR_SELECT_TRIGGER
sidummy->handle = dummysock[0];
Expand Down Expand Up @@ -5348,12 +5343,6 @@ class CSocketEpollThread: public CSocketBaseThread
{
closedummy();

// Not sure really necessary at this late stage?
ForEachItemIn(i, items)
{
SelectItem *si = items.element(i);
epoll_op(epfd, EPOLL_CTL_DEL, si, 0);
}
if (epfd >= 0)
{
# ifdef EPOLLTRACE
Expand Down Expand Up @@ -5410,6 +5399,7 @@ class CSocketEpollThread: public CSocketBaseThread
ep_mode |= EPOLLOUT;
if (mode & SELECTMODE_EXCEPT)
ep_mode |= EPOLLPRI;
// JCSMORE - we are in a CS, but it is thread-safe to epoll_wait concurrently with epoll_op
epoll_op(epfd, EPOLL_CTL_ADD, sn, ep_mode);
selectvarschange = true;
triggerselect();
Expand Down

0 comments on commit c08c088

Please sign in to comment.