Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Embed StreamConfig within ShardInfo #1304

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

furq-aws
Copy link
Contributor

@furq-aws furq-aws commented Apr 4, 2024

Issue #, if available:
N/A.

Description of changes:
Add StreamConfig as a field within ShardInfo.

Previously, StreamIdentifier instances were reconstructed using the optional streamIdentifierSerOpt field within ShardInfo (which has since been removed).
Now, ShardInfo maintains a reference to the StreamIdentifier through StreamConfig, eliminating the need for reconstruction.

This change encompasses any additional refactoring necessitated by embedding StreamConfig within ShardInfo.
For example, there's no longer a need to explicitly provide initialPositionInStreamExtended (a field of StreamConfig) when ShardInfo is already available.

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

Comment on lines +258 to +261
when(leaseCoordinator.getAssignments()).thenReturn(
Stream.of(firstSequenceNumber, secondSequenceNumber, finalSequenceNumber)
.map(SchedulerTest::constructLease)
.collect(Collectors.toList()));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This actually changes the behavior - returns all three leases in each call to getAssignments, rather than returning one new lease for each of the three subsequent calls (as intended by the original code); will revert this portion in next push.

Comment on lines +48 to +51
@Getter(AccessLevel.NONE)
private final boolean isMultiStreamMode;
@Getter(AccessLevel.NONE)
private final String streamIdentifierStr;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need these two member variables if they are just extracted out from streamConfig? I see you are using them in some functions in this class for comparisons but would it be more clean to extract it from streamConfig individually instead?

Comment on lines +960 to 964
/*
* NOTE: RecordsPublisher#createGetRecordsCache(ShardInfo, StreamConfig, MetricsFactory) is deprecated.
* RecordsPublisher#createGetRecordsCache(ShardInfo, MetricsFactory) will be called directly in the future.
*/
RecordsPublisher cache = retrievalConfig.retrievalFactory().createGetRecordsCache(shardInfo, streamConfig, metricsFactory);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the reason we are not changing this now is that there could be some clients which pass in their own interface of RetrievalFactory and may not implement the new function yet? Wouldn't that still mean its backwards incompatible if we ever change its, so it would have to be a major version upgrade?

* @param shardInfo The {@link ShardInfo} representing the shard for which records are to be retrieved.
* @param metricsFactory The {@link MetricsFactory} for recording metrics.
* @return A {@link RecordsPublisher} instance for retrieving records from the shard.
*/
RecordsPublisher createGetRecordsCache(ShardInfo shardInfo, MetricsFactory metricsFactory);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting this was deprecated before and we are undeprecating it. Was there a reason this was deprecated before?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes seems like it was deprecated before but this also suggests that ShardInfo was in a way a public contract too, the custom implementation may depend on shardInfo.streamIdentifierSerOpt :( So we cannot remove it, have to mark it deprecated, keep it same as before but dont use it in any of KCL default implementation and only use StreamConfig

Copy link
Contributor

@akidambisrinivasan akidambisrinivasan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First pass, still need to review the tests

if (streamIdentifierStr.isPresent()) {
final StreamIdentifier streamIdentifier = StreamIdentifier.multiStreamInstance(streamIdentifierStr.get());
final StreamIdentifier streamIdentifier = shardInfo.streamConfig().streamIdentifier();
if (streamIdentifier.isMultiStreamInstance()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this check? Can it just be if StringUtils.isBlank(shardInfo.streamConfig().consumerArn()) then use defaultConsumerArn?

The other difference between the two is whether we pass streamIdentifier.serialize() or not. Id say, always pass streamIdentifier, this value is only used for logging, and I think its trying to be smart about whether to only log shardId or streamSer+shardId. I dont get why that is needed, I think its ok to log streamIdentifier.toString()+shardId and for single stream case it would be streamName+shardId, its probably redundant information but we should not be complicating code and having 2 constructors and adding if statements and exposing a public method from streamIdentifier to achieve it. Its an overkill.

At this point, it maybe possible that customer may have script to parse our logs, thats the only time we should be concerned about changing logs. But I highly doubt if we need to be worried about it. We can ask around, Ill check with abhit to see what he thinks about changing logs, I think it should be fine. We should simplify it. But ill double check to make sure im not missing anything.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got confirmed its ok to change logs

*
* @return true if this is a multi-stream instance, false otherwise.
*/
public boolean isMultiStreamInstance() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dont think this needs to be public, see my comment on the usage of this in during the construction of FanOutRecordsPublisher

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also if you make the other change and we probably dont need a method because its used in only one place. Also I'd like to avoid saying a streamIdentifier is single stream instance or multi stream instance. What does it even mean? You can create a streamIdentifier from streamName, streamArn or streamSer, and its just an ID. I know this is existing code, it was probably meant to force customers to use a streamIdentifier constructed using streamArn or streamArn in multistream mode, but why? If is use just streamName in multi-stream mode, it should still work just fine?

Optional.of(((MultiStreamLease) lease).streamIdentifier()) : Optional.empty();
final StreamConfig streamConfig = getOrCreateStreamConfig(streamIdentifierSerialization);

final String shardId = isMultiStreamLease ? ((MultiStreamLease) lease).shardId() : lease.leaseKey();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally these should be encapsulated in the class itself. lease.shardId() should give leaseKey if its Lease and shardId if its MultiStreamLease.

final boolean isMultiStreamLease = lease instanceof MultiStreamLease;

final Optional<String> streamIdentifierSerialization = isMultiStreamLease ?
Optional.of(((MultiStreamLease) lease).streamIdentifier()) : Optional.empty();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as below, encapsulate this inside the Lease and MultiStreamLease classes so you dont need these if checks, you can do
Optional.ofNullable(lease.streamIdentifier), make Lease.streamIdentifier return null


private StreamConfig getOrCreateStreamConfig(final Optional<String> streamIdentifierSerialization) {
if (!streamIdentifierSerialization.isPresent()) {
Validate.isTrue(!isMultiStreamMode, "Should not be in MultiStream Mode");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: double negations are always hard to read, can you use Validate.isFalse(isMultistreamMod, "...")

@@ -139,7 +134,7 @@ public boolean equals(Object obj) {
ShardInfo other = (ShardInfo) obj;
return new EqualsBuilder().append(concurrencyToken, other.concurrencyToken)
.append(parentShardIds, other.parentShardIds).append(shardId, other.shardId)
.append(streamIdentifierSerOpt.orElse(""), other.streamIdentifierSerOpt.orElse("")).isEquals();
.append(streamIdentifierStr, other.streamIdentifierStr).isEquals();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh you want to compare StreamConfig instead you no longer need the streamIdentifierStr

private final String shardId;
private final String concurrencyToken;
// Sorted list of parent shardIds.
private final List<String> parentShardIds;
private final ExtendedSequenceNumber checkpoint;
private final StreamConfig streamConfig;

@Getter(AccessLevel.NONE)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes thank you, this is necessary because this is class exposes a public contract which cannot be violated and we should not expose more than what is necessary. But I do agree with lucien that we probably dont need them and get it from streamConfig when needed.

@@ -120,8 +119,7 @@ public TaskResult call() {
*/
final MetricsScope appScope = MetricsUtil.createMetricsWithOperation(metricsFactory, APPLICATION_TRACKER_OPERATION);
final MetricsScope shardScope = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION);
shardInfo.streamIdentifierSerOpt()
.ifPresent(streamId -> MetricsUtil.addStreamId(shardScope, StreamIdentifier.multiStreamInstance(streamId)));
MetricsUtil.addStreamId(shardScope, shardInfo.streamConfig().streamIdentifier());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay we dont want to do this by default, for single stream application, this can change dimension and may break public contract on the metrics exposed by KCL.

Seems like then we need some method from streamIdentifier, you had isMultiStreamInstance. We could use it, but I dint want to expose that method based on the comment I left there. Lets think if there is another way.

@@ -218,8 +216,7 @@ private void callProcessRecords(ProcessRecordsInput input, List<KinesisClientRec
.millisBehindLatest(input.millisBehindLatest()).build();

final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION);
shardInfo.streamIdentifierSerOpt()
.ifPresent(streamId -> MetricsUtil.addStreamId(scope, StreamIdentifier.multiStreamInstance(streamId)));
MetricsUtil.addStreamId(scope, shardInfo.streamConfig().streamIdentifier());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

@Override
public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:( dont know why this was part of the public interface, thanks for marking it deprecated, its never called from outside and its only called from within the class so it dint need to be part of the public interface, could just have been a private method.

Comment on lines +1075 to +1098
private ShardInfo constructShardInfoFromLease(final Lease lease) {
final boolean isMultiStreamLease = lease instanceof MultiStreamLease;

final Optional<String> streamIdentifierSerialization = isMultiStreamLease ?
Optional.of(((MultiStreamLease) lease).streamIdentifier()) : Optional.empty();
final StreamConfig streamConfig = getOrCreateStreamConfig(streamIdentifierSerialization);

final String shardId = isMultiStreamLease ? ((MultiStreamLease) lease).shardId() : lease.leaseKey();
return new ShardInfo(
shardId, lease.concurrencyToken().toString(), lease.parentShardIds(), lease.checkpoint(), streamConfig);
}

private StreamConfig getOrCreateStreamConfig(final Optional<String> streamIdentifierSerialization) {
if (!streamIdentifierSerialization.isPresent()) {
Validate.isTrue(!isMultiStreamMode, "Should not be in MultiStream Mode");
final StreamConfig streamConfig = currentStreamConfigMap.values().iterator().next();
Validate.notNull(streamConfig, "StreamConfig should not be null");
return streamConfig;
}

final StreamIdentifier streamIdentifier =
StreamIdentifier.multiStreamInstance(streamIdentifierSerialization.get());
return currentStreamConfigMap.getOrDefault(streamIdentifier, streamTracker.createStreamConfig(streamIdentifier));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private ShardInfo constructShardInfoFromLease(final Lease lease) {
final boolean isMultiStreamLease = lease instanceof MultiStreamLease;
final Optional<String> streamIdentifierSerialization = isMultiStreamLease ?
Optional.of(((MultiStreamLease) lease).streamIdentifier()) : Optional.empty();
final StreamConfig streamConfig = getOrCreateStreamConfig(streamIdentifierSerialization);
final String shardId = isMultiStreamLease ? ((MultiStreamLease) lease).shardId() : lease.leaseKey();
return new ShardInfo(
shardId, lease.concurrencyToken().toString(), lease.parentShardIds(), lease.checkpoint(), streamConfig);
}
private StreamConfig getOrCreateStreamConfig(final Optional<String> streamIdentifierSerialization) {
if (!streamIdentifierSerialization.isPresent()) {
Validate.isTrue(!isMultiStreamMode, "Should not be in MultiStream Mode");
final StreamConfig streamConfig = currentStreamConfigMap.values().iterator().next();
Validate.notNull(streamConfig, "StreamConfig should not be null");
return streamConfig;
}
final StreamIdentifier streamIdentifier =
StreamIdentifier.multiStreamInstance(streamIdentifierSerialization.get());
return currentStreamConfigMap.getOrDefault(streamIdentifier, streamTracker.createStreamConfig(streamIdentifier));
}
private ShardInfo constructShardInfoFromLease(final Lease lease) {
Optional<MultiStreamLease> msl = Optional.of(lease).filter(l->l instanceof MultiStreamLease).map(l->(MultiStreamLease) l);
String shardId = msl.map(MultiStreamLease::shardId).orElse(lease.leaseKey());
StreamConfig streamConfig = msl.map(MultiStreamLease::streamIdentifier).map(streamIdentifierSerialization -> {
final StreamIdentifier streamIdentifier = StreamIdentifier.multiStreamInstance(streamIdentifierSerialization);
return currentStreamConfigMap.getOrDefault(streamIdentifier, streamTracker.createStreamConfig(streamIdentifier));
}).orElseGet(()->{
Validate.isTrue(!isMultiStreamMode, "Should not be in MultiStream Mode");
return Optional.ofNullable(currentStreamConfigMap.values().iterator().next()).get();
});
return new ShardInfo(
shardId, lease.concurrencyToken().toString(), lease.parentShardIds(), lease.checkpoint(), streamConfig);
}

This makes it a little more clear that the empty optional represents an uncastable Lease.

Comment on lines +258 to +261
when(leaseCoordinator.getAssignments()).thenReturn(
Stream.of(firstSequenceNumber, secondSequenceNumber, finalSequenceNumber)
.map(SchedulerTest::constructLease)
.collect(Collectors.toList()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
when(leaseCoordinator.getAssignments()).thenReturn(
Stream.of(firstSequenceNumber, secondSequenceNumber, finalSequenceNumber)
.map(SchedulerTest::constructLease)
.collect(Collectors.toList()));
when(leaseCoordinator.getAssignments()).thenReturn(
ImmutableList.of(
constructLease(firstSequenceNumber),
constructLease(secondSequenceNumber),
constructLease(finalSequenceNumber)));

Avoid control flow in unit tests.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants