Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-30629 Clean up Roxie logs #17960

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 0 additions & 96 deletions roxie/udplib/udpmsgpk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,6 @@ class PackageSequencer : public CInterface, implements IInterface

PackageSequencer(bool _encrypted) : encrypted(_encrypted)
{
if (checkTraceLevel(TRACE_MSGPACK, 3))
DBGLOG("UdpCollator: PackageSequencer::PackageSequencer this=%p", this);
metaSize = 0;
headerSize = 0;
header = NULL;
Expand All @@ -93,8 +91,6 @@ class PackageSequencer : public CInterface, implements IInterface

~PackageSequencer()
{
if (checkTraceLevel(TRACE_MSGPACK, 3))
DBGLOG("UdpCollator: PackageSequencer::~PackageSequencer this=%p", this);
DataBuffer *finger = firstPacket;
while (finger)
{
Expand All @@ -112,18 +108,6 @@ class PackageSequencer : public CInterface, implements IInterface
ret = after->msgNext;
else
ret = firstPacket;
if (checkTraceLevel(TRACE_MSGPACK, 5))
{
if (ret)
{
StringBuffer s;
UdpPacketHeader *pktHdr = (UdpPacketHeader*) ret->data;
DBGLOG("UdpCollator: PackageSequencer::next returns ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X node=%s dataBuffer=%p this=%p",
pktHdr->ruid, pktHdr->msgId, pktHdr->msgSeq, pktHdr->pktSeq, pktHdr->node.getTraceText(s).str(), ret, this);
}
else
DBGLOG("UdpCollator: PackageSequencer::next returns NULL this=%p", this);
}
return ret;
}

Expand All @@ -142,13 +126,6 @@ class PackageSequencer : public CInterface, implements IInterface
resends++;
}

if (checkTraceLevel(TRACE_MSGPACK, 5))
{
StringBuffer s;
DBGLOG("UdpCollator: PackageSequencer::insert ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X sendSeq=%" SEQF "u node=%s dataBuffer=%p this=%p",
pktHdr->ruid, pktHdr->msgId, pktHdr->msgSeq, pktHdr->pktSeq, pktHdr->sendSeq, pktHdr->node.getTraceText(s).str(), dataBuff, this);
}

// Optimize the (very) common case where I need to add to the end
DataBuffer *finger;
DataBuffer *prev;
Expand All @@ -168,8 +145,6 @@ class PackageSequencer : public CInterface, implements IInterface
if (pktHdr->pktSeq <= oldHdr->pktSeq)
{
// discard duplicated incoming packet - should be uncommon unless we requested a resend for a packet already in flight
if (checkTraceLevel(TRACE_MSGPACK, 5))
DBGLOG("UdpCollator: Discarding duplicate incoming packet %u (we have all up to %u)", pktHdr->pktSeq, oldHdr->pktSeq);
dataBuff->Release();
duplicates++;
return false;
Expand All @@ -184,28 +159,10 @@ class PackageSequencer : public CInterface, implements IInterface
}
while (finger)
{
#ifdef _DEBUG
scans++;
if (scans==1000000)
{
overscans++;
DBGLOG("%u million scans in UdpCollator insert(DataBuffer *dataBuff", overscans);
if (lastContiguousPacket)
{
UdpPacketHeader *oldHdr = (UdpPacketHeader*) lastContiguousPacket->data;
DBGLOG("lastContiguousPacket is at %u , last packet seen is %u", oldHdr->pktSeq & UDP_PACKET_SEQUENCE_MASK, pktHdr->pktSeq & UDP_PACKET_SEQUENCE_MASK);
}
else
DBGLOG("lastContiguousPacket is NULL , last packet seen is %u", pktHdr->pktSeq & UDP_PACKET_SEQUENCE_MASK);
scans = 0;
}
#endif
UdpPacketHeader *oldHdr = (UdpPacketHeader*) finger->data;
if (pktHdr->pktSeq == oldHdr->pktSeq)
{
// discard duplicated incoming packet - should be uncommon unless we requested a resend for a packet already in flight
if (checkTraceLevel(TRACE_MSGPACK, 5))
DBGLOG("UdpCollator: Discarding duplicate incoming packet %u", pktHdr->pktSeq);
dataBuff->Release();
return false;
}
Expand Down Expand Up @@ -243,8 +200,6 @@ class PackageSequencer : public CInterface, implements IInterface
// might need to rethink this
const MemoryAttr &udpkey = getSecretUdpKey(true);
size_t decryptedSize = aesDecrypt(udpkey.get(), udpkey.length(), pktHdr+1, pktHdr->length-sizeof(UdpPacketHeader), pktHdr+1, DATA_PAYLOAD-sizeof(UdpPacketHeader));
if (checkTraceLevel(TRACE_MSGPACK, 5))
DBGLOG("Decrypted %u bytes at %p resulting in %u bytes", (unsigned) (pktHdr->length-sizeof(UdpPacketHeader)), pktHdr+1, (unsigned) decryptedSize);
pktHdr->length = decryptedSize + sizeof(UdpPacketHeader);
}

Expand Down Expand Up @@ -318,16 +273,6 @@ class PackageSequencer : public CInterface, implements IInterface
return headerSize;
}

void dump()
{
DBGLOG("Contains %u packets, lastSeq = %u", numPackets, maxSeqSeen);
if (lastContiguousPacket)
{
UdpPacketHeader *hdr = (UdpPacketHeader*) lastContiguousPacket->data;
DBGLOG("lastContiguousPacket is %u %" SEQF "u", hdr->pktSeq, hdr->sendSeq);
}
}

};

// MessageResult ====================================================================================
Expand All @@ -344,8 +289,6 @@ class CMessageUnpackCursor: implements IMessageUnpackCursor, public CInterface

CMessageUnpackCursor(PackageSequencer *_pkSqncr, IRowManager *_rowMgr) : rowMgr(_rowMgr)
{
if (checkTraceLevel(TRACE_MSGPACK, 3))
DBGLOG("UdpCollator: CMessageUnpackCursor::CMessageUnpackCursor this=%p", this);
pkSequencer = _pkSqncr;
dataBuff = pkSequencer->next(NULL);
UdpPacketHeader *pktHdr = (UdpPacketHeader*) dataBuff->data;
Expand All @@ -367,8 +310,6 @@ class CMessageUnpackCursor: implements IMessageUnpackCursor, public CInterface

~CMessageUnpackCursor()
{
if (checkTraceLevel(TRACE_MSGPACK, 3))
DBGLOG("UdpCollator: CMessageUnpackCursor::~CMessageUnpackCursor this=%p", this);
pkSequencer->Release();
}

Expand All @@ -389,14 +330,6 @@ class CMessageUnpackCursor: implements IMessageUnpackCursor, public CInterface
if (dataBuff)
{
UdpPacketHeader *pktHdr = (UdpPacketHeader*) dataBuff->data;
if (checkTraceLevel(TRACE_MSGPACK, 4))
{
StringBuffer s;
DBGLOG("UdpCollator: CMessageUnpackCursor::getNext(%u) pos=%u pktLength=%u metaLen=%u ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X node=%s dataBuff=%p this=%p",
length, current_pos, pktHdr->length, pktHdr->metalength,
pktHdr->ruid, pktHdr->msgId, pktHdr->msgSeq, pktHdr->pktSeq,
pktHdr->node.getTraceText(s).str(), dataBuff, this);
}
unsigned packetDataLimit = pktHdr->length - pktHdr->metalength;
if ((packetDataLimit - current_pos) >= (unsigned) length)
{
Expand Down Expand Up @@ -493,15 +426,11 @@ class CMessageResult : public IMessageResult, CInterface {

CMessageResult(PackageSequencer *_pkSqncr)
{
if (checkTraceLevel(TRACE_MSGPACK, 3))
DBGLOG("UdpCollator: CMessageResult::CMessageResult pkSqncr=%p, this=%p", _pkSqncr, this);
pkSequencer = _pkSqncr;
}

~CMessageResult()
{
if (checkTraceLevel(TRACE_MSGPACK, 3))
DBGLOG("UdpCollator: CMessageResult::~CMessageResult this=%p", this);
pkSequencer->Release();
}

Expand All @@ -523,8 +452,6 @@ class CMessageResult : public IMessageResult, CInterface {

virtual void discard() const
{
if (checkTraceLevel(TRACE_MSGPACK, 2))
DBGLOG("UdpCollator: CMessageResult - Roxie server discarded a packet");
unwantedDiscarded++;
}

Expand All @@ -545,17 +472,13 @@ PUID GETPUID(DataBuffer *dataBuff)

CMessageCollator::CMessageCollator(IRowManager *_rowMgr, unsigned _ruid, bool _encrypted) : rowMgr(_rowMgr), ruid(_ruid), encrypted(_encrypted)
{
if (checkTraceLevel(TRACE_MSGPACK, 3))
DBGLOG("UdpCollator: CMessageCollator::CMessageCollator rowMgr=%p this=%p ruid=" RUIDF "", _rowMgr, this, ruid);
memLimitExceeded = false;
activity = false;
totalBytesReceived = 0;
}

CMessageCollator::~CMessageCollator()
{
if (checkTraceLevel(TRACE_MSGPACK, 3))
DBGLOG("UdpCollator: CMessageCollator::~CMessageCollator ruid=" RUIDF ", this=%p", ruid, this);
while (!queue.empty())
{
PackageSequencer *pkSqncr = queue.front();
Expand Down Expand Up @@ -671,9 +594,6 @@ void CMessageCollator::collate(DataBuffer *dataBuff)

IMessageResult *CMessageCollator::getNextResult(unsigned time_out, bool &anyActivity)
{
if (checkTraceLevel(TRACE_MSGPACK, 3))
DBGLOG("UdpCollator: CMessageCollator::getNextResult() timeout=%.8X ruid=%u rowMgr=%p this=%p", time_out, ruid, (void*) rowMgr, this);

if (memLimitExceeded)
{
DBGLOG("UdpCollator: CMessageCollator::getNextResult() throwing memory limit exceeded exception - rowMgr=%p this=%p", (void*) rowMgr, this);
Expand All @@ -696,22 +616,6 @@ IMessageResult *CMessageCollator::getNextResult(unsigned time_out, bool &anyActi
}
anyActivity = activity;
activity = false;
if (!anyActivity && ruid>=RUID_FIRST && checkTraceLevel(TRACE_MSGPACK, 2)) // suppress the tracing for pings where we expect the timeout...
{
#ifdef _DEBUG
DBGLOG("GetNextResult timeout: mapping has %d partial results", mapping.ordinality());
HashIterator h(mapping);
ForEach(h)
{
auto *r = mapping.mapToValue(&h.query());
PUID puid = *(PUID *) h.query().getKey();
DBGLOG("puid=%" I64F "x:", puid);
PackageSequencer *pkSqncr = mapping.getValue(puid);
pkSqncr->dump();
}
#endif
DBGLOG("UdpCollator: CMessageCollator::GetNextResult timeout");
}
return 0;
}

Expand Down
19 changes: 0 additions & 19 deletions roxie/udplib/udpsha.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,6 @@ void queue_t::doEnqueue(DataBuffer *buf)
}
tail = buf;
count.fastAdd(1); // inside a critical section, so no need for atomic inc.
#ifdef _DEBUG
if (count > limit)
DBGLOG("queue_t::pushOwn set count to %u", count.load());
#endif
}

void queue_t::pushOwnWait(DataBuffer * buf)
Expand Down Expand Up @@ -376,11 +372,6 @@ bool PacketTracker::noteSeen(UdpPacketHeader &hdr)
// Be careful to think about wrapping. Less than and higher can't really be distinguished, but we treat resent differently from original
bool duplicate = false;
unsigned delta = seq - base;
if (udpTraceLevel > 5)
{
DBGLOG("PacketTracker::noteSeen %" SEQF "u: delta %d", hdr.sendSeq, delta);
dump();
}
if (delta < TRACKER_BITS)
{
unsigned idx = (seq / 64) % TRACKER_DWORDS;
Expand Down Expand Up @@ -447,11 +438,6 @@ bool PacketTracker::hasSeen(sequence_t seq) const
// Accessed only on sender side where these are not modified, so no need for locking
// Careful about wrapping!
unsigned delta = seq - base;
if (udpTraceLevel > 5)
{
DBGLOG("PacketTracker::hasSeen - have I seen %" SEQF "u, %d", seq, delta);
dump();
}
if (delta < TRACKER_BITS)
{
unsigned idx = (seq / 64) % TRACKER_DWORDS;
Expand All @@ -468,11 +454,6 @@ bool PacketTracker::canRecord(sequence_t seq) const
{
// Careful about wrapping!
unsigned delta = seq - base;
if (udpTraceLevel > 5)
{
DBGLOG("PacketTracker::hasSeen - can I record %" SEQF "u, %d", seq, delta);
dump();
}
return (delta < TRACKER_BITS);
}

Expand Down
8 changes: 0 additions & 8 deletions roxie/udplib/udpsha.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -312,14 +312,6 @@ int check_max_socket_write_buffer(int size);
int check_set_max_socket_read_buffer(int size);
int check_set_max_socket_write_buffer(int size);

#define TRACE_RETRY_DATA 0x08
#define TRACE_MSGPACK 0x10

inline bool checkTraceLevel(unsigned category, unsigned level)
{
return (udpTraceLevel >= level);
}

extern UDPLIB_API void sanityCheckUdpSettings(unsigned receiveQueueSize, unsigned sendQueueSize, unsigned numSenders, __uint64 networkSpeedBitsPerSecond);


Expand Down
29 changes: 6 additions & 23 deletions roxie/udplib/udptrr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
msg.seen = packetsSeen.copy();
}

if (udpTraceLevel > 3 || udpTraceFlow)
if (udpTraceFlow)
{
StringBuffer ipStr;
DBGLOG("UdpReceiver: sending request_received msg seq %" SEQF "u to node=%s", _flowSeq, dest.getHostText(ipStr).str());
Expand Down Expand Up @@ -707,7 +707,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
CriticalBlock b(psCrit);
msg.seen = packetsSeen.copy();
}
if (udpTraceLevel > 3 || udpTraceFlow)
if (udpTraceFlow)
{
StringBuffer ipStr;
DBGLOG("UdpReceiver: sending ok_to_send %u msg seq %" SEQF "u to node=%s", maxTransfer, flowSeq, dest.getHostText(ipStr).str());
Expand Down Expand Up @@ -1003,7 +1003,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
void doFlowRequest(const UdpRequestToSendMsg &msg)
{
flowRequestsReceived++;
if (udpTraceLevel > 5 || udpTraceFlow)
if (udpTraceFlow)
{
StringBuffer ipStr;
DBGLOG("UdpReceiver: received %s msg flowSeq %" SEQF "u sendSeq %" SEQF "u from node=%s", flowType::name(msg.cmd), msg.flowSeq, msg.sendSeq, msg.sourceNode.getTraceText(ipStr).str());
Expand Down Expand Up @@ -1054,7 +1054,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
if (elapsed >= udpPermitTimeout)
{
UdpSenderEntry * sender = finger->owner;
if (udpTraceLevel || udpTraceFlow || udpTraceTimeouts)
if (udpTraceFlow || udpTraceTimeouts)
{
StringBuffer s;
DBGLOG("permit %" SEQF "u to node %s (%u packets) timed out after %u ms, rescheduling", sender->flowSeq, sender->dest.getHostText(s).str(), sender->getTotalReserved(), elapsed);
Expand All @@ -1066,7 +1066,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface

if (++sender->timeouts > udpMaxPermitDeadTimeouts && udpMaxPermitDeadTimeouts != 0)
{
if (udpTraceLevel || udpTraceFlow || udpTraceTimeouts)
if (udpTraceFlow || udpTraceTimeouts)
{
StringBuffer s;
DBGLOG("permit to send %" SEQF "u to node %s timed out %u times - abandoning", sender->flowSeq, sender->dest.getHostText(s).str(), sender->timeouts);
Expand Down Expand Up @@ -1206,7 +1206,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
{
try
{
if (udpTraceLevel > 5 || udpTraceFlow)
if (udpTraceFlow)
{
DBGLOG("UdpReceiver: wait_read(%u)", timeout);
}
Expand Down Expand Up @@ -1353,11 +1353,6 @@ class CReceiveManager : implements IReceiveManager, public CInterface
UdpSenderEntry *sender = &parent.sendersTable[hdr.node];
if (sender->noteSeen(hdr))
{
if (udpTraceLevel > 5) // don't want to interrupt this thread if we can help it
{
StringBuffer s;
DBGLOG("UdpReceiver: discarding unwanted resent packet %" SEQF "u %x from %s", hdr.sendSeq, hdr.pktSeq, hdr.node.getTraceText(s).str());
}
// We should perhaps track how often this happens, but it's not the same as unwantedDiscarded
hdr.node.clear(); // Used to indicate a duplicate that collate thread should discard. We don't discard on this thread as don't want to do anything that requires locks...
}
Expand Down Expand Up @@ -1509,8 +1504,6 @@ class CReceiveManager : implements IReceiveManager, public CInterface
virtual void detachCollator(const IMessageCollator *msgColl)
{
ruid_t ruid = msgColl->queryRUID();
if (udpTraceLevel >= 2)
DBGLOG("UdpReceiver: detach %p %u", msgColl, ruid);
{
CriticalBlock b(collatorsLock);
collators.erase(ruid);
Expand Down Expand Up @@ -1541,14 +1534,6 @@ class CReceiveManager : implements IReceiveManager, public CInterface
void collatePacket(DataBuffer *dataBuff)
{
const UdpPacketHeader *pktHdr = (UdpPacketHeader*) dataBuff->data;

if (udpTraceLevel >= 4)
{
StringBuffer s;
DBGLOG("UdpReceiver: CPacketCollator - unQed packet - ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X len=%d node=%s",
pktHdr->ruid, pktHdr->msgId, pktHdr->msgSeq, pktHdr->pktSeq, pktHdr->length, pktHdr->node.getTraceText(s).str());
}

Linked <CMessageCollator> msgColl;
bool isDefault = false;
{
Expand Down Expand Up @@ -1591,8 +1576,6 @@ class CReceiveManager : implements IReceiveManager, public CInterface
virtual IMessageCollator *createMessageCollator(IRowManager *rowManager, ruid_t ruid)
{
CMessageCollator *msgColl = new CMessageCollator(rowManager, ruid, encrypted);
if (udpTraceLevel > 2)
DBGLOG("UdpReceiver: createMessageCollator %p %u", msgColl, ruid);
{
CriticalBlock b(collatorsLock);
collators[ruid] = msgColl;
Expand Down
Loading