Skip to content

Commit

Permalink
HPCC-32822 Fix MP protocol error logged indefinitely in loop
Browse files Browse the repository at this point in the history
The detection of a MP packet with an invalid header threw
an exception that was logged, but did not close the socket.
When the client closed the socket, the MP server was notified
but because it had nothing left to read of the [bad] header,
it saw the invalid protocol error again and rethrew the
same error.
The epoll handler loop continuously notified the handler
of the close event (because it did nothing with it), and
the protocol error was continuosly output.

The socket should be close on this and any other exception.

In case these events are too frequent, add a timer to log
the protocol errors less frequently, and log the header
bytes received to help diagnose what the source of these
'rogue' connections are.

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
  • Loading branch information
jakesmith committed Oct 18, 2024
1 parent 09d82ed commit 81b3e2e
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 14 deletions.
2 changes: 1 addition & 1 deletion system/jlib/jdebug.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ class PeriodicTimer

protected:
cycle_t timePeriodCycles = 0;
cycle_t lastElapsedCycles = 0;
std::atomic<cycle_t> lastElapsedCycles{0};
};


Expand Down
2 changes: 1 addition & 1 deletion system/jlib/jmisc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -920,7 +920,7 @@ void throwExceptionIfAborting()

StringBuffer & hexdump2string(byte const * in, size32_t inSize, StringBuffer & out)
{
out.append("[");
out.appendf("%u bytes [", inSize);
byte last = 0;
unsigned seq = 1;
for(unsigned i=0; i<inSize; ++i)
Expand Down
40 changes: 28 additions & 12 deletions system/mp/mpcomm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,11 @@ struct MultiPacketHeader

class DECL_EXCEPTION CMPException: public IMP_Exception, public CInterface
{
StringAttr msg;
public:
IMPLEMENT_IINTERFACE;

CMPException(MessagePassingError err,const SocketEndpoint &ep) : error(err), endpoint(ep)
CMPException(MessagePassingError err,const SocketEndpoint &ep, const char *_msg = nullptr) : error(err), endpoint(ep), msg(_msg)
{
}

Expand All @@ -240,6 +241,8 @@ class DECL_EXCEPTION CMPException: public IMP_Exception, public CInterface
// change it from "MP link closed" to something more helpful
case MPERR_link_closed: str.appendf("Unexpected process termination (ep:%s)",endpoint.getEndpointHostText(tmp).str()); break;
}
if (msg.length())
str.append(" - ").append(msg);
return str;
}
int errorCode() const { return error; }
Expand Down Expand Up @@ -1813,6 +1816,8 @@ class ForwardPacketHandler // TAG_SYS_FORWARD
};


static PeriodicTimer periodicTimer(10*60*1000, false); // 10 minutes
static std::atomic<__uint64> mpProtocolErrors{0};
// --------------------------------------------------------

class CMPPacketReader: public ISocketSelectNotify, public CInterface
Expand Down Expand Up @@ -1847,7 +1852,8 @@ class CMPPacketReader: public ISocketSelectNotify, public CInterface
{
if (!parent)
return false;
bool gc = false; // if a gc is hit, then will fall through to close socket
bool closeSocket = false; // if a graceful close is hit, this will be set and will fall through to close socket
bool suppressException = false;
try
{
while (true) // NB: breaks out if blocked (if (remaining) ..)
Expand All @@ -1872,7 +1878,7 @@ class CMPPacketReader: public ISocketSelectNotify, public CInterface
if (!gotPacketHdr)
{
CCycleTimer timer;
gc = readtmsAllowClose(sock, activeptr, 0, remaining, szRead, timer.remainingMs(60000));
closeSocket = readtmsAllowClose(sock, activeptr, 0, remaining, szRead, timer.remainingMs(60000));
remaining -= szRead;
activeptr += szRead;
if (remaining) // only possible if blocked.
Expand All @@ -1882,10 +1888,20 @@ class CMPPacketReader: public ISocketSelectNotify, public CInterface
if (hdr.version/0x100 != MP_PROTOCOL_VERSION/0x100)
{
// TBD IPV6 here
mpProtocolErrors++;
SocketEndpoint ep;
sock->getPeerEndpoint(ep);
IMP_Exception *e=new CMPException(MPERR_protocol_version_mismatch,ep);
throw e;
if (periodicTimer.hasElapsed())
{
VStringBuffer packetHdrBytes("[%" I64F "u incidents to date]. Packet Header: ", mpProtocolErrors.load());
hexdump2string((byte const *)&hdr, sizeof(hdr), packetHdrBytes);
throw new CMPException(MPERR_protocol_version_mismatch, ep, packetHdrBytes.str());
}
else
{
suppressException = true;
throw new CMPException(MPERR_protocol_version_mismatch, ep);
}
}
hdr.setMessageFields(*activemsg);
#ifdef _FULLTRACE
Expand All @@ -1898,9 +1914,9 @@ class CMPPacketReader: public ISocketSelectNotify, public CInterface
gotPacketHdr = true;
}

if (!gc && remaining)
if (!closeSocket && remaining)
{
gc = readtmsAllowClose(sock, activeptr, 0, remaining, szRead, WAIT_FOREVER);
closeSocket = readtmsAllowClose(sock, activeptr, 0, remaining, szRead, WAIT_FOREVER);
remaining -= szRead;
activeptr += szRead;
}
Expand Down Expand Up @@ -1939,19 +1955,19 @@ class CMPPacketReader: public ISocketSelectNotify, public CInterface
}
}
while (activemsg);
if (gc)
if (closeSocket)
break;
}
}
catch (IException *e)
{
if (e->errorCode()!=JSOCKERR_graceful_close)
FLLOG(MCoperatorWarning, e,"MP(Packet Reader)");
if (!suppressException && e->errorCode()!=JSOCKERR_graceful_close)
FLLOG(MCoperatorWarning, e, "MP(Packet Reader)");
e->Release();
gotPacketHdr = false;
closeSocket = true; // NB: this select handler will removed and not be notified again
}

if (gc)
if (closeSocket)
{
// here due to error or graceful close, so close socket (ignore error as may be closed already)
try
Expand Down

0 comments on commit 81b3e2e

Please sign in to comment.