Skip to content

Commit

Permalink
resolve review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
rohityadav1993 committed Sep 1, 2024
1 parent 80eb155 commit 576d97f
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 14 deletions.
2 changes: 1 addition & 1 deletion pinot-connectors/pinot-flink-connector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ ControllerRequestURLBuilder.baseUrl(DEFAULT_CONTROLLER_URL), httpClient);
// fetch Pinot schema
Schema schema = PinotConnectionUtils.getSchema(client, "starbucksStores");
// fetch Pinot table config
TableConfig tableConfig = PinotConnectionUtils.getTableConfig(client, "starbucksStores", "OFFLINE");
TableConfig tableConfig = PinotConnectionUtils.getTableConfig(client, "starbucksStores", "REALTIME");

// create the Flink Pinot sink (for upsert tables, partition the stream the same way as the realtime stream, e.g. Kafka)
srcDs.partitionCustom((Partitioner<Integer>) (key, partitions) -> key % partitions, r -> (Integer) r.getField("primaryKey"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,11 @@ public class FlinkSegmentWriter implements SegmentWriter {
private transient Counter _processedRecords;
private transient volatile long _lastRecordProcessingTimeMs = 0;

public FlinkSegmentWriter(int indexOfSubtask, MetricGroup metricGroup, String segmentNamePrefix,
/**
 * Convenience constructor that uses default segment naming: no explicit segment name prefix and
 * no explicit segment upload time (delegates with {@code null} for both, so the writer falls back
 * to its default behavior for each).
 *
 * @param indexOfSubtask index of the Flink subtask owning this writer
 * @param metricGroup    Flink metric group used to register writer metrics
 */
public FlinkSegmentWriter(int indexOfSubtask, MetricGroup metricGroup) {
this(indexOfSubtask, metricGroup, null, null);
}

public FlinkSegmentWriter(int indexOfSubtask, MetricGroup metricGroup, @Nullable String segmentNamePrefix,
@Nullable Long segmentUploadTimeMs) {
_indexOfSubtask = indexOfSubtask;
_segmentNamePrefix = segmentNamePrefix;
Expand Down Expand Up @@ -136,7 +140,7 @@ public void init(TableConfig tableConfig, Schema schema, Map<String, String> bat

Map<String, String> batchConfigMap = _batchIngestionConfig.getBatchConfigMaps().get(0);
batchConfigMap.put(BatchConfigProperties.UPLOADED_REALTIME_PARTITION_ID, Integer.toString(_indexOfSubtask));
batchConfigMap.put(BatchConfigProperties.SEGMENT_UPLOAD_TIME_MS, String.valueOf(_segmentUploadTimeMs));
batchConfigMap.put(BatchConfigProperties.SEGMENT_UPLOAD_TIME_MS, Long.toString(_segmentUploadTimeMs));
batchConfigMap.computeIfAbsent(
BatchConfigProperties.SEGMENT_NAME_GENERATOR_PROP_PREFIX + "." + BatchConfigProperties.SEGMENT_NAME_PREFIX,
key -> StringUtils.isNotBlank(_segmentNamePrefix) ? _segmentNamePrefix
Expand All @@ -150,7 +154,7 @@ public void init(TableConfig tableConfig, Schema schema, Map<String, String> bat
batchConfigMap.put(segmentNamePostfixProp, segmentSuffix);

// For upsert tables, the UploadedRealtimeSegmentName must be used so that segments are assigned correctly
if (_tableConfig.getTableType().equals(TableType.REALTIME)) {
if (_tableConfig.getTableType() == TableType.REALTIME) {
batchConfigMap.put(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE,
BatchConfigProperties.SegmentNameGeneratorType.UPLOADED_REALTIME);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,13 @@ public class PinotSinkFunction<T> extends RichSinkFunction<T> implements Checkpo

private final PinotGenericRowConverter<T> _recordConverter;

private TableConfig _tableConfig;
private Schema _schema;
private final TableConfig _tableConfig;
private final Schema _schema;

private String _segmentNamePrefix;
@Nullable private final String _segmentNamePrefix;

// Used to set upload time in segment name, if not provided, current time is used
@Nullable private Long _segmentUploadTimeMs;
@Nullable private final Long _segmentUploadTimeMs;

private transient SegmentWriter _segmentWriter;
private transient SegmentUploader _segmentUploader;
Expand All @@ -77,17 +77,17 @@ public PinotSinkFunction(PinotGenericRowConverter<T> recordConverter, TableConfi

/**
 * Convenience constructor without segment-naming overrides: delegates with {@code null} for both
 * the segment name prefix and the segment upload time, so defaults are used for each.
 *
 * @param recordConverter           converter from the Flink record type {@code T} to Pinot rows
 * @param tableConfig               Pinot table config of the destination table
 * @param schema                    Pinot schema of the destination table
 * @param segmentFlushMaxNumRecords max number of records buffered before a segment is flushed
 * @param executorPoolSize          size of the executor pool used for segment upload
 */
public PinotSinkFunction(PinotGenericRowConverter<T> recordConverter, TableConfig tableConfig, Schema schema,
long segmentFlushMaxNumRecords, int executorPoolSize) {
this(recordConverter, tableConfig, schema, segmentFlushMaxNumRecords, executorPoolSize, null, null);
}

/**
 * Full constructor.
 *
 * <p>Note: this span previously contained two conflicting copies of this constructor (a diff
 * artifact) — the newer copy never assigned the final {@code _segmentNamePrefix} /
 * {@code _segmentUploadTimeMs} fields, while the stale copy delegated and then reassigned final
 * fields. They are merged here into a single constructor that assigns every final field exactly
 * once.
 *
 * @param recordConverter           converter from the Flink record type {@code T} to Pinot rows
 * @param tableConfig               Pinot table config of the destination table
 * @param schema                    Pinot schema of the destination table
 * @param segmentFlushMaxNumRecords max number of records buffered before a segment is flushed
 * @param executorPoolSize          size of the executor pool used for segment upload
 * @param segmentNamePrefix         optional segment name prefix; {@code null} to use the default
 * @param segmentUploadTimeMs       optional upload time (epoch millis) embedded in the segment
 *                                  name; {@code null} to use the current time
 */
public PinotSinkFunction(PinotGenericRowConverter<T> recordConverter, TableConfig tableConfig, Schema schema,
    long segmentFlushMaxNumRecords, int executorPoolSize, @Nullable String segmentNamePrefix,
    @Nullable Long segmentUploadTimeMs) {
  _recordConverter = recordConverter;
  _tableConfig = tableConfig;
  _schema = schema;
  _segmentFlushMaxNumRecords = segmentFlushMaxNumRecords;
  _executorPoolSize = executorPoolSize;
  _segmentNamePrefix = segmentNamePrefix;
  _segmentUploadTimeMs = segmentUploadTimeMs;
}
Expand Down

0 comments on commit 576d97f

Please sign in to comment.