Skip to content

Commit

Permalink
Merge pull request #93 from ExpediaDotCom/iteratorAgeMetricSupport
Browse files Browse the repository at this point in the history
Support for Iterator Age metric with KStreams Timestamp Extractors
  • Loading branch information
Kapil Rastogi authored Jan 31, 2019
2 parents 6dc4624 + 795b48c commit 7930b91
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@ import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.streams.processor.TimestampExtractor


class GraphEdgeTimestampExtractor extends TimestampExtractor {
class GraphEdgeTimestampExtractor extends TimestampExtractor with IteratorAgeMetricSupport {
override def extract(consumerRecord: ConsumerRecord[AnyRef, AnyRef], previousTimestamp: Long): Long = {

// sourceTimestamp of GraphEdge in millis
consumerRecord.value().asInstanceOf[GraphEdge].sourceTimestamp
val sourceTimestampMs = consumerRecord.value().asInstanceOf[GraphEdge].sourceTimestamp
updateIteratorAge(sourceTimestampMs)
sourceTimestampMs
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.expedia.www.haystack.commons.kstreams

import com.codahale.metrics.Histogram
import com.expedia.www.haystack.commons.metrics.MetricsSupport

trait IteratorAgeMetricSupport extends MetricsSupport {

val iteratorAge: Histogram = metricRegistry.histogram("kafka.iterator.age.ms")

def updateIteratorAge(timeInMs: Long): Unit = {
iteratorAge.update(System.currentTimeMillis() - timeInMs)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@ import com.expedia.metrics.MetricData
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.streams.processor.TimestampExtractor

class MetricDataTimestampExtractor extends TimestampExtractor {
class MetricDataTimestampExtractor extends TimestampExtractor with IteratorAgeMetricSupport {

override def extract(record: ConsumerRecord[AnyRef, AnyRef], previousTimestamp: Long): Long = {

//The startTime for metricpoints in computed in seconds and hence multiplying by 1000 to create the epochTimeInMs
record.value().asInstanceOf[MetricData].getTimestamp * 1000
//The startTime for metricData in computed in seconds and hence multiplying by 1000 to create the epochTimeInMs
val metricDataTimestampMs = record.value().asInstanceOf[MetricData].getTimestamp * 1000
updateIteratorAge(metricDataTimestampMs)
metricDataTimestampMs

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@ import com.expedia.open.tracing.Span
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.streams.processor.TimestampExtractor

class SpanTimestampExtractor extends TimestampExtractor {


class SpanTimestampExtractor extends TimestampExtractor with IteratorAgeMetricSupport {

override def extract(record: ConsumerRecord[AnyRef, AnyRef], previousTimestamp: Long): Long = {

//The startTime for span in computed in microseconds and hence dividing by 1000 to create the epochTimeInMs
record.value().asInstanceOf[Span].getStartTime / 1000

val spanStartTimeMs = record.value().asInstanceOf[Span].getStartTime / 1000
updateIteratorAge(spanStartTimeMs)
spanStartTimeMs
}
}

0 comments on commit 7930b91

Please sign in to comment.