Skip to content

Commit

Permalink
End to end test passed
Browse files Browse the repository at this point in the history
  • Loading branch information
Fred1155 committed Jan 1, 2025
1 parent 4e43df3 commit 9e32782
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 120 deletions.
5 changes: 5 additions & 0 deletions metric-publishers/emf-metric-publisher/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@
<version>${awsjavasdk.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.20.0</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sdk-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import software.amazon.awssdk.annotations.Immutable;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.annotations.ThreadSafe;
import software.amazon.awssdk.core.metrics.CoreMetric;
import software.amazon.awssdk.metrics.MetricCategory;
import software.amazon.awssdk.metrics.MetricCollection;
import software.amazon.awssdk.metrics.MetricLevel;
Expand Down Expand Up @@ -50,6 +51,13 @@
* monitoring and alerting.
* </p>
*
* <pre>
* // Create a CloudWatchMetricPublisher using a custom namespace.
* MetricPublisher metricPublisher = EmfMetricPublisher.builder()
* .namespace("MyApplication")
* .build();
* </pre>
*
*
* @see MetricPublisher The base interface for metric publishers
* @see MetricCollection For the collection of metrics to be published
Expand Down Expand Up @@ -135,7 +143,7 @@ public Builder namespace(String namespace) {
* this
* publisher.
*
* <p>If this is not specified, [] will be used.
* <p>If this is not specified, {@link CoreMetric#SERVICE_ID} and {@link CoreMetric#OPERATION_NAME} will be used.
*/
public Builder dimensions(Collection<SdkMetric<String>> dimensions) {
this.dimensions = new ArrayList<>(dimensions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,15 @@
import software.amazon.awssdk.utils.Logger;

/**
* # MetricEmfConverter
* Converts metrics into Amazon CloudWatch Embedded Metric Format (EMF).
* This internal class handles the transformation of various metric types
* into EMF-compatible format.
* ## Configuration
* The converter is initialized with an `EmfMetricConfiguration` that defines:
* - Metric categories to process
* - Other EMF-specific settings
* ## Implementation Notes
* - Boolean values are converted to numeric values (1.0 for true, 0.0 for false)
* - Null values are converted to 0.0
* - Very small numeric values (below 0.0001) are normalized to 0.0
* - Duration values are converted to milliseconds
* Converts {@link MetricCollection} into List of Amazon CloudWatch Embedded Metric Format (EMF) Strings.
* <p>
* This class is responsible for transforming {@link MetricCollection} into the EMF format required by CloudWatch.
* It handles the conversion of different metric types and ensures the output conforms to EMF specifications.
* </p>
*
*
* @see <a href="https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format.html">
* CloudWatch Embedded Metric Format Specification</a>
* @see EmfMetricConfiguration
*/
@SdkInternalApi
Expand All @@ -73,48 +65,14 @@ public MetricEmfConverter(EmfMetricConfiguration config) {
}

/**
* Processes and normalizes metric values for EMF formatting.
* The method handles various input types and normalizes them according to EMF requirements:
* - `null` : 0.0
* - `Boolean` : 1.0 (true) or 0.0 (false)
* - `Duration` : milliseconds value
* - `Double`: normalized to 0.0 if below threshold
*
* @param mRecord The metric record to process
*/
private void processAndWriteValue(JsonWriter jsonWriter, MetricRecord<?> mRecord) {
Object value = mRecord.value();
Class<?> valueClass = mRecord.metric().valueClass();

if (value == null) {
jsonWriter.writeValue(0.0);
} else if (Boolean.class.isAssignableFrom(valueClass)) {
jsonWriter.writeValue(value.equals(true) ? 1.0 : 0.0);
} else if (Duration.class.isAssignableFrom(valueClass)) {
Duration duration = (Duration) value;
double millisValue = duration.toMillis();
jsonWriter.writeValue(millisValue);
} else if (Double.class.isAssignableFrom(valueClass)) {
double doubleValue = (Double) value;
if (Math.abs(doubleValue) < ZERO_THRESHOLD) {
jsonWriter.writeValue(0.0);
} else {
jsonWriter.writeValue(doubleValue);
}
} else if (Integer.class.isAssignableFrom(valueClass)) {
jsonWriter.writeValue((Integer) value);
} else if (Long.class.isAssignableFrom(valueClass)) {
jsonWriter.writeValue((Long) value);
}
}

/**
* # Convert SDK Metrics to EMF Format
* <p>
* Convert SDK Metrics to EMF Format.
* Transforms a collection of SDK metrics into CloudWatch's Embedded Metric Format (EMF).
* The method processes standard SDK measurements and structures them according to
* CloudWatch's EMF specification.
* ## Example Output
* ```json
* </p>
* Example Output
* <pre>
* {
* "_aws": {
* "Timestamp": 1672963200,
Expand All @@ -126,10 +84,10 @@ private void processAndWriteValue(JsonWriter jsonWriter, MetricRecord<?> mRecord
* }]
* }]
* },
* "ServiceId": "XXXXXXXXXXXXX",
* "ServiceId": "DynamoDB",
* "AvailableConcurrency": 5
* }
* ```
* </pre>
*
* @param metricCollection Collection of SDK metrics to be converted
* @return List of EMF-formatted metrics ready for CloudWatch
Expand Down Expand Up @@ -168,6 +126,48 @@ public List<String> convertMetricCollectionToEmf(MetricCollection metricCollecti
return createEmfStrings(aggregatedMetrics);
}

/**
* Processes and normalizes metric values for EMF formatting.
* The method handles various input types and normalizes them according to EMF requirements:
* Value Conversion Rules:
* <ul>
* <li>Numbers (Integer, Long, Double, etc.) are converted to their native numeric format</li>
* <li>Duration values are converted to milliseconds</li>
* <li>Date/Time values are converted to epoch milliseconds</li>
* <li>Null values are omitted from the output</li>
* <li>Boolean values are converted to 1 (true) or 0 (false)</li>
* <li>Non-Dimension metrics with non-numeric values are omitted from the output</li>
* </ul>
*
* @param mRecord The metric record to process
*/
private void processAndWriteValue(JsonWriter jsonWriter, MetricRecord<?> mRecord) {
Object value = mRecord.value();
Class<?> valueClass = mRecord.metric().valueClass();

if (value == null) {
return;
}
if (Boolean.class.isAssignableFrom(valueClass)) {
jsonWriter.writeValue(value.equals(true) ? 1.0 : 0.0);
} else if (Duration.class.isAssignableFrom(valueClass)) {
Duration duration = (Duration) value;
double millisValue = duration.toMillis();
jsonWriter.writeValue(millisValue);
} else if (Double.class.isAssignableFrom(valueClass)) {
double doubleValue = (Double) value;
if (Math.abs(doubleValue) < ZERO_THRESHOLD) {
jsonWriter.writeValue(0.0);
} else {
jsonWriter.writeValue(doubleValue);
}
} else if (Integer.class.isAssignableFrom(valueClass)) {
jsonWriter.writeValue((Integer) value);
} else if (Long.class.isAssignableFrom(valueClass)) {
jsonWriter.writeValue((Long) value);
}
}

private List<String> createEmfStrings(Map<String, List<MetricRecord<?>>> aggregatedMetrics) {
List<String> emfStrings = new ArrayList<>();
Map<String, List<MetricRecord<?>>> currentMetricBatch = new HashMap<>();
Expand All @@ -179,8 +179,9 @@ private List<String> createEmfStrings(Map<String, List<MetricRecord<?>>> aggrega

if (records.size() > 100) {
records = records.subList(0, 100);
int size = records.size();
logger.warn(() -> "Some AWS SDK client-side metric data have been dropped because it exceeds the cloudwatch "
+ "requirements.");
+ "requirements. There are " + size + " values for metric " + metricName);
}

if (currentMetricNames.size() >= 100) {
Expand All @@ -190,7 +191,7 @@ private List<String> createEmfStrings(Map<String, List<MetricRecord<?>>> aggrega
}

currentMetricBatch.put(metricName, records);
if (!String.class.isAssignableFrom(records.get(0).metric().valueClass())) {
if (!isStringMetric(records.get(0))) {
currentMetricNames.put(metricName, records.get(0).metric().valueClass());
}

Expand Down Expand Up @@ -222,7 +223,7 @@ private void writeAwsObject(JsonWriter jsonWriter, Map<String, Class<?>> metricN


jsonWriter.writeFieldName("Timestamp");
jsonWriter.writeValue(clock.instant());
jsonWriter.writeValue(clock.instant().toEpochMilli());


jsonWriter.writeFieldName("LogGroupName");
Expand Down Expand Up @@ -266,54 +267,86 @@ private void writeMetricDefinitionArray(JsonWriter jsonWriter, Map<String, Clas
jsonWriter.writeFieldName("Metrics");
jsonWriter.writeStartArray();

metricNames.forEach((name, type) -> writeMetricDefinition(jsonWriter, name, type));

// Write metric definitions
metricNames.forEach((name, type) -> {
jsonWriter.writeStartObject();
jsonWriter.writeFieldName("Name");
jsonWriter.writeValue(name);
jsonWriter.writeEndArray();
}

String unit = getMetricUnit(type);
if (unit != null) {
jsonWriter.writeFieldName("Unit");
jsonWriter.writeValue(unit);
}
private void writeMetricDefinition(JsonWriter jsonWriter, String name, Class<?> type) {
jsonWriter.writeStartObject();
jsonWriter.writeFieldName("Name");
jsonWriter.writeValue(name);

jsonWriter.writeEndObject();
});
String unit = getMetricUnit(type);
if (unit != null) {
jsonWriter.writeFieldName("Unit");
jsonWriter.writeValue(unit);
}

jsonWriter.writeEndArray();
jsonWriter.writeEndObject();
}


private void writeMetricValues(JsonWriter jsonWriter, Map<String, List<MetricRecord<?>>> metrics) {
for (Map.Entry<String, List<MetricRecord<?>>> entry : metrics.entrySet()) {
String metricName = entry.getKey();
List<MetricRecord<?>> records = entry.getValue();

metrics.forEach((metricName, records) -> {
if (records.isEmpty()) {
return;
}
if (isDimension(metricName)) {
jsonWriter.writeFieldName(metricName);
jsonWriter.writeValue((String) records.get(0).value());
writeDimensionValue(jsonWriter, metricName, records);
} else {
if (records.get(0).value() instanceof String) {
continue;
}
if (records.size() == 1) {
jsonWriter.writeFieldName(metricName);
processAndWriteValue(jsonWriter, records.get(0));
} else {
jsonWriter.writeFieldName(metricName);
jsonWriter.writeStartArray();
for (MetricRecord<?> mRecord: records) {
processAndWriteValue(jsonWriter, mRecord);
}
jsonWriter.writeEndArray();
}
writeMetricRecord(jsonWriter, metricName, records);
}
});
}

private void writeDimensionValue(JsonWriter jsonWriter, String metricName, List<MetricRecord<?>> records) {
if (records.get(0).value() == null) {
return;
}

jsonWriter.writeFieldName(metricName);
jsonWriter.writeValue((String) records.get(0).value());
}

private void writeMetricRecord(JsonWriter jsonWriter, String metricName, List<MetricRecord<?>> records) {
MetricRecord<?> firstRecord = records.get(0);

if (!isNumericMetric(firstRecord) || (records.size() == 1 && firstRecord.value() == null)) {
return;
}

jsonWriter.writeFieldName(metricName);

if (records.size() == 1) {
processAndWriteValue(jsonWriter, firstRecord);
} else {
writeMetricArray(jsonWriter, records);
}
}

private boolean isStringMetric(MetricRecord<?> mRecord) {
return String.class.isAssignableFrom(mRecord.metric().valueClass());
}

private boolean isNumericMetric(MetricRecord<?> mRecord) {
return Integer.class.isAssignableFrom(mRecord.metric().valueClass())
|| Boolean.class.isAssignableFrom(mRecord.metric().valueClass())
|| Long.class.isAssignableFrom(mRecord.metric().valueClass())
|| Duration.class.isAssignableFrom(mRecord.metric().valueClass())
|| Double.class.isAssignableFrom(mRecord.metric().valueClass());
}


private void writeMetricArray(JsonWriter jsonWriter, List<MetricRecord<?>> records) {
jsonWriter.writeStartArray();
for (MetricRecord<?> mRecord : records) {
processAndWriteValue(jsonWriter, mRecord);
}
jsonWriter.writeEndArray();
}



private boolean isDimension(String metricName) {
return metricName != null && config.getDimensionStrings().contains(metricName);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://aws.amazon.com/apache2.0
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.
#

status = warn

appender.console.type = Console
appender.console.name = ConsoleAppender
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n%throwable

rootLogger.level = info
rootLogger.appenderRef.stdout.ref = ConsoleAppender

# Uncomment below to enable more specific logging
#
#logger.sdk.name = software.amazon.awssdk
#logger.sdk.level = debug
#
#logger.request.name = software.amazon.awssdk.request
#logger.request.level = debug
#
#logger.apache.name = org.apache.http.wire
#logger.apache.level = debug
#
#logger.netty.name = io.netty.handler.logging
#logger.netty.level = debug
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,4 @@ void Publish_EmptyMetrics() {
assertThat(loggedEvents()).hasSize(4);
}




}
Loading

0 comments on commit 9e32782

Please sign in to comment.