Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable support for MQTT stitcher in stirling #1918

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,11 @@ pl_cc_test(
":cc_library",
],
)

pl_cc_test(
name = "stitcher_test",
srcs = ["stitcher_test.cc"],
deps = [
":cc_library",
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ namespace protocols {

namespace mqtt {

// This is modeling a 4 bit field specifying the control packet type
enum class MqttControlPacketType : uint8_t {
CONNECT = 1,
CONNACK = 2,
Expand Down Expand Up @@ -745,6 +746,11 @@ size_t FindFrameBoundary<mqtt::Message>(message_type_t /*type*/, std::string_vie
return start_pos + buf.length();
}

template <>
mqtt::packet_id_t GetStreamID(mqtt::Message* message) {
return message->header_fields["packet_identifier"];
}

} // namespace protocols
} // namespace stirling
} // namespace px
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ template <>
size_t FindFrameBoundary<mqtt::Message>(message_type_t type, std::string_view buf, size_t start_pos,
NoState* state);

template <>
mqtt::packet_id_t GetStreamID(mqtt::Message* message);

} // namespace protocols
} // namespace stirling
} // namespace px
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
/*
* Copyright 2018- The Pixie Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/

#include "src/stirling/source_connectors/socket_tracer/protocols/mqtt/stitcher.h"

#include <algorithm>
#include <deque>
#include <map>
#include <set>
#include <string>
#include <tuple>
#include <utility>

#include "src/common/json/json.h"
ddelnano marked this conversation as resolved.
Show resolved Hide resolved
#include "src/stirling/source_connectors/socket_tracer/protocols/mqtt/types.h"

namespace px {
namespace stirling {
namespace protocols {
namespace mqtt {

typedef uint8_t MatchKey;
ddelnano marked this conversation as resolved.
Show resolved Hide resolved

constexpr MatchKey UnmatchedResp = 0xff;

std::map<MatchKey, MatchKey> MapRequestToResponse = {
// CONNECT to CONNACK
{0x10, 0x20},
// PUBLISH QOS 0 to Dummy response
{0x30, UnmatchedResp},
{0x31, UnmatchedResp},
{0x38, UnmatchedResp},
{0x39, UnmatchedResp},
// PUBLISH QOS 1 to PUBACK
{0x32, 0x40},
{0x33, 0x40},
{0x3a, 0x40},
{0x3b, 0x40},
// PUBLISH QOS 2 to PUBREC
{0x34, 0x50},
{0x35, 0x50},
{0x3c, 0x50},
{0x3d, 0x50},
// PUBREL to PUBCOMP
{0x60, 0x70},
// SUBSCRIBE to SUBACK
{0X80, 0X90},
// UNSUBSCRIBE to UNSUBACK
{0xa0, 0xb0},
// PINGREQ to PINGRESP
{0xc0, 0xd0},
// DISCONNECT to Dummy response
{0xe0, UnmatchedResp},
// AUTH to Dummy response
{0xf0, UnmatchedResp}};

std::set<std::tuple<uint32_t, uint32_t>> UnansweredPublish;
ddelnano marked this conversation as resolved.
Show resolved Hide resolved

// Possible to have the server sending PUBLISH with same packet identifier as client PUBLISH before
// it sends PUBACK, causing server PUBLISH to be put into response deque instead of request deque.
// TODO: Reverse logic to match requests that have erroneously been put into response deque

MatchKey getMatchKey(mqtt::Message& frame) {
return (frame.control_packet_type << 4) | static_cast<uint8_t>(frame.dup) << 3 |
(frame.header_fields["qos"] & 0x3) << 1 | static_cast<uint8_t>(frame.retain);
}

RecordsWithErrorCount<Record> StitchFrames(
absl::flat_hash_map<packet_id_t, std::deque<Message>>* req_frames,
absl::flat_hash_map<packet_id_t, std::deque<Message>>* resp_frames) {
std::vector<Record> entries;
int error_count = 0;

// iterate through all deques of requests associated with a specific streamID and find the
// matching response
for (auto& [packet_id, req_deque] : *req_frames) {
// goal is to match the request to the closest appropriate response to the specific control type
// based on timestamp

// get the response deque corresponding to the packet ID of the request deque
auto pos = resp_frames->find(packet_id);
// note that not finding a corresponding response deque is not indicative of error, as in
// case of MQTT packets that do not have responses like Publish with QOS 0
std::deque<mqtt::Message> empty_deque;
std::deque<mqtt::Message>& resp_deque = (pos != resp_frames->end()) ? pos->second : empty_deque;

// track the latest response timestamp to compare against request frame's timestamp later.
uint64_t latest_resp_ts = resp_deque.empty() ? 0 : resp_deque.back().timestamp_ns;
// finding the closest appropriate response from response deque in terms of timestamp and type
// for each request in the request deque
for (mqtt::Message& req_frame : req_deque) {
// if the request is a PUBLISH (QOS 1 or QOS 2) with dup false, entry needs to be made in
// UnansweredPublish
if (req_frame.control_packet_type == 3 && req_frame.header_fields["qos"] != 0 &&
!req_frame.dup) {
UnansweredPublish.insert(std::tuple<uint32_t, uint32_t>(
req_frame.header_fields["packet_identifier"], req_frame.header_fields["qos"]));
}
// if the request is a duplicate PUBLISH, find out if the original PUBLISH has been matched
if (req_frame.control_packet_type == 3 && req_frame.header_fields["qos"] != 0 &&
req_frame.dup) {
if (UnansweredPublish.find(std::tuple<uint32_t, uint32_t>(
req_frame.header_fields["packet_identifier"], req_frame.header_fields["qos"])) ==
UnansweredPublish.end()) {
req_frame.consumed = true;
continue;
ddelnano marked this conversation as resolved.
Show resolved Hide resolved
}
}

// getting the appropriate response match value for the request match key
MatchKey request_match_key = getMatchKey(req_frame);
auto iter = MapRequestToResponse.find(request_match_key);
if (iter == MapRequestToResponse.end()) {
VLOG(1) << absl::Substitute("Could not find any responses for frame type = $0",
request_match_key);
continue;
}
if (iter->second == UnmatchedResp) {
// Request without responses found
req_frame.consumed = true;
latest_resp_ts = req_frame.timestamp_ns + 1;
mqtt::Message dummy_resp;
entries.push_back({std::move(req_frame), std::move(dummy_resp)});
continue;
}
MatchKey response_match_value = iter->second;

// finding the first response frame with timestamp greater than request frame
auto first_timestamp_iter =
std::lower_bound(resp_deque.begin(), resp_deque.end(), req_frame.timestamp_ns,
[](const mqtt::Message& message, const uint64_t ts) {
return ts > message.timestamp_ns;
});
if (first_timestamp_iter == resp_deque.end()) {
VLOG(1) << absl::Substitute("Could not find any responses after timestamp = $0",
req_frame.timestamp_ns);
continue;
}

// finding the first appropriate response frame with the desired control packet type and flags
auto response_frame_iter = std::find_if(
first_timestamp_iter, resp_deque.end(), [response_match_value](mqtt::Message& message) {
return (getMatchKey(message) == response_match_value) & !message.consumed;
});
if (response_frame_iter == resp_deque.end()) {
VLOG(1) << absl::Substitute(
"Could not find any responses with control packet type and flag = $0",
response_match_value);
continue;
}
mqtt::Message& resp_frame = *response_frame_iter;

// if the response is PUBACK/PUBREC, then remove the associated (packet_identifier, qos) tuple
// from the UnansweredPublish set
if (resp_frame.control_packet_type == 4) {
UnansweredPublish.erase(
std::tuple<uint64_t, uint64_t>(resp_frame.header_fields["packet_identifier"], 1));
}
if (resp_frame.control_packet_type == 5) {
UnansweredPublish.erase(
std::tuple<uint64_t, uint64_t>(resp_frame.header_fields["packet_identifier"], 2));
}
ddelnano marked this conversation as resolved.
Show resolved Hide resolved

req_frame.consumed = true;
resp_frame.consumed = true;
entries.push_back({std::move(req_frame), std::move(resp_frame)});
}

// clearing the req_deque and resp_deque
auto erase_until_iter = req_deque.begin();
auto iter = req_deque.begin();
while (iter != req_deque.end() && (iter->timestamp_ns < latest_resp_ts)) {
if (iter->consumed) {
++erase_until_iter;
}
if (!iter->consumed && !(iter == req_deque.end() - 1) && ((erase_until_iter + 1)->consumed)) {
++error_count;
++erase_until_iter;
}
++iter;
}
req_deque.erase(req_deque.begin(), erase_until_iter);
}

// Verify which deque server side PUBLISH frames are inserted into. It's suspected that these
// PUBLISH requests will end up in the resp deque and will cause the resp deque cleanup logic to
// erroneously drop request frames
// TODO: Verify that the frames in response deque are not request frames before dropping

// iterate through all response dequeues to find out which ones haven't been consumed
for (auto& [packet_id, resp_deque] : *resp_frames) {
for (auto& resp : resp_deque) {
if (!resp.consumed) {
error_count++;
}
}
resp_deque.clear();
}
ddelnano marked this conversation as resolved.
Show resolved Hide resolved

return {entries, error_count};
}
} // namespace mqtt
} // namespace protocols
} // namespace stirling
} // namespace px
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2018- The Pixie Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/

#pragma once

#include <absl/container/flat_hash_map.h>
#include <deque>
#include <string>
#include <vector>

#include "src/stirling/source_connectors/socket_tracer/protocols/common/interface.h"
#include "src/stirling/source_connectors/socket_tracer/protocols/mqtt/types.h"

namespace px {
namespace stirling {
namespace protocols {
namespace mqtt {

/**
* StitchFrames is the entry point of the MQTT Stitcher. It loops through the req_frames,
* matches them with the corresponding resp_frames, and optionally produces an entry to emit.
*
* @param req_frames: deque of all request frames.
* @param resp_frames: deque of all response frames.
* @return A vector of entries to be appended to table store.
*/
RecordsWithErrorCount<Record> StitchFrames(
absl::flat_hash_map<packet_id_t, std::deque<Message>>* req_frames,
absl::flat_hash_map<packet_id_t, std::deque<Message>>* resp_frames);

} // namespace mqtt

template <>
inline RecordsWithErrorCount<mqtt::Record> StitchFrames(
absl::flat_hash_map<mqtt::packet_id_t, std::deque<mqtt::Message>>* req_messages,
absl::flat_hash_map<mqtt::packet_id_t, std::deque<mqtt::Message>>* res_messages,
NoState* /* state */) {
return mqtt::StitchFrames(req_messages, res_messages);
}

} // namespace protocols
} // namespace stirling
} // namespace px
Loading
Loading