From 0ecb3e498ea59fe662b46bb63c5cfd46d329786a Mon Sep 17 00:00:00 2001 From: Jake Smith Date: Thu, 7 Nov 2024 20:54:39 +0000 Subject: [PATCH] HPCC-32960 Fix epoll memory corruption issues Although the SelectItem 'items' array is protected by a CS, epoll_wait can receive pending events for SelectItems that have been deleted, which were then link counted and caused corruption. Like the select handler version, avoid deleting the SelectItem's when items are removed, instead mark them deleted and let the triggerselect() approach remove them on the main thread. Also fix a theoretical issue when adding a existing socket to CSocketEpollThread, it did not check other CSocketEpollThread's to see if it needed to be removed. (given default hdlPerThrd is UINT_MAX, it will not have happened). Signed-off-by: Jake Smith --- system/jlib/jsocket.cpp | 266 +++++++++++++++++++++++++++------------- 1 file changed, 181 insertions(+), 85 deletions(-) diff --git a/system/jlib/jsocket.cpp b/system/jlib/jsocket.cpp index 5dfd4acde52..7e341db5c2d 100644 --- a/system/jlib/jsocket.cpp +++ b/system/jlib/jsocket.cpp @@ -4327,7 +4327,7 @@ struct SelectItem T_SOCKET handle; ISocketSelectNotify *nfy; byte mode; - bool del; // only used in select handler method + bool del; bool operator == (const SelectItem & other) const { return sock == other.sock; } }; @@ -4406,13 +4406,12 @@ class CSocketBaseThread: public Thread Semaphore ticksem; std::atomic_uint tickwait; unsigned offset; - bool selectvarschange; + std::atomic selectvarschange; unsigned waitingchange; Semaphore waitingchangesem; int validateselecterror; unsigned validateerrcount; const char *selecttrace; - unsigned basesize; #ifdef _USE_PIPE_FOR_SELECT_TRIGGER T_SOCKET dummysock[2]; #else @@ -4547,6 +4546,7 @@ class CSocketBaseThread: public Thread class CSocketSelectThread: public CSocketBaseThread { SelectItemArray items; + unsigned basesize; void opendummy() { @@ -4709,9 +4709,10 @@ class CSocketSelectThread: public CSocketBaseThread Owned termexcept; + // JCSMORE: this is very similar to the version in CSocketEpollThread. Could be commoned if items used common structure. void updateItems() { - // must be in CriticalBlock block(sect); + // must be in CriticalBlock block(sect); unsigned n = items.ordinality(); #ifdef _WIN32 bool hashupdateneeded = (n!=basesize); // additions all come at end @@ -4751,6 +4752,7 @@ class CSocketSelectThread: public CSocketBaseThread basesize = n; } + // JCSMORE: this is very similar to the version in CSocketEpollThread. Could be commoned if items used common structure. bool checkSocks() { bool ret = false; @@ -4798,9 +4800,12 @@ class CSocketSelectThread: public CSocketBaseThread unsigned n=0; ForEachItemIn(i,items) { SelectItem &si = items.element(i); - if (!si.del) { - if (si.sock==sock) { + if (!si.del) + { + if (si.sock==sock) + { si.del = true; + break; } else n++; @@ -5182,6 +5187,7 @@ class CSocketSelectHandler: implements ISocketSelectHandler, public CInterface #ifdef _HAS_EPOLL_SUPPORT # define SOCK_ADDED 0x1 # define SOCK_REMOVED 0x2 +# define SOCK_FAILED 0x4 class CSocketEpollThread: public CSocketBaseThread { int epfd; @@ -5213,15 +5219,51 @@ class CSocketEpollThread: public CSocketBaseThread } } + // JCSMORE: this is very similar to the version in CSocketSelectThread. Could be commoned if items used common structure. + void updateItems() + { + // must be in CriticalBlock block(sect); + unsigned n = items.ordinality(); + for (unsigned i=0;idel) + { + // Release/dtors should not throw but leaving try/catch here until all paths checked + try + { +#ifdef SOCKTRACE + PROGLOG("CSocketSelectThread::updateItems release %d",si.handle); +#endif + si->nfy->Release(); + si->sock->Release(); + delete si; + } + catch (IException *e) + { + EXCLOG(e,"CSocketSelectThread::updateItems"); + e->Release(); + } + n--; + if (i < n) + items.swap(i, n); + items.remove(n); + } + else + i++; + } + } + void opendummy() { CriticalBlock block(sect); if (!dummysockopen) { + // NB: not notified when triggered, because explicitly spotted and handled before tonotify loop sidummy = new SelectItem; sidummy->sock = nullptr; sidummy->nfy = nullptr; - sidummy->del = true; // so its not added to tonotify ... + sidummy->del = false; sidummy->mode = 0; #ifdef _USE_PIPE_FOR_SELECT_TRIGGER if(pipe(dummysock)) @@ -5281,32 +5323,32 @@ class CSocketEpollThread: public CSocketBaseThread } } - void delSelItem(SelectItem *si) - { - epoll_op(epfd, EPOLL_CTL_DEL, si, 0); - // Release/dtors should not throw but leaving try/catch here until all paths checked - try - { - si->nfy->Release(); - si->sock->Release(); - delete si; - } - catch (IException *e) - { - EXCLOG(e,"CSocketEpollThread::delSelItem()"); - e->Release(); - } - } - - void delSelItemPos(SelectItem *si, unsigned pos) - { - unsigned last = items.ordinality(); - delSelItem(si); - last--; - if (pos < last) - items.swap(pos, last); - items.remove(last); - } + // void delSelItem(SelectItem *si) + // { + // epoll_op(epfd, EPOLL_CTL_DEL, si, 0); + // // Release/dtors should not throw but leaving try/catch here until all paths checked + // try + // { + // si->nfy->Release(); + // si->sock->Release(); + // delete si; + // } + // catch (IException *e) + // { + // EXCLOG(e,"CSocketEpollThread::delSelItem()"); + // e->Release(); + // } + // } + + // void delSelItemPos(SelectItem *si, unsigned pos) + // { + // unsigned last = items.ordinality(); + // delSelItem(si); + // last--; + // if (pos < last) + // items.swap(pos, last); + // items.remove(last); + // } public: CSocketEpollThread(const char *trc, unsigned _hdlPerThrd) @@ -5357,9 +5399,21 @@ class CSocketEpollThread: public CSocketBaseThread closedummy(); ForEachItemIn(i, items) { - SelectItem *si = items.element(i); - delSelItem(si); + // Release/dtors should not throw but leaving try/catch here until all paths checked + try + { + SelectItem *si = items.element(i); + si->nfy->Release(); + si->sock->Release(); + delete si; + } + catch (IException *e) + { + EXCLOG(e,"~CSocketSelectThread"); + e->Release(); + } } + if (epfd >= 0) { # ifdef EPOLLTRACE @@ -5373,52 +5427,52 @@ class CSocketEpollThread: public CSocketBaseThread Owned termexcept; + // JCSMORE: this is very similar to the version in CSocketSelectThread. Could be commoned if items used common structure. bool checkSocks() { bool ret = false; - // must be holding CriticalBlock (sect) - unsigned n = items.ordinality(); - for (unsigned i=0;ihandle)) + SelectItem &si = *items.element(i); + if (si.del) + ret = true; // maybe that bad one + else if (!sockOk(si.handle)) { - delSelItemPos(si, i); - n--; + si.del = true; ret = true; } - else - i++; } return ret; } - bool removeSock(ISocket *sock) - { - // must be holding CriticalBlock (sect) - unsigned n = items.ordinality(); - for (unsigned i=0;isock==sock) - { - delSelItemPos(si, i); - n--; - return true; - } - else - i++; - } - return false; - } + // bool removeSock(ISocket *sock) + // { + // // must be holding CriticalBlock (sect) + // unsigned n = items.ordinality(); + // for (unsigned i=0;isock==sock) + // { + // delSelItemPos(si, i); + // n--; + // return true; + // } + // else + // i++; + // } + // return false; + // } bool remove(ISocket *sock) { - CriticalBlock block(sect); if (terminating) - return false; - if (sock==NULL) - { // wait until no changes outstanding + return true; // pretend is, to short-circuit caller + CriticalBlock block(sect); + + // JCS - I can't see when sock can be null + if (sock==NULL) // wait until no changes outstanding + { while (selectvarschange) { waitingchange++; @@ -5427,33 +5481,59 @@ class CSocketEpollThread: public CSocketBaseThread } return true; } - if (removeSock(sock)) + + ForEachItemIn(i, items) { - selectvarschange = true; - // NB: could set terminating here if no more hdls on - // this thread and at least one other thread is present - triggerselect(); - return true; + SelectItem *si = items.element(i); + if (!si->del && (si->sock==sock)) + { + // epoll_wait (in the CSocketEpollThread thread) may have pending events on socket this is about to remove + // so we must not remove the SelectItem here, just mark it as deleted (deletion performed in updateEpollVars) + epoll_op(epfd, EPOLL_CTL_DEL, si, 0); + si->del = true; + selectvarschange = true; + triggerselect(); + // JCSMORE see comment in CSocketEpollThread::run re. need for CS. + // I think the array could be manipulated here, as long as the SelectItem isn't deleted. + // It may also be more efficient if it used a double linked list to remove items. + return true; + } } return false; } - unsigned add(ISocket *sock,unsigned mode,ISocketSelectNotify *nfy) + unsigned add(ISocket *sock, unsigned mode, ISocketSelectNotify *nfy) { if ( !sock || !nfy || !(mode & (SELECTMODE_READ|SELECTMODE_WRITE|SELECTMODE_EXCEPT)) ) { IWARNLOG("EPOLL: adding fd but sock or nfy is NULL or mode is empty"); dbgassertex(false); - return 0; + return SOCK_FAILED; } CriticalBlock block(sect); if (terminating) - return 0; + return SOCK_FAILED; + + // JCSMORE may be more efficient to use a double linked list to avoid this scan (and array/expansion) unsigned rm = 0; - if (removeSock(sock)) - rm = SOCK_REMOVED; - unsigned n = items.ordinality(); + unsigned n=0; + ForEachItemIn(i, items) + { + SelectItem &si = *items.element(i); + if (!si.del) + { + if (si.sock==sock) + { + si.del = true; + rm = SOCK_REMOVED; + break; + } + else + n++; + } + } + // new handler thread if (n >= hdlPerThrd) return (0|rm); @@ -5471,6 +5551,7 @@ class CSocketEpollThread: public CSocketBaseThread ep_mode |= EPOLLOUT; if (mode & SELECTMODE_EXCEPT) ep_mode |= EPOLLPRI; + // JCSMORE - we are in a CS, but it is thread-safe to epoll_wait concurrently with epoll_op epoll_op(epfd, EPOLL_CTL_ADD, sn, ep_mode); selectvarschange = true; triggerselect(); @@ -5502,6 +5583,7 @@ class CSocketEpollThread: public CSocketBaseThread #ifndef _USE_PIPE_FOR_SELECT_TRIGGER opendummy(); #endif + updateItems(); ni = items.ordinality(); validateselecterror = 0; } @@ -5571,6 +5653,11 @@ class CSocketEpollThread: public CSocketBaseThread totnum++; SelectItemArray tonotify; { + // JCSMORE - I don't think this CS is needed. + // It is in the CSocketSelectThread version because the items array needs protecting) + // But in the epoll version, we reference the SelectItem pointers only, the items array isn't referenced, + // and epoll_op's are thread-safe. But we must ensure the SelectItem's are only deleted by this thread, + // since even if in a mutex, pending events may referene them. CriticalBlock block(sect); // retrieve events, without waiting, while holding CS ... @@ -5580,20 +5667,26 @@ class CSocketEpollThread: public CSocketBaseThread { int tfd = -1; SelectItem *epsi = (SelectItem *)epevents[j].data.ptr; - if (epsi) + if (epsi && !epsi->del) // NB: could receive an event for a pending deleted item tfd = epsi->handle; # ifdef EPOLLTRACE DBGLOG("EPOLL: j = %d, fd = %d, emask = %u", j, tfd, epevents[j].events); # endif if (tfd >= 0) { -# ifdef _USE_PIPE_FOR_SELECT_TRIGGER - if ( (dummysockopen) && (tfd == dummysock[0]) ) + if (dummysockopen) { - resettrigger(); - continue; - } +# ifdef _USE_PIPE_FOR_SELECT_TRIGGER + if (tfd == dummysock[0]) + { + resettrigger(); + continue; + } +#else + // would still need to resettrigger and ignore dummysock (but subtly different code) + throwUnimplemented(); # endif + } unsigned int ep_mode = 0; if (epevents[j].events & (EPOLLINX | EPOLLHUP | EPOLLERR)) ep_mode |= SELECTMODE_READ; @@ -5626,6 +5719,7 @@ class CSocketEpollThread: public CSocketBaseThread const SelectItem &si = tonotify.item(j); try { + // is it worth checking si.del again? we can't guarantee that this is a socket that has been marked deleted. si.nfy->notifySelected(si.sock,si.mode); // ignore return } catch (IException *e) @@ -5736,7 +5830,9 @@ class CSocketEpollHandler: implements ISocketSelectHandler, public CInterface if (!(addrm & SOCK_ADDED)) { addrm |= threads.item(i).add(sock,mode,nfy); - if (addrm & (SOCK_ADDED | SOCK_REMOVED)) + // if both added and removed, we're done + if (((addrm & (SOCK_ADDED | SOCK_REMOVED)) == (SOCK_ADDED | SOCK_REMOVED)) || + (addrm & SOCK_FAILED)) return; } else if (!(addrm & SOCK_REMOVED))