Skip to content

Commit

Permalink
fix: Address incompatibility of ipaddress (facebookincubator#12134)
Browse files Browse the repository at this point in the history
Summary:

Presto may perform constant folding on a query before sending plan fragments to Velox workers. As a result, a fragment received by a worker may contain values of a type whose Java representation differs from how Velox implements that type. This incompatibility produces incorrect results.

For example, this PR fixes the type incompatibility between Java coordinator and C++ worker for `ipaddress` types. 
 - Java coordinator, ipaddress is represented as a slice of 16 bytes which if represented as a number, would be big endian. 
- C++ worker, ipaddress is represented as an int128_t, in little endian form.

The discrepancy between the two can be seen with the query below: on the native engine, the result will be `::ffff:1.2.3.4` represented in reverse byte order.
```
SELECT 
  CAST(ip AS ipaddress) as casted_ip 
FROM 
  (
    VALUES 
      ('::ffff:1.2.3.4')
  ) AS t (ip)
```

To address this issue, we can reverse the byte order of the ipaddress type sent from and to Java.

**Note**: 

- This issue is not exclusive to ipaddress; other custom types in Velox whose underlying type/implementation differs from Java's may suffer from it as well.

- We can likely enhance the fuzzer to help catch cases like this at diff time once custom fuzzer inputs are landed (facebookincubator#11466)

Reviewed By: Yuhta

Differential Revision: D68284630
  • Loading branch information
yuandagits authored and facebook-github-bot committed Feb 5, 2025
1 parent 93facc8 commit ad5b079
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 0 deletions.
50 changes: 50 additions & 0 deletions velox/serializers/PrestoSerializerDeserializationUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,46 @@ void readDecimalValues(
}
}

// Reads one IPADDRESS value from 'source' and returns it in Velox's
// in-memory representation.
//
// The Java coordinator serializes an ipaddress as 16 raw bytes, i.e. in
// big-endian order when viewed as a number. Velox models ipaddress as a
// custom type backed by int128_t, held in little-endian byte order. The 16
// bytes are therefore read verbatim and then byte-swapped so that both sides
// agree on the value.
int128_t readIpAddress(ByteInputStream* source) {
  return reverseIpAddressByteOrder(source->read<int128_t>());
}

// Reads IPADDRESS values from 'source' into 'values', converting each one
// with readIpAddress().
//
// @param source Stream positioned at the first serialized value. Only
// non-null rows have a value in the stream.
// @param size Number of rows to fill.
// @param offset Index of the first row to fill in 'values'.
// @param nulls Bitmap whose set bits mark the non-null rows; only consulted
// when 'nullCount' is non-zero.
// @param nullCount Number of null rows in the range; zero means every row has
// a value in the stream.
// @param values Destination buffer of int128_t.
//
// When nulls are present, null rows located before a non-null row are set to
// 0. Rows after the last non-null row are left untouched.
void readIpAddressValues(
    ByteInputStream* source,
    vector_size_t size,
    vector_size_t offset,
    const BufferPtr& nulls,
    vector_size_t nullCount,
    const BufferPtr& values) {
  auto* const out = values->asMutable<int128_t>();

  if (!nullCount) {
    // Dense case: one serialized value per row.
    for (vector_size_t i = 0; i < size; ++i) {
      out[offset + i] = readIpAddress(source);
    }
    return;
  }

  checkValuesSize<int128_t>(values, nulls, size, offset);

  // First row in the range that has not been written yet.
  vector_size_t nextUnwritten = offset;
  bits::forEachSetBit(
      nulls->as<uint64_t>(),
      offset,
      offset + size,
      [&](vector_size_t nonNullRow) {
        // Fill the null rows skipped since the previous non-null row with the
        // type default.
        while (nextUnwritten < nonNullRow) {
          out[nextUnwritten] = 0;
          ++nextUnwritten;
        }
        out[nonNullRow] = readIpAddress(source);
        nextUnwritten = nonNullRow + 1;
      });
}

int128_t readUuidValue(ByteInputStream* source) {
// ByteInputStream does not support reading int128_t values.
// UUIDs are serialized as 2 uint64 values with msb value first.
Expand Down Expand Up @@ -625,6 +665,16 @@ void read(
values);
return;
}
if (isIPAddressType(type)) {
readIpAddressValues(
source,
numNewValues,
resultOffset,
flatResult->nulls(),
nullCount,
values);
return;
}
readValues<T>(
source,
numNewValues,
Expand Down
15 changes: 15 additions & 0 deletions velox/serializers/PrestoSerializerSerializationUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,14 @@ void appendNonNull(
numNonNull,
values,
toJavaUuidValue);
} else if (stream->isIpAddress()) {
copyWordsWithRows(
output,
rows.data(),
nonNullIndices,
numNonNull,
values,
reverseIpAddressByteOrder);
} else {
copyWordsWithRows(
output, rows.data(), nonNullIndices, numNonNull, values);
Expand Down Expand Up @@ -608,6 +616,13 @@ void serializeFlatVector(
output, rows.data(), rows.size(), rawValues, toJavaDecimalValue);
} else if (stream->isUuid()) {
copyWords(output, rows.data(), rows.size(), rawValues, toJavaUuidValue);
} else if (stream->isIpAddress()) {
copyWords(
output,
rows.data(),
rows.size(),
rawValues,
reverseIpAddressByteOrder);
} else {
copyWords(output, rows.data(), rows.size(), rawValues);
}
Expand Down
6 changes: 6 additions & 0 deletions velox/serializers/PrestoSerializerSerializationUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
* limitations under the License.
*/
#pragma once
#include <folly/IPAddressV6.h>

#include "velox/common/memory/ByteStream.h"
#include "velox/functions/prestosql/types/IPAddressType.h"
#include "velox/serializers/PrestoSerializer.h"
#include "velox/type/DecimalUtil.h"
#include "velox/type/Type.h"
Expand Down Expand Up @@ -54,6 +56,10 @@ static inline const std::string_view kRLE{"RLE"};
static inline const std::string_view kDictionary{"DICTIONARY"};

void initBitsToMapOnce();
/// Converts a 128-bit IPADDRESS value between the byte order used by Presto
/// Java on the wire and the byte order Velox keeps in memory. The same
/// function is used in both directions (serialization and deserialization).
///
/// The conversion is delegated to DecimalUtil::bigEndian.
/// NOTE(review): presumably this byte-swaps only on little-endian hosts and is
/// a no-op on big-endian ones, despite the "reverse" in the name - confirm
/// against DecimalUtil.
FOLLY_ALWAYS_INLINE int128_t
reverseIpAddressByteOrder(int128_t currentIpBytes) {
return DecimalUtil::bigEndian(currentIpBytes);
}

FOLLY_ALWAYS_INLINE int128_t toJavaDecimalValue(int128_t value) {
// Presto Java UnscaledDecimal128 representation uses signed magnitude
Expand Down
3 changes: 3 additions & 0 deletions velox/serializers/VectorStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ VectorStream::VectorStream(
nullsFirst_(opts.nullsFirst),
isLongDecimal_(type_->isLongDecimal()),
isUuid_(isUuidType(type_)),
isIpAddress_(isIPAddressType(type_)),
opts_(opts),
encoding_(getEncoding(encoding, vector)),
nulls_(streamArena, true, true),
Expand Down Expand Up @@ -353,6 +354,8 @@ void VectorStream::append(folly::Range<const int128_t*> values) {
val = toJavaDecimalValue(value);
} else if (isUuid_) {
val = toJavaUuidValue(value);
} else if (isIpAddress_) {
val = reverseIpAddressByteOrder(value);
}
values_.append<int128_t>(folly::Range(&val, 1));
}
Expand Down
5 changes: 5 additions & 0 deletions velox/serializers/VectorStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ class VectorStream {
return isUuid_;
}

/// Returns true if the type of this stream was recognized as IPADDRESS when
/// the stream was constructed. Serialization code uses this to decide whether
/// the 128-bit values need their byte order converted.
bool isIpAddress() const {
return isIpAddress_;
}

void clear();

private:
Expand All @@ -196,6 +200,7 @@ class VectorStream {
const bool nullsFirst_;
const bool isLongDecimal_;
const bool isUuid_;
const bool isIpAddress_;
const PrestoVectorSerde::PrestoOptions opts_;
std::optional<VectorEncoding::Simple> encoding_;
int32_t nonNullCount_{0};
Expand Down
19 changes: 19 additions & 0 deletions velox/serializers/tests/PrestoSerializerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <vector>
#include "velox/common/base/tests/GTestUtils.h"
#include "velox/common/memory/ByteStream.h"
#include "velox/functions/prestosql/types/IPAddressType.h"
#include "velox/functions/prestosql/types/TimestampWithTimeZoneType.h"
#include "velox/serializers/PrestoVectorLexer.h"
#include "velox/vector/fuzzer/VectorFuzzer.h"
Expand Down Expand Up @@ -1164,6 +1165,24 @@ TEST_P(PrestoSerializerTest, longDecimal) {
testRoundTrip(vector);
}

// Checks that a flat IPADDRESS vector of random 128-bit values survives a
// serialize/deserialize round trip, first without nulls and then with some
// rows set to null.
TEST_P(PrestoSerializerTest, ipaddress) {
  const vector_size_t kNumRows = 100;
  const vector_size_t kNullStride = 7;

  auto addresses = makeFlatVector<int128_t>(
      kNumRows,
      [](auto /*row*/) {
        return HugeInt::build(folly::Random::rand64(), folly::Random::rand64());
      },
      /* isNullAt */ nullptr,
      IPADDRESS());

  // All rows non-null.
  testRoundTrip(addresses);

  // Null out every 7th row, starting with row 0, and verify again.
  for (vector_size_t row = 0; row < kNumRows; row += kNullStride) {
    addresses->setNull(row, true);
  }
  testRoundTrip(addresses);
}

TEST_P(PrestoSerializerTest, uuid) {
auto vector = makeFlatVector<int128_t>(
200, [](vector_size_t row) { return (int128_t)0xD1 << row % 120; });
Expand Down

0 comments on commit ad5b079

Please sign in to comment.