Skip to content

Commit

Permalink
Merge pull request #32 from r12f/cherry-pick-1509
Browse files Browse the repository at this point in the history
[sairedis/syncd] Implement bulk get support (#1509)
  • Loading branch information
r12f authored Mar 1, 2025
2 parents 2f4d39d + 995e71f commit 0c1750e
Show file tree
Hide file tree
Showing 29 changed files with 1,184 additions and 88 deletions.
13 changes: 10 additions & 3 deletions lib/ClientServerSai.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -426,11 +426,18 @@ sai_status_t ClientServerSai::bulkGet(
_In_ sai_bulk_op_error_mode_t mode,
_Out_ sai_status_t *object_statuses)
{
MUTEX();
SWSS_LOG_ENTER();
REDIS_CHECK_API_INITIALIZED();

SWSS_LOG_ERROR("not implemented, FIXME");

return SAI_STATUS_NOT_IMPLEMENTED;
return m_sai->bulkGet(
object_type,
object_count,
object_id,
attr_count,
attr_list,
mode,
object_statuses);
}

// BULK QUAD ENTRY
Expand Down
34 changes: 34 additions & 0 deletions lib/Recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,40 @@ void Recorder::recordGenericGetResponse(
recordLine("G|" + sai_serialize_status(status) + "|" + Globals::joinFieldValues(arguments));
}

void Recorder::recordBulkGenericGet(
_In_ const std::string& key,
_In_ const std::vector<swss::FieldValueTuple>& arguments)
{
SWSS_LOG_ENTER();

std::string joined;

for (const auto &e: arguments)
{
joined += "||" + fvField(e) + "|" + fvValue(e);
}

// capital 'B' stands for Bulk GET operation. Note: 'G' already used for get response.

recordLine("B|" + key + joined);
}

void Recorder::recordBulkGenericGetResponse(
_In_ sai_status_t status,
_In_ const std::vector<swss::FieldValueTuple>& arguments)
{
SWSS_LOG_ENTER();

std::string joined;

for (const auto &e: arguments)
{
joined += "||" + fvField(e) + "|" + fvValue(e);
}

recordLine("G|" + sai_serialize_status(status) + "|" + joined);
}

void Recorder::recordGenericGetStats(
_In_ sai_object_type_t object_type,
_In_ sai_object_id_t object_id,
Expand Down
8 changes: 8 additions & 0 deletions lib/Recorder.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,14 @@ namespace sairedis
_In_ uint32_t objectCount,
_In_ const sai_status_t *objectStatuses);

void recordBulkGenericGet(
_In_ const std::string& key,
_In_ const std::vector<swss::FieldValueTuple>& arguments);

void recordBulkGenericGetResponse(
_In_ sai_status_t status,
_In_ const std::vector<swss::FieldValueTuple>& arguments);

public: // SAI query interface API

void recordFlushFdbEntries(
Expand Down
135 changes: 133 additions & 2 deletions lib/RedisRemoteSaiInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
#include "meta/PerformanceIntervalTimer.h"
#include "meta/Globals.h"

#include "swss/tokenize.h"

#include "config.h"

#include <inttypes.h>
Expand Down Expand Up @@ -1674,6 +1676,77 @@ sai_status_t RedisRemoteSaiInterface::waitForBulkResponse(
return SAI_STATUS_SUCCESS;
}

sai_status_t RedisRemoteSaiInterface::waitForBulkGetResponse(
_In_ sai_object_type_t objectType,
_In_ uint32_t object_count,
_In_ const uint32_t *attr_count,
_Inout_ sai_attribute_t **attr_list,
_Out_ sai_status_t *object_statuses)
{
SWSS_LOG_ENTER();

swss::KeyOpFieldsValuesTuple kco;

const auto status = m_communicationChannel->wait(REDIS_ASIC_STATE_COMMAND_GETRESPONSE, kco);

const auto &values = kfvFieldsValues(kco);

if (values.size() != object_count)
{
SWSS_LOG_THROW("wrong number of statuses, got %zu, expected %u", values.size(), object_count);
}

for (size_t idx = 0; idx < values.size(); idx++)
{
// field = status
// value = attrid=attrvalue|...

const auto& statusStr = fvField(values[idx]);
const auto& joined = fvValue(values[idx]);

const auto v = swss::tokenize(joined, '|');

std::vector<swss::FieldValueTuple> entries; // attributes per object id
entries.reserve(v.size());

for (size_t i = 0; i < v.size(); i++)
{
const std::string item = v.at(i);

auto start = item.find_first_of("=");

auto field = item.substr(0, start);
auto value = item.substr(start + 1);

entries.emplace_back(field, value);
}

// deserialize statuses for all objects
sai_deserialize_status(statusStr, object_statuses[idx]);

const auto objectStatus = object_statuses[idx];

if (objectStatus == SAI_STATUS_SUCCESS || objectStatus == SAI_STATUS_BUFFER_OVERFLOW)
{
const auto countOnly = (objectStatus == SAI_STATUS_BUFFER_OVERFLOW);

if (values.size() == 0)
{
SWSS_LOG_THROW("logic error, get response returned 0 values!, send api response or sync/async issue?");
}

SaiAttributeList list(objectType, entries, countOnly);

// no need for id fix since this is overflow
transfer_attributes(objectType, attr_count[idx], list.get_attr_list(), attr_list[idx], countOnly);
}
}

m_recorder->recordBulkGenericGetResponse(status, values);

return status;
}

sai_status_t RedisRemoteSaiInterface::bulkRemove(
_In_ sai_object_type_t object_type,
_In_ uint32_t object_count,
Expand Down Expand Up @@ -1764,9 +1837,67 @@ sai_status_t RedisRemoteSaiInterface::bulkGet(
{
SWSS_LOG_ENTER();

SWSS_LOG_ERROR("not implemented, FIXME");
std::vector<std::string> serializedObjectIds;
serializedObjectIds.reserve(object_count);

return SAI_STATUS_NOT_IMPLEMENTED;
for (uint32_t idx = 0; idx < object_count; idx++)
{
serializedObjectIds.emplace_back(sai_serialize_object_id(object_id[idx]));
}

return bulkGet(object_type, serializedObjectIds, attr_count, attr_list, mode, object_statuses);
}

sai_status_t RedisRemoteSaiInterface::bulkGet(
_In_ sai_object_type_t object_type,
_In_ const std::vector<std::string> &serialized_object_ids,
_In_ const uint32_t *attr_count,
_Inout_ sai_attribute_t **attr_list,
_In_ sai_bulk_op_error_mode_t mode,
_Inout_ sai_status_t *object_statuses)
{
SWSS_LOG_ENTER();

const auto serializedObjectType = sai_serialize_object_type(object_type);

std::vector<swss::FieldValueTuple> entries;
entries.reserve(serialized_object_ids.size());

for (size_t idx = 0; idx < serialized_object_ids.size(); idx++)
{
/*
* Since user may reuse buffers, then oid list buffers maybe not cleared
* and contain some garbage, let's clean them so we send all oids as null to
* syncd.
*/

Utils::clearOidValues(object_type, attr_count[idx], attr_list[idx]);

const auto entry = SaiAttributeList::serialize_attr_list(object_type, attr_count[idx], attr_list[idx], false);

const auto strAttr = Globals::joinFieldValues(entry);

swss::FieldValueTuple fvt(serialized_object_ids[idx] , strAttr);

entries.push_back(fvt);
}

/*
* We are adding number of entries to actually add ':' to be compatible
* with previous
*/

const auto key = serializedObjectType + ":" + std::to_string(entries.size());

m_communicationChannel->set(key, entries, REDIS_ASIC_STATE_COMMAND_BULK_GET);

m_recorder->recordBulkGenericGet(serializedObjectType, entries);

const auto object_count = static_cast<uint32_t>(serialized_object_ids.size());

const auto status = waitForBulkGetResponse(object_type, object_count, attr_count, attr_list, object_statuses);

return status;
}

sai_status_t RedisRemoteSaiInterface::bulkCreate(
Expand Down
25 changes: 25 additions & 0 deletions lib/RedisRemoteSaiInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,14 @@ namespace sairedis
_In_ sai_bulk_op_error_mode_t mode,
_Out_ sai_status_t *object_statuses);

sai_status_t bulkGet(
_In_ sai_object_type_t object_type,
_In_ const std::vector<std::string> &serialized_object_ids,
_In_ const uint32_t *attr_count,
_Inout_ sai_attribute_t **attr_list,
_In_ sai_bulk_op_error_mode_t mode,
_Out_ sai_status_t *object_statuses);

private: // QUAD API response

/**
Expand Down Expand Up @@ -317,6 +325,23 @@ namespace sairedis
_In_ uint32_t object_count,
_Out_ sai_status_t *object_statuses);

/**
* @brief Wait for bulk GET response.
*
* Will wait for response from syncd. Method only used for bulk
* GET object. If object status is SUCCESS all values will be deserialized
* and transferred to user buffers. If object status is BUFFER_OVERFLOW
* then all non list values will be transferred, but LIST objects
* will only transfer COUNT item of list, without touching user
* list at all.
*/
sai_status_t waitForBulkGetResponse(
_In_ sai_object_type_t objectType,
_In_ uint32_t object_count,
_In_ const uint32_t *attr_count,
_Inout_ sai_attribute_t **attr_list,
_Out_ sai_status_t *object_statuses);

private: // stats API response

sai_status_t waitForGetStatsResponse(
Expand Down
14 changes: 11 additions & 3 deletions lib/Sai.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -524,11 +524,19 @@ sai_status_t Sai::bulkGet(
_In_ sai_bulk_op_error_mode_t mode,
_Out_ sai_status_t *object_statuses)
{
MUTEX();
SWSS_LOG_ENTER();
REDIS_CHECK_API_INITIALIZED();
REDIS_CHECK_CONTEXT(*object_id);

SWSS_LOG_ERROR("not implemented, FIXME");

return SAI_STATUS_NOT_IMPLEMENTED;
return context->m_meta->bulkGet(
object_type,
object_count,
object_id,
attr_count,
attr_list,
mode,
object_statuses);
}

// BULK QUAD ENTRY
Expand Down
11 changes: 8 additions & 3 deletions lib/ServerSai.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -452,9 +452,14 @@ sai_status_t ServerSai::bulkGet(
SWSS_LOG_ENTER();
REDIS_CHECK_API_INITIALIZED();

SWSS_LOG_ERROR("not implemented, FIXME");

return SAI_STATUS_NOT_IMPLEMENTED;
return m_sai->bulkGet(
object_type,
object_count,
object_id,
attr_count,
attr_list,
mode,
object_statuses);
}

// BULK QUAD ENTRY
Expand Down
52 changes: 50 additions & 2 deletions meta/Meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1270,9 +1270,57 @@ sai_status_t Meta::bulkGet(
{
SWSS_LOG_ENTER();

SWSS_LOG_ERROR("not implemented, FIXME");
PARAMETER_CHECK_IF_NOT_NULL(object_statuses);

return SAI_STATUS_NOT_IMPLEMENTED;
for (uint32_t idx = 0; idx < object_count; idx++)
{
object_statuses[idx] = SAI_STATUS_NOT_EXECUTED;
}

PARAMETER_CHECK_OBJECT_TYPE_VALID(object_type);
PARAMETER_CHECK_POSITIVE(object_count);
PARAMETER_CHECK_IF_NOT_NULL(attr_count);
PARAMETER_CHECK_IF_NOT_NULL(attr_list);

if (sai_metadata_get_enum_value_name(&sai_metadata_enum_sai_bulk_op_error_mode_t, mode) == nullptr)
{
SWSS_LOG_ERROR("mode value %d is not in range on %s", mode, sai_metadata_enum_sai_bulk_op_error_mode_t.name);

return SAI_STATUS_INVALID_PARAMETER;
}

std::vector<sai_object_meta_key_t> vmk;

for (uint32_t idx = 0; idx < object_count; idx++)
{
sai_status_t status = meta_sai_validate_oid(object_type, &object_id[idx], SAI_NULL_OBJECT_ID, false);

CHECK_STATUS_SUCCESS(status);

sai_object_meta_key_t meta_key = { .objecttype = object_type, .objectkey = { .key = { .object_id = object_id[idx] } } };

vmk.push_back(meta_key);

status = meta_generic_validation_get(meta_key, attr_count[idx], attr_list[idx]);

// FIXME: This macro returns on failure.
// When mode is SAI_BULK_OP_ERROR_MODE_IGNORE_ERROR we should continue instead of return.
// This issue exists for all bulk operations.
CHECK_STATUS_SUCCESS(status);
}

auto status = m_implementation->bulkGet(object_type, object_count, object_id, attr_count, attr_list, mode, object_statuses);

for (uint32_t idx = 0; idx < object_count; idx++)
{
if (object_statuses[idx] == SAI_STATUS_SUCCESS)
{
sai_object_id_t switch_id = switchIdQuery(object_id[idx]);
meta_generic_validation_post_get(vmk[idx], switch_id, attr_count[idx], attr_list[idx]);
}
}

return status;
}

sai_status_t Meta::bulkCreate(
Expand Down
Loading

0 comments on commit 0c1750e

Please sign in to comment.