diff --git a/system/jlib/jsocket.cpp b/system/jlib/jsocket.cpp index 5dfd4acde52..7e341db5c2d 100644 --- a/system/jlib/jsocket.cpp +++ b/system/jlib/jsocket.cpp @@ -4327,7 +4327,7 @@ struct SelectItem T_SOCKET handle; ISocketSelectNotify *nfy; byte mode; - bool del; // only used in select handler method + bool del; bool operator == (const SelectItem & other) const { return sock == other.sock; } }; @@ -4406,13 +4406,12 @@ class CSocketBaseThread: public Thread Semaphore ticksem; std::atomic_uint tickwait; unsigned offset; - bool selectvarschange; + std::atomic selectvarschange; unsigned waitingchange; Semaphore waitingchangesem; int validateselecterror; unsigned validateerrcount; const char *selecttrace; - unsigned basesize; #ifdef _USE_PIPE_FOR_SELECT_TRIGGER T_SOCKET dummysock[2]; #else @@ -4547,6 +4546,7 @@ class CSocketBaseThread: public Thread class CSocketSelectThread: public CSocketBaseThread { SelectItemArray items; + unsigned basesize; void opendummy() { @@ -4709,9 +4709,10 @@ class CSocketSelectThread: public CSocketBaseThread Owned termexcept; + // JCSMORE: this is very similar to the version in CSocketEpollThread. Could be commoned if items used common structure. void updateItems() { - // must be in CriticalBlock block(sect); + // must be in CriticalBlock block(sect); unsigned n = items.ordinality(); #ifdef _WIN32 bool hashupdateneeded = (n!=basesize); // additions all come at end @@ -4751,6 +4752,7 @@ class CSocketSelectThread: public CSocketBaseThread basesize = n; } + // JCSMORE: this is very similar to the version in CSocketEpollThread. Could be commoned if items used common structure. bool checkSocks() { bool ret = false; @@ -4798,9 +4800,12 @@ class CSocketSelectThread: public CSocketBaseThread unsigned n=0; ForEachItemIn(i,items) { SelectItem &si = items.element(i); - if (!si.del) { - if (si.sock==sock) { + if (!si.del) + { + if (si.sock==sock) + { si.del = true; + break; } else n++; @@ -5182,6 +5187,7 @@ class CSocketSelectHandler: implements ISocketSelectHandler, public CInterface #ifdef _HAS_EPOLL_SUPPORT # define SOCK_ADDED 0x1 # define SOCK_REMOVED 0x2 +# define SOCK_FAILED 0x4 class CSocketEpollThread: public CSocketBaseThread { int epfd; @@ -5213,15 +5219,51 @@ class CSocketEpollThread: public CSocketBaseThread } } + // JCSMORE: this is very similar to the version in CSocketSelectThread. Could be commoned if items used common structure. + void updateItems() + { + // must be in CriticalBlock block(sect); + unsigned n = items.ordinality(); + for (unsigned i=0;idel) + { + // Release/dtors should not throw but leaving try/catch here until all paths checked + try + { +#ifdef SOCKTRACE + PROGLOG("CSocketSelectThread::updateItems release %d",si.handle); +#endif + si->nfy->Release(); + si->sock->Release(); + delete si; + } + catch (IException *e) + { + EXCLOG(e,"CSocketSelectThread::updateItems"); + e->Release(); + } + n--; + if (i < n) + items.swap(i, n); + items.remove(n); + } + else + i++; + } + } + void opendummy() { CriticalBlock block(sect); if (!dummysockopen) { + // NB: not notified when triggered, because explicitly spotted and handled before tonotify loop sidummy = new SelectItem; sidummy->sock = nullptr; sidummy->nfy = nullptr; - sidummy->del = true; // so its not added to tonotify ... + sidummy->del = false; sidummy->mode = 0; #ifdef _USE_PIPE_FOR_SELECT_TRIGGER if(pipe(dummysock)) @@ -5281,32 +5323,32 @@ class CSocketEpollThread: public CSocketBaseThread } } - void delSelItem(SelectItem *si) - { - epoll_op(epfd, EPOLL_CTL_DEL, si, 0); - // Release/dtors should not throw but leaving try/catch here until all paths checked - try - { - si->nfy->Release(); - si->sock->Release(); - delete si; - } - catch (IException *e) - { - EXCLOG(e,"CSocketEpollThread::delSelItem()"); - e->Release(); - } - } - - void delSelItemPos(SelectItem *si, unsigned pos) - { - unsigned last = items.ordinality(); - delSelItem(si); - last--; - if (pos < last) - items.swap(pos, last); - items.remove(last); - } + // void delSelItem(SelectItem *si) + // { + // epoll_op(epfd, EPOLL_CTL_DEL, si, 0); + // // Release/dtors should not throw but leaving try/catch here until all paths checked + // try + // { + // si->nfy->Release(); + // si->sock->Release(); + // delete si; + // } + // catch (IException *e) + // { + // EXCLOG(e,"CSocketEpollThread::delSelItem()"); + // e->Release(); + // } + // } + + // void delSelItemPos(SelectItem *si, unsigned pos) + // { + // unsigned last = items.ordinality(); + // delSelItem(si); + // last--; + // if (pos < last) + // items.swap(pos, last); + // items.remove(last); + // } public: CSocketEpollThread(const char *trc, unsigned _hdlPerThrd) @@ -5357,9 +5399,21 @@ class CSocketEpollThread: public CSocketBaseThread closedummy(); ForEachItemIn(i, items) { - SelectItem *si = items.element(i); - delSelItem(si); + // Release/dtors should not throw but leaving try/catch here until all paths checked + try + { + SelectItem *si = items.element(i); + si->nfy->Release(); + si->sock->Release(); + delete si; + } + catch (IException *e) + { + EXCLOG(e,"~CSocketSelectThread"); + e->Release(); + } } + if (epfd >= 0) { # ifdef EPOLLTRACE @@ -5373,52 +5427,52 @@ class CSocketEpollThread: public CSocketBaseThread Owned termexcept; + // JCSMORE: this is very similar to the version in CSocketSelectThread. Could be commoned if items used common structure. bool checkSocks() { bool ret = false; - // must be holding CriticalBlock (sect) - unsigned n = items.ordinality(); - for (unsigned i=0;ihandle)) + SelectItem &si = *items.element(i); + if (si.del) + ret = true; // maybe that bad one + else if (!sockOk(si.handle)) { - delSelItemPos(si, i); - n--; + si.del = true; ret = true; } - else - i++; } return ret; } - bool removeSock(ISocket *sock) - { - // must be holding CriticalBlock (sect) - unsigned n = items.ordinality(); - for (unsigned i=0;isock==sock) - { - delSelItemPos(si, i); - n--; - return true; - } - else - i++; - } - return false; - } + // bool removeSock(ISocket *sock) + // { + // // must be holding CriticalBlock (sect) + // unsigned n = items.ordinality(); + // for (unsigned i=0;isock==sock) + // { + // delSelItemPos(si, i); + // n--; + // return true; + // } + // else + // i++; + // } + // return false; + // } bool remove(ISocket *sock) { - CriticalBlock block(sect); if (terminating) - return false; - if (sock==NULL) - { // wait until no changes outstanding + return true; // pretend is, to short-circuit caller + CriticalBlock block(sect); + + // JCS - I can't see when sock can be null + if (sock==NULL) // wait until no changes outstanding + { while (selectvarschange) { waitingchange++; @@ -5427,33 +5481,59 @@ class CSocketEpollThread: public CSocketBaseThread } return true; } - if (removeSock(sock)) + + ForEachItemIn(i, items) { - selectvarschange = true; - // NB: could set terminating here if no more hdls on - // this thread and at least one other thread is present - triggerselect(); - return true; + SelectItem *si = items.element(i); + if (!si->del && (si->sock==sock)) + { + // epoll_wait (in the CSocketEpollThread thread) may have pending events on socket this is about to remove + // so we must not remove the SelectItem here, just mark it as deleted (deletion performed in updateEpollVars) + epoll_op(epfd, EPOLL_CTL_DEL, si, 0); + si->del = true; + selectvarschange = true; + triggerselect(); + // JCSMORE see comment in CSocketEpollThread::run re. need for CS. + // I think the array could be manipulated here, as long as the SelectItem isn't deleted. + // It may also be more efficient if it used a double linked list to remove items. + return true; + } } return false; } - unsigned add(ISocket *sock,unsigned mode,ISocketSelectNotify *nfy) + unsigned add(ISocket *sock, unsigned mode, ISocketSelectNotify *nfy) { if ( !sock || !nfy || !(mode & (SELECTMODE_READ|SELECTMODE_WRITE|SELECTMODE_EXCEPT)) ) { IWARNLOG("EPOLL: adding fd but sock or nfy is NULL or mode is empty"); dbgassertex(false); - return 0; + return SOCK_FAILED; } CriticalBlock block(sect); if (terminating) - return 0; + return SOCK_FAILED; + + // JCSMORE may be more efficient to use a double linked list to avoid this scan (and array/expansion) unsigned rm = 0; - if (removeSock(sock)) - rm = SOCK_REMOVED; - unsigned n = items.ordinality(); + unsigned n=0; + ForEachItemIn(i, items) + { + SelectItem &si = *items.element(i); + if (!si.del) + { + if (si.sock==sock) + { + si.del = true; + rm = SOCK_REMOVED; + break; + } + else + n++; + } + } + // new handler thread if (n >= hdlPerThrd) return (0|rm); @@ -5471,6 +5551,7 @@ class CSocketEpollThread: public CSocketBaseThread ep_mode |= EPOLLOUT; if (mode & SELECTMODE_EXCEPT) ep_mode |= EPOLLPRI; + // JCSMORE - we are in a CS, but it is thread-safe to epoll_wait concurrently with epoll_op epoll_op(epfd, EPOLL_CTL_ADD, sn, ep_mode); selectvarschange = true; triggerselect(); @@ -5502,6 +5583,7 @@ class CSocketEpollThread: public CSocketBaseThread #ifndef _USE_PIPE_FOR_SELECT_TRIGGER opendummy(); #endif + updateItems(); ni = items.ordinality(); validateselecterror = 0; } @@ -5571,6 +5653,11 @@ class CSocketEpollThread: public CSocketBaseThread totnum++; SelectItemArray tonotify; { + // JCSMORE - I don't think this CS is needed. + // It is in the CSocketSelectThread version because the items array needs protecting) + // But in the epoll version, we reference the SelectItem pointers only, the items array isn't referenced, + // and epoll_op's are thread-safe. But we must ensure the SelectItem's are only deleted by this thread, + // since even if in a mutex, pending events may referene them. CriticalBlock block(sect); // retrieve events, without waiting, while holding CS ... @@ -5580,20 +5667,26 @@ class CSocketEpollThread: public CSocketBaseThread { int tfd = -1; SelectItem *epsi = (SelectItem *)epevents[j].data.ptr; - if (epsi) + if (epsi && !epsi->del) // NB: could receive an event for a pending deleted item tfd = epsi->handle; # ifdef EPOLLTRACE DBGLOG("EPOLL: j = %d, fd = %d, emask = %u", j, tfd, epevents[j].events); # endif if (tfd >= 0) { -# ifdef _USE_PIPE_FOR_SELECT_TRIGGER - if ( (dummysockopen) && (tfd == dummysock[0]) ) + if (dummysockopen) { - resettrigger(); - continue; - } +# ifdef _USE_PIPE_FOR_SELECT_TRIGGER + if (tfd == dummysock[0]) + { + resettrigger(); + continue; + } +#else + // would still need to resettrigger and ignore dummysock (but subtly different code) + throwUnimplemented(); # endif + } unsigned int ep_mode = 0; if (epevents[j].events & (EPOLLINX | EPOLLHUP | EPOLLERR)) ep_mode |= SELECTMODE_READ; @@ -5626,6 +5719,7 @@ class CSocketEpollThread: public CSocketBaseThread const SelectItem &si = tonotify.item(j); try { + // is it worth checking si.del again? we can't guarantee that this is a socket that has been marked deleted. si.nfy->notifySelected(si.sock,si.mode); // ignore return } catch (IException *e) @@ -5736,7 +5830,9 @@ class CSocketEpollHandler: implements ISocketSelectHandler, public CInterface if (!(addrm & SOCK_ADDED)) { addrm |= threads.item(i).add(sock,mode,nfy); - if (addrm & (SOCK_ADDED | SOCK_REMOVED)) + // if both added and removed, we're done + if (((addrm & (SOCK_ADDED | SOCK_REMOVED)) == (SOCK_ADDED | SOCK_REMOVED)) || + (addrm & SOCK_FAILED)) return; } else if (!(addrm & SOCK_REMOVED))