From cc48613d07127e873396d17e8f88f0cdec9fd10b Mon Sep 17 00:00:00 2001 From: Samantha Chan Date: Tue, 10 Oct 2017 18:14:28 +0000 Subject: [PATCH 1/3] Add service to calculate R peaks using CEP operator --- .../.classpath | 12 +++ .../.project | 29 ++++++ ...cation.RPeakDetectCep-BuildConfig.splbuild | 25 +++++ .../.settings/org.eclipse.core.runtime.prefs | 2 + .../.settings/org.eclipse.jdt.apt.core.prefs | 3 + .../.settings/org.eclipse.jdt.core.prefs | 2 + .../application/.namespace | 0 .../application/RPeakDetectCep.spl | 96 +++++++++++++++++++ .../info.xml | 19 ++++ 9 files changed, 188 insertions(+) create mode 100644 analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/.classpath create mode 100644 analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/.project create mode 100644 analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/.settings/application.RPeakDetectCep-BuildConfig.splbuild create mode 100644 analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/.settings/org.eclipse.core.runtime.prefs create mode 100644 analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/.settings/org.eclipse.jdt.apt.core.prefs create mode 100644 analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/.settings/org.eclipse.jdt.core.prefs create mode 100644 analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/application/.namespace create mode 100644 analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/application/RPeakDetectCep.spl create mode 100644 analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/info.xml diff --git a/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/.classpath b/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/.classpath new file mode 100644 index 0000000..cbb0e4c --- /dev/null +++ b/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/.classpath @@ -0,0 +1,12 @@ + + + + + + + + + + + + diff --git a/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/.project b/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/.project new file mode 100644 index 0000000..b4b89c1 --- /dev/null +++ b/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/.project @@ -0,0 +1,29 @@ + + + com.ibm.streamsx.health.analyze.rpeak.cep + + + + + + org.eclipse.jdt.core.javabuilder + + + + + org.eclipse.xtext.ui.shared.xtextBuilder + + + + + com.ibm.streams.studio.splproject.builder.SPLProjectBuilder + + + + + + org.eclipse.xtext.ui.shared.xtextNature + com.ibm.streams.studio.splproject.SPLProjectNature + org.eclipse.jdt.core.javanature + + diff --git a/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/.settings/application.RPeakDetectCep-BuildConfig.splbuild b/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/.settings/application.RPeakDetectCep-BuildConfig.splbuild new file mode 100644 index 0000000..1ce3540 --- /dev/null +++ b/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/.settings/application.RPeakDetectCep-BuildConfig.splbuild @@ -0,0 +1,25 @@ + + + +SPL Build Configuration: BuildConfig +F +T +FDEF +application::RPeakDetectCep +BuildConfig + + +T + + + + +F +T + +F + +BuildConfig + +F + \ No newline at end of file diff --git a/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/.settings/org.eclipse.core.runtime.prefs b/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/.settings/org.eclipse.core.runtime.prefs new file mode 100644 index 0000000..5a0ad22 --- /dev/null +++ b/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/.settings/org.eclipse.core.runtime.prefs @@ -0,0 +1,2 @@ +eclipse.preferences.version=1 +line.separator=\n diff --git a/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/.settings/org.eclipse.jdt.apt.core.prefs b/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/.settings/org.eclipse.jdt.apt.core.prefs new file mode 100644 index 0000000..7b0c050 --- /dev/null +++ b/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/.settings/org.eclipse.jdt.apt.core.prefs @@ -0,0 +1,3 @@ +eclipse.preferences.version=1 +org.eclipse.jdt.apt.aptEnabled=true +org.eclipse.jdt.apt.reconcileEnabled=false diff --git a/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/.settings/org.eclipse.jdt.core.prefs b/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/.settings/org.eclipse.jdt.core.prefs new file mode 100644 index 0000000..0b3561a --- /dev/null +++ b/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/.settings/org.eclipse.jdt.core.prefs @@ -0,0 +1,2 @@ +eclipse.preferences.version=1 +org.eclipse.jdt.core.compiler.processAnnotations=enabled diff --git a/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/application/.namespace b/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/application/.namespace new file mode 100644 index 0000000..e69de29 diff --git a/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/application/RPeakDetectCep.spl b/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/application/RPeakDetectCep.spl new file mode 100644 index 0000000..0582045 --- /dev/null +++ b/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/application/RPeakDetectCep.spl @@ -0,0 +1,96 @@ +namespace application ; + +use com.ibm.streamsx.health.ingest.connector::IngestSubscribe ; +use com.ibm.streamsx.health.ingest.types::* ; +use com.ibm.streams.cep::MatchRegex ; +use com.ibm.streamsx.topology.topic::Publish ; +use com.ibm.streamsx.json::TupleToJSON ; +type PeakEvent = rstring patientId, int64 ts, float64 data, float64 max, + float64 min, int32 count, list eventData, list eventTs ; +type RPeakEvent = rstring patientId, int64 ts, float64 data ; +type RREvent = rstring patientId, int64 rrInterval, list events ; + +composite RPeakDetectCep +{ + param + expression $subTopic : getSubmissionTimeValue("topic") ; + expression $pubTopicRPeak : "analyze/rpeak/cep/r" ; + expression $pubTopicRR : "analyze/rpeak/cep/rr" ; + expression $signal : getSubmissionTimeValue("readingCode", "ECG") ; + expression $peakThreshold : (float64)getSubmissionTimeValue("peakThreshold", "0.8") ; + graph + (stream IngestSubscribe_1_out0) as IngestSubscribe_1 = + IngestSubscribe() + { + param + topic : $subTopic ; + } + + (stream MatchRegex_2_out0) as MatchRegex_2 = + MatchRegex(Functor_3_out0 as I0) + { + param + pattern : ". rise+ drop drop drop" ; + predicates : { rise = data >= Last(data), drop = data < Last(data) } ; + partitionBy : I0.patientId ; + output + MatchRegex_2_out0 : max = Max(data), min = Min(data), count = Count(), + eventData = Collect(data), eventTs = Collect(ts) ; + } + + (stream Functor_3_out0) as + Functor_3 = Functor(IngestSubscribe_1_out0 as I0) + { + param + filter : I0.reading.readingType.code == $signal ; + output + Functor_3_out0 : patientId = I0.patientId, ts = I0.reading.ts, data = + I0.reading.value ; + } + + () as Publish_4 = Publish(TupleToJSON_5_out0 as inPort0Alias) + { + param + topic : $pubTopicRPeak ; + } + + (stream TupleToJSON_5_out0) as TupleToJSON_5 = + TupleToJSON(RPeakEvents as inPort0Alias) + { + } + + (stream RPeakEvents) as Custom_6 = Custom(MatchRegex_2_out0 as + I0) + { + logic + onTuple I0 : if(max - eventData [ 0 ] > $peakThreshold) + { + submit({ patientId = patientId, ts = eventTs [ size(eventTs) - 4 ], data = + max }, RPeakEvents) ; + } + + } + + (stream RREvents) as Aggregate_7 = Aggregate(RPeakEvents as I0) + { + window + I0 : sliding, count(2), count(1), partitioned ; + param + partitionBy: I0.patientId; + output + RREvents : rrInterval = I0 [ 0 ].ts - I0 [ 1 ].ts, events = Collect(I0) ; + } + + () as Publish_8 = Publish(TupleToJSONRR_out0) + { + param + topic : $pubTopicRR ; + } + + (stream TupleToJSONRR_out0) as TupleToJSONRR = + TupleToJSON(RREvents as inPort0Alias) + { + } + +} + diff --git a/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/info.xml b/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/info.xml new file mode 100644 index 0000000..27abafb --- /dev/null +++ b/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/info.xml @@ -0,0 +1,19 @@ + + + + com.ibm.streamsx.health.analyze.rpeak.cep + + 1.0.0 + 4.2.1.1 + + + + com.ibm.streams.cep + 2.1.0 + + + com.ibm.streamsx.health.ingest + [1.1.0,2.0.0) + + + \ No newline at end of file From 580d28191b29b1b1020245328ea71f491941f4dc Mon Sep 17 00:00:00 2001 From: Samantha Chan Date: Tue, 10 Oct 2017 19:35:48 +0000 Subject: [PATCH 2/3] Create Example Service --- .../.classpath | 14 ++ .../.project | 29 ++++ .../.settings/org.eclipse.core.runtime.prefs | 2 + .../.settings/org.eclipse.jdt.apt.core.prefs | 3 + .../.settings/org.eclipse.jdt.core.prefs | 2 + .../README.md | 44 +++++ .../build.gradle | 84 ++++++++++ .../example/service/AbstractService.java | 153 ++++++++++++++++++ .../example/service/ExampleHealthService.java | 33 ++++ .../example/service/IServiceConstants.java | 15 ++ .../info.xml | 15 ++ .../service.properties | 19 +++ 12 files changed, 413 insertions(+) create mode 100644 samples/ExampleService/com.ibm.streamsx.health.example.service/.classpath create mode 100644 samples/ExampleService/com.ibm.streamsx.health.example.service/.project create mode 100644 samples/ExampleService/com.ibm.streamsx.health.example.service/.settings/org.eclipse.core.runtime.prefs create mode 100644 samples/ExampleService/com.ibm.streamsx.health.example.service/.settings/org.eclipse.jdt.apt.core.prefs create mode 100644 samples/ExampleService/com.ibm.streamsx.health.example.service/.settings/org.eclipse.jdt.core.prefs create mode 100644 samples/ExampleService/com.ibm.streamsx.health.example.service/README.md create mode 100644 samples/ExampleService/com.ibm.streamsx.health.example.service/build.gradle create mode 100644 samples/ExampleService/com.ibm.streamsx.health.example.service/impl/java/src/com/ibm/streamsx/health/example/service/AbstractService.java create mode 100644 samples/ExampleService/com.ibm.streamsx.health.example.service/impl/java/src/com/ibm/streamsx/health/example/service/ExampleHealthService.java create mode 100644 samples/ExampleService/com.ibm.streamsx.health.example.service/impl/java/src/com/ibm/streamsx/health/example/service/IServiceConstants.java create mode 100644 samples/ExampleService/com.ibm.streamsx.health.example.service/info.xml create mode 100644 samples/ExampleService/com.ibm.streamsx.health.example.service/service.properties diff --git a/samples/ExampleService/com.ibm.streamsx.health.example.service/.classpath b/samples/ExampleService/com.ibm.streamsx.health.example.service/.classpath new file mode 100644 index 0000000..58a12be --- /dev/null +++ b/samples/ExampleService/com.ibm.streamsx.health.example.service/.classpath @@ -0,0 +1,14 @@ + + + + + + + + + + + + + + diff --git a/samples/ExampleService/com.ibm.streamsx.health.example.service/.project b/samples/ExampleService/com.ibm.streamsx.health.example.service/.project new file mode 100644 index 0000000..d5ebc01 --- /dev/null +++ b/samples/ExampleService/com.ibm.streamsx.health.example.service/.project @@ -0,0 +1,29 @@ + + + com.ibm.streamsx.health.example.service + + + + + + org.eclipse.jdt.core.javabuilder + + + + + org.eclipse.xtext.ui.shared.xtextBuilder + + + + + com.ibm.streams.studio.splproject.builder.SPLProjectBuilder + + + + + + org.eclipse.xtext.ui.shared.xtextNature + com.ibm.streams.studio.splproject.SPLProjectNature + org.eclipse.jdt.core.javanature + + diff --git a/samples/ExampleService/com.ibm.streamsx.health.example.service/.settings/org.eclipse.core.runtime.prefs b/samples/ExampleService/com.ibm.streamsx.health.example.service/.settings/org.eclipse.core.runtime.prefs new file mode 100644 index 0000000..5a0ad22 --- /dev/null +++ b/samples/ExampleService/com.ibm.streamsx.health.example.service/.settings/org.eclipse.core.runtime.prefs @@ -0,0 +1,2 @@ +eclipse.preferences.version=1 +line.separator=\n diff --git a/samples/ExampleService/com.ibm.streamsx.health.example.service/.settings/org.eclipse.jdt.apt.core.prefs b/samples/ExampleService/com.ibm.streamsx.health.example.service/.settings/org.eclipse.jdt.apt.core.prefs new file mode 100644 index 0000000..7b0c050 --- /dev/null +++ b/samples/ExampleService/com.ibm.streamsx.health.example.service/.settings/org.eclipse.jdt.apt.core.prefs @@ -0,0 +1,3 @@ +eclipse.preferences.version=1 +org.eclipse.jdt.apt.aptEnabled=true +org.eclipse.jdt.apt.reconcileEnabled=false diff --git a/samples/ExampleService/com.ibm.streamsx.health.example.service/.settings/org.eclipse.jdt.core.prefs b/samples/ExampleService/com.ibm.streamsx.health.example.service/.settings/org.eclipse.jdt.core.prefs new file mode 100644 index 0000000..0b3561a --- /dev/null +++ b/samples/ExampleService/com.ibm.streamsx.health.example.service/.settings/org.eclipse.jdt.core.prefs @@ -0,0 +1,2 @@ +eclipse.preferences.version=1 +org.eclipse.jdt.core.compiler.processAnnotations=enabled diff --git a/samples/ExampleService/com.ibm.streamsx.health.example.service/README.md b/samples/ExampleService/com.ibm.streamsx.health.example.service/README.md new file mode 100644 index 0000000..57a3c71 --- /dev/null +++ b/samples/ExampleService/com.ibm.streamsx.health.example.service/README.md @@ -0,0 +1,44 @@ +# Streams Healthcare Example Service + +This project demonstrates how one can create a service to work with Streams Healthcare Analtyics paltform. + +## Directory Structure + +Service has this directory structure: + +* info.xml - toolkit information of the service +* build.gradle - gradle script for building service +* service.properties - contain properties for customizing the behavior of the service +* impl/java/src - java source code of one or more services +* opt/lib - third-party libraries required by the services + +## Service + +The service is written in Java, using the Streams Java Application API. A service should adhere to the following guidelines: + +* The service class should reside in a package with a *.service suffix. (e.g. com.ibm.streamsx.health.example.service) +* The service class should be named with at `Service` suffix. (e.g. ExampleHealthService) +* The service class should subclass from AbstractService. Abstract service provides infrastructure support: + * Handling of properties files for service customization + * Adding third-party libraries (*.jar) from opt/lib directory to the application bundle + * Handling of submission time parameters + * Handling of job submission in different streams context +* Customization of a service should be done in a service.properties file. All services should support the following properties. These properties are handled by AbstractService by default. + * debug + * streamscontext + * vmargs +* The service class is responsible for the following: + * Creation of a topology. This example is written purely in Java. But this can be used to wrapper a service written in SPL. + * Define additional properties a service may support and handling of the properties. + +## Building a Service + +A service is to be built using the build.gradle script. The script is set up to build any Java code residing in the impl/java/src directory. +If a service require a third-party library, client is responsible to define these libraries in the build script. The dependencies will be downloaded +by the build script and stored in the opt/lib directory. + +## Executing a Service + +build.gradle is set up to execute the service, using the **execute** target. +Clients are expected to configure this target to identify the main class and jar file tor running the service. +Clients may also define additional execute targets if the project contains more than one service. \ No newline at end of file diff --git a/samples/ExampleService/com.ibm.streamsx.health.example.service/build.gradle b/samples/ExampleService/com.ibm.streamsx.health.example.service/build.gradle new file mode 100644 index 0000000..f173c7a --- /dev/null +++ b/samples/ExampleService/com.ibm.streamsx.health.example.service/build.gradle @@ -0,0 +1,84 @@ +apply plugin: 'java' +apply plugin: 'eclipse' + +sourceSets { + main { + java { + srcDirs = ['impl/java/src'] + } + resources { + srcDirs = ['etc'] + } + } +} + +ext { + STREAMS_INSTALL = System.getenv('STREAMS_INSTALL') +} + + +repositories { + mavenCentral() + + flatDir { + dirs STREAMS_INSTALL+'/lib',STREAMS_INSTALL+'/ext/lib',STREAMS_INSTALL+'/toolkits/com.ibm.streamsx.topology/lib' + } +} + + +sourceCompatibility = 1.8 +targetCompatibility = 1.8 + +dependencies { + // compile any project that this service may need + compile project(':ingest:common:com.ibm.streamsx.health.ingest') + + // Streams dependencies + compile name: 'com.ibm.streams.operator' + compile name: 'com.ibm.streams.operator.samples' + compile name: 'com.ibm.streamsx.topology' + compile name: 'JSON4J' + compile name: 'commons-math-2.2' +} + +task copyLib(type: Copy) { + into "$projectDir/release/opt/lib" + from configurations.runtime +} + +task copyJar(type: Copy) { + into "$projectDir/release" + from "$projectDir/build/libs/" + include '**/*.jar' +} + +task rmRelease(type: Delete) { + delete 'release' +} + +task copyLibDev(type: Copy) { + into "$projectDir/opt/lib" + from configurations.runtime +} + +task deps << { + tasks.copyLibDev.execute() + tasks.copyLib.execute() +} + + +jar.doLast { + tasks.copyLib.execute() + tasks.copyJar.execute() +} + +clean.doLast{ + tasks.rmRelease.execute() +} + +task execute(type: JavaExec) { + main='com.ibm.streamsx.health.example.service.ExampleHealthService' + classpath = configurations.runtime + files("$buildDir/libs/com.ibm.streamsx.health.example.service.jar") +} + + diff --git a/samples/ExampleService/com.ibm.streamsx.health.example.service/impl/java/src/com/ibm/streamsx/health/example/service/AbstractService.java b/samples/ExampleService/com.ibm.streamsx.health.example.service/impl/java/src/com/ibm/streamsx/health/example/service/AbstractService.java new file mode 100644 index 0000000..933f78b --- /dev/null +++ b/samples/ExampleService/com.ibm.streamsx.health.example.service/impl/java/src/com/ibm/streamsx/health/example/service/AbstractService.java @@ -0,0 +1,153 @@ +package com.ibm.streamsx.health.example.service; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FilenameFilter; +import java.io.IOException; +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import org.apache.log4j.Logger; + +import com.ibm.streams.operator.logging.TraceLevel; +import com.ibm.streamsx.topology.Topology; +import com.ibm.streamsx.topology.context.ContextProperties; +import com.ibm.streamsx.topology.context.StreamsContextFactory; + +public abstract class AbstractService implements Serializable { + + /** + * + */ + private static final long serialVersionUID = 1L; + + private Properties properties; + + protected static Logger TRACE = Logger.getLogger(AbstractService.class); + + public AbstractService() { + + initializeService(); + } + + protected void initializeService() { + // Properties is serializable. This code is called twice. + // At the first time, when the Java application is run, it is called locally to generate + // the SPL file. At this time, the properties file should be accessible + // and we read the data into a properties object. + // At the second time, the Java object is being deserialized. At this time, the properties + // object is sent to the remote host as part of the seraizlied object. We do not need + // to read the file again and the properties attributes will still be available. + if (properties == null) + properties = readProperties(); + } + + protected Properties readProperties() + { + try { + FileInputStream iStream = new FileInputStream("service.properties"); + Properties properties = new Properties(); + properties.load(iStream); + iStream.close(); + + return properties; + + } catch (FileNotFoundException e) { + TRACE.error("Unable to read patientIds.properties"); + } catch (IOException e) { + TRACE.error("Unable to read patientIds.properties"); + } + return null; + } + + public Properties getProperties() { + return properties; + } + + protected void addDependencies(Topology topology) { + + File dir = new File("opt/lib"); + + // only include the required Jar files in classpath + // exclude any IBM Streams jar files and topology jar files + File[] jarFiles = dir.listFiles(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + return name.endsWith("jar") && (name.indexOf("streams.") == -1) + && (name.indexOf("com.ibm.streamsx.topology") == -1); + } + }); + + if (jarFiles != null) { + for (File file : jarFiles) { + topology.addJarDependency(file.getAbsolutePath()); + } + } + } + + protected boolean isDebug() + { + String property = getProperties().getProperty(IServiceConstants.KEY_DEBUG); + if (property!=null && !property.isEmpty()) + { + return Boolean.valueOf(property); + } + return false; + } + + protected String getStreamsContext() { + String property = getProperties().getProperty(IServiceConstants.KEY_STREAMS_CONTEXT); + if (property !=null && !property.isEmpty()) + { + return property.toUpperCase(); + } + return "DISTRIBUTED"; + } + + protected String getVmArgs() + { + return getProperties().getProperty(IServiceConstants.KEY_VMARGS); + } + + protected void submit(Topology topology) { + try { + Map subProperties = new HashMap<>(); + + String vmArgs = getVmArgs(); + + if (vmArgs != null && !vmArgs.isEmpty()) { + // Add addition VM Arguments as specified in properties file + subProperties.put(ContextProperties.VMARGS, vmArgs); + } + + if (isDebug()) + subProperties.put(ContextProperties.TRACING_LEVEL, TraceLevel.DEBUG); + + addSubmssionTimeParams(subProperties); + + StreamsContextFactory.getStreamsContext(getStreamsContext()).submit(topology, subProperties); + } catch (Exception e) { + TRACE.error("Unable to submit topology", e); + } + } + + protected Map addSubmssionTimeParams(Map params) { + return params; + } + + protected void run() { + + Topology topo = createTopology(); + addDependencies(topo); + submit(topo); + + } + + protected abstract Topology createTopology(); + + + +} diff --git a/samples/ExampleService/com.ibm.streamsx.health.example.service/impl/java/src/com/ibm/streamsx/health/example/service/ExampleHealthService.java b/samples/ExampleService/com.ibm.streamsx.health.example.service/impl/java/src/com/ibm/streamsx/health/example/service/ExampleHealthService.java new file mode 100644 index 0000000..48b13b0 --- /dev/null +++ b/samples/ExampleService/com.ibm.streamsx.health.example.service/impl/java/src/com/ibm/streamsx/health/example/service/ExampleHealthService.java @@ -0,0 +1,33 @@ +package com.ibm.streamsx.health.example.service; + +import com.ibm.streamsx.health.ingest.types.connector.SubscribeConnector; +import com.ibm.streamsx.health.ingest.types.model.Observation; +import com.ibm.streamsx.topology.TStream; +import com.ibm.streamsx.topology.Topology; + +public class ExampleHealthService extends AbstractService{ + + /** + * + */ + private static final long serialVersionUID = 1L; + + public static void main(String[] args) { + ExampleHealthService service = new ExampleHealthService(); + service.run(); + } + + @Override + protected Topology createTopology() { + + Topology topo = new Topology("ExampleHealthService"); + + String topic = getProperties().getProperty("topic"); + TStream data = SubscribeConnector.subscribe(topo, topic); + + data.print(); + + return topo; + } + +} diff --git a/samples/ExampleService/com.ibm.streamsx.health.example.service/impl/java/src/com/ibm/streamsx/health/example/service/IServiceConstants.java b/samples/ExampleService/com.ibm.streamsx.health.example.service/impl/java/src/com/ibm/streamsx/health/example/service/IServiceConstants.java new file mode 100644 index 0000000..0cbf22f --- /dev/null +++ b/samples/ExampleService/com.ibm.streamsx.health.example.service/impl/java/src/com/ibm/streamsx/health/example/service/IServiceConstants.java @@ -0,0 +1,15 @@ +//******************************************************************************* +//* Copyright (C) 2017 International Business Machines Corporation +//* All Rights Reserved +//******************************************************************************* + +package com.ibm.streamsx.health.example.service; + +public interface IServiceConstants { + + // Keys in properties file + String KEY_DEBUG = "debug"; + String KEY_STREAMS_CONTEXT = "streamscontext"; + String KEY_VMARGS = "vmargs"; + +} diff --git a/samples/ExampleService/com.ibm.streamsx.health.example.service/info.xml b/samples/ExampleService/com.ibm.streamsx.health.example.service/info.xml new file mode 100644 index 0000000..3b428ee --- /dev/null +++ b/samples/ExampleService/com.ibm.streamsx.health.example.service/info.xml @@ -0,0 +1,15 @@ + + + + com.ibm.streamsx.health.example.service + + 1.0.0 + 4.2.1.1 + + + + com.ibm.streamsx.health.ingest + [1.1.0,2.0.0) + + + \ No newline at end of file diff --git a/samples/ExampleService/com.ibm.streamsx.health.example.service/service.properties b/samples/ExampleService/com.ibm.streamsx.health.example.service/service.properties new file mode 100644 index 0000000..8633431 --- /dev/null +++ b/samples/ExampleService/com.ibm.streamsx.health.example.service/service.properties @@ -0,0 +1,19 @@ +# Example property for service +# Topic to subscribe data from +topic=ingest-beacon + +# turn on debugging of service +# - submit the job with logging level set to DEBUG +# - add more print statements to facilitate debugging +debug= + +# Streams Context to submit job for +# Possible values: EMBEDDED, STANDALONE, BUNDLE, DISTRIBUTED, default: DISTRIBUTED +streamscontext=DISTRIBUTED + +# Add VM arguments to JVM, default is empty +# The following is the example vm args to enable debugging for the service +# This argument enable the JVM debug engine, allowing debug client to attach to the process remotely +#vmargs=-agentlib:jdwp=transport=dt_socket,suspend=y,server=y,address=127.0.0.1:7777 +vmargs= + From 407f5e2d27cbc18f941ba4fee30c168af2878c05 Mon Sep 17 00:00:00 2001 From: Samantha Chan Date: Tue, 10 Oct 2017 21:47:18 +0000 Subject: [PATCH 3/3] Enable Cep RPeak Detect as a microservice --- .../.classpath | 1 + ...ervice.RPeakDetectCep-BuildConfig.splbuild | 25 +++ .../application/RPeakDetectCep.spl | 96 ----------- .../build.gradle | 95 +++++++++++ .../.namespace | 0 .../RPeakDetectCep.spl | 111 ++++++++++++ .../rpeak/cep/service/AbstractService.java | 158 ++++++++++++++++++ .../rpeak/cep/service/IServiceConstants.java | 21 +++ .../cep/service/RPeakDetectCepService.java | 70 ++++++++ .../service.properties | 25 +++ 10 files changed, 506 insertions(+), 96 deletions(-) create mode 100644 analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/.settings/com.ibm.streamsx.health.analyze.rpeak.cep.service.RPeakDetectCep-BuildConfig.splbuild delete mode 100644 analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/application/RPeakDetectCep.spl create mode 100644 analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/build.gradle rename analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/{application => com.ibm.streamsx.health.analyze.rpeak.cep.service}/.namespace (100%) create mode 100644 analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/com.ibm.streamsx.health.analyze.rpeak.cep.service/RPeakDetectCep.spl create mode 100644 analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/impl/java/src/com/ibm/streamsx/health/analyze/rpeak/cep/service/AbstractService.java create mode 100644 analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/impl/java/src/com/ibm/streamsx/health/analyze/rpeak/cep/service/IServiceConstants.java create mode 100644 analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/impl/java/src/com/ibm/streamsx/health/analyze/rpeak/cep/service/RPeakDetectCepService.java create mode 100644 analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/service.properties diff --git a/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/.classpath b/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/.classpath index cbb0e4c..1680bbe 100644 --- a/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/.classpath +++ b/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/.classpath @@ -8,5 +8,6 @@ + diff --git a/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/.settings/com.ibm.streamsx.health.analyze.rpeak.cep.service.RPeakDetectCep-BuildConfig.splbuild b/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/.settings/com.ibm.streamsx.health.analyze.rpeak.cep.service.RPeakDetectCep-BuildConfig.splbuild new file mode 100644 index 0000000..e7abd06 --- /dev/null +++ b/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/.settings/com.ibm.streamsx.health.analyze.rpeak.cep.service.RPeakDetectCep-BuildConfig.splbuild @@ -0,0 +1,25 @@ + + + +SPL Build Configuration: BuildConfig +F +T +FDEF +com.ibm.streamsx.health.analyze.rpeak.cep.service::RPeakDetectCep +BuildConfig + + +T + + + + +F +T + +F + +BuildConfig + +F + \ No newline at end of file diff --git a/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/application/RPeakDetectCep.spl b/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/application/RPeakDetectCep.spl deleted file mode 100644 index 0582045..0000000 --- a/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/application/RPeakDetectCep.spl +++ /dev/null @@ -1,96 +0,0 @@ -namespace application ; - -use com.ibm.streamsx.health.ingest.connector::IngestSubscribe ; -use com.ibm.streamsx.health.ingest.types::* ; -use com.ibm.streams.cep::MatchRegex ; -use com.ibm.streamsx.topology.topic::Publish ; -use com.ibm.streamsx.json::TupleToJSON ; -type PeakEvent = rstring patientId, int64 ts, float64 data, float64 max, - float64 min, int32 count, list eventData, list eventTs ; -type RPeakEvent = rstring patientId, int64 ts, float64 data ; -type RREvent = rstring patientId, int64 rrInterval, list events ; - -composite RPeakDetectCep -{ - param - expression $subTopic : getSubmissionTimeValue("topic") ; - expression $pubTopicRPeak : "analyze/rpeak/cep/r" ; - expression $pubTopicRR : "analyze/rpeak/cep/rr" ; - expression $signal : getSubmissionTimeValue("readingCode", "ECG") ; - expression $peakThreshold : (float64)getSubmissionTimeValue("peakThreshold", "0.8") ; - graph - (stream IngestSubscribe_1_out0) as IngestSubscribe_1 = - IngestSubscribe() - { - param - topic : $subTopic ; - } - - (stream MatchRegex_2_out0) as MatchRegex_2 = - MatchRegex(Functor_3_out0 as I0) - { - param - pattern : ". rise+ drop drop drop" ; - predicates : { rise = data >= Last(data), drop = data < Last(data) } ; - partitionBy : I0.patientId ; - output - MatchRegex_2_out0 : max = Max(data), min = Min(data), count = Count(), - eventData = Collect(data), eventTs = Collect(ts) ; - } - - (stream Functor_3_out0) as - Functor_3 = Functor(IngestSubscribe_1_out0 as I0) - { - param - filter : I0.reading.readingType.code == $signal ; - output - Functor_3_out0 : patientId = I0.patientId, ts = I0.reading.ts, data = - I0.reading.value ; - } - - () as Publish_4 = Publish(TupleToJSON_5_out0 as inPort0Alias) - { - param - topic : $pubTopicRPeak ; - } - - (stream TupleToJSON_5_out0) as TupleToJSON_5 = - TupleToJSON(RPeakEvents as inPort0Alias) - { - } - - (stream RPeakEvents) as Custom_6 = Custom(MatchRegex_2_out0 as - I0) - { - logic - onTuple I0 : if(max - eventData [ 0 ] > $peakThreshold) - { - submit({ patientId = patientId, ts = eventTs [ size(eventTs) - 4 ], data = - max }, RPeakEvents) ; - } - - } - - (stream RREvents) as Aggregate_7 = Aggregate(RPeakEvents as I0) - { - window - I0 : sliding, count(2), count(1), partitioned ; - param - partitionBy: I0.patientId; - output - RREvents : rrInterval = I0 [ 0 ].ts - I0 [ 1 ].ts, events = Collect(I0) ; - } - - () as Publish_8 = Publish(TupleToJSONRR_out0) - { - param - topic : $pubTopicRR ; - } - - (stream TupleToJSONRR_out0) as TupleToJSONRR = - TupleToJSON(RREvents as inPort0Alias) - { - } - -} - diff --git a/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/build.gradle b/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/build.gradle new file mode 100644 index 0000000..eb147b0 --- /dev/null +++ b/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/build.gradle @@ -0,0 +1,95 @@ +apply plugin: 'java' +apply plugin: 'eclipse' +apply from: "$rootProject.projectDir/common.gradle" + +sourceSets { + main { + java { + srcDirs = ['impl/java/src'] + } + resources { + srcDirs = ['etc'] + } + } +} + +ext { + STREAMS_INSTALL = System.getenv('STREAMS_INSTALL') +} + + +repositories { + mavenCentral() + + flatDir { + dirs STREAMS_INSTALL+'/lib',STREAMS_INSTALL+'/ext/lib',STREAMS_INSTALL+'/toolkits/com.ibm.streamsx.topology/lib' + } +} + + +sourceCompatibility = 1.8 +targetCompatibility = 1.8 + +dependencies { + // compile any project that this service may need + compile project(':ingest:common:com.ibm.streamsx.health.ingest') + + // Streams dependencies + compile name: 'com.ibm.streams.operator' + compile name: 'com.ibm.streams.operator.samples' + compile name: 'com.ibm.streamsx.topology' + compile name: 'JSON4J' + compile name: 'commons-math-2.2' +} + +task buildToolkit { + doLast { + splMakeToolkit() + } +} + +build.doLast { + tasks.buildToolkit.execute() +} + +task copyLib(type: Copy) { + into "$projectDir/release/opt/lib" + from configurations.runtime +} + +task copyJar(type: Copy) { + into "$projectDir/release" + from "$projectDir/build/libs/" + include '**/*.jar' +} + +task rmRelease(type: Delete) { + delete 'release' +} + +task copyLibDev(type: Copy) { + into "$projectDir/opt/lib" + from configurations.runtime +} + +task deps << { + tasks.copyLibDev.execute() + tasks.copyLib.execute() +} + + +jar.doLast { + tasks.copyLib.execute() + tasks.copyJar.execute() +} + +clean.doLast{ + tasks.rmRelease.execute() +} + +task execute(type: JavaExec) { + main='com.ibm.streamsx.health.analyze.rpeak.cep.service.RPeakDetectCepService' + classpath = configurations.runtime + files("$buildDir/libs/com.ibm.streamsx.health.analyze.rpeak.cep.jar") +} + + diff --git a/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/application/.namespace b/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/com.ibm.streamsx.health.analyze.rpeak.cep.service/.namespace similarity index 100% rename from analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/application/.namespace rename to analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/com.ibm.streamsx.health.analyze.rpeak.cep.service/.namespace diff --git a/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/com.ibm.streamsx.health.analyze.rpeak.cep.service/RPeakDetectCep.spl b/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/com.ibm.streamsx.health.analyze.rpeak.cep.service/RPeakDetectCep.spl new file mode 100644 index 0000000..b4aac8a --- /dev/null +++ b/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/com.ibm.streamsx.health.analyze.rpeak.cep.service/RPeakDetectCep.spl @@ -0,0 +1,111 @@ +//******************************************************************************* +//* Copyright (C) 2017 International Business Machines Corporation +//* All Rights Reserved +//******************************************************************************* +namespace com.ibm.streamsx.health.analyze.rpeak.cep.service ; + +use com.ibm.streamsx.health.ingest.connector::IngestSubscribe ; +use com.ibm.streamsx.health.ingest.types::* ; +use com.ibm.streams.cep::MatchRegex ; +use com.ibm.streamsx.topology.topic::Publish ; +use com.ibm.streamsx.json::TupleToJSON ; + + +type PeakEvent = rstring patientId, int64 ts, float64 data, float64 max, + float64 min, int32 count, list eventData, list eventTs ; +type RPeakEvent = rstring patientId, int64 ts, float64 data ; +type RREvent = rstring patientId, int64 rr, list events ; + +composite RPeakDetectCep() +{ + param + expression $subTopic : getSubmissionTimeValue("topic") ; + expression $pubTopicRPeak : "analyze/rpeak/cep/r" ; + expression $pubTopicRR : "analyze/rpeak/cep/rr" ; + expression $readingCode : getSubmissionTimeValue("readingCode") ; + expression $peakThreshold :(float64) + getSubmissionTimeValue("peakThreshold", "0.8") ; + graph + (stream SubscribeToObservations) = + IngestSubscribe() + { + param + topic : $subTopic ; + } + + (stream PeakDetectEvents) = + MatchRegex(ECGSignal as I0) + { + param + pattern : ". rise+ drop drop drop" ; + predicates : { rise = data >= Last(data), drop = data < Last(data) } ; + partitionBy : I0.patientId ; + output + PeakDetectEvents : max = Max(data), min = Min(data), count = Count(), + eventData = Collect(data), eventTs = Collect(ts) ; + } + + (stream ECGSignal) = Functor(SubscribeToObservations as I0) + { + param + filter : I0.reading.readingType.code == $readingCode ; + output + ECGSignal : patientId = I0.patientId, ts = I0.reading.ts, data = + I0.reading.value ; + } + + () as PublishR = Publish(RPeakToJson as inPort0Alias) + { + param + topic : $pubTopicRPeak ; + } + + (stream RPeakToJson)= + TupleToJSON(RPeakEvents) + { + } + + (stream RPeakEvents) = Custom(PeakDetectEvents as + I0) + { + logic + onTuple I0 : if(max - eventData [ 0 ] > $peakThreshold) + { + submit({ patientId = patientId, ts = eventTs [ size(eventTs) - 4 ], data = + max }, RPeakEvents) ; + } + + } + + (stream events> RPeakEventList) = + Aggregate(RPeakEvents as I0) + { + window + I0 : sliding, count(2), count(1), partitioned ; + param + partitionBy : I0.patientId ; + output + RPeakEventList : events = Collect(I0) ; + } + + () as PublishRR = Publish(RRToJson) + { + param + topic : $pubTopicRR ; + } + + (stream RRToJson) = + TupleToJSON(RREvents as inPort0Alias) + { + } + + (stream RREvents) = + Functor(RPeakEventList as I0) + { + output + RREvents: patientId = I0.events[0].patientId, rr=I0.events[1].ts-I0.events[0].ts; + + } + +} + diff --git a/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/impl/java/src/com/ibm/streamsx/health/analyze/rpeak/cep/service/AbstractService.java b/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/impl/java/src/com/ibm/streamsx/health/analyze/rpeak/cep/service/AbstractService.java new file mode 100644 index 0000000..c2e6ea9 --- /dev/null +++ b/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/impl/java/src/com/ibm/streamsx/health/analyze/rpeak/cep/service/AbstractService.java @@ -0,0 +1,158 @@ +//******************************************************************************* +//* Copyright (C) 2017 International Business Machines Corporation +//* All Rights Reserved +//******************************************************************************* + +package com.ibm.streamsx.health.analyze.rpeak.cep.service; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FilenameFilter; +import java.io.IOException; +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import org.apache.log4j.Logger; + +import com.ibm.streams.operator.logging.TraceLevel; +import com.ibm.streamsx.topology.Topology; +import com.ibm.streamsx.topology.context.ContextProperties; +import com.ibm.streamsx.topology.context.StreamsContextFactory; + +public abstract class AbstractService implements Serializable { + + /** + * + */ + private static final long serialVersionUID = 1L; + + private Properties properties; + + protected static Logger TRACE = Logger.getLogger(AbstractService.class); + + public AbstractService() { + + initializeService(); + } + + protected void initializeService() { + // Properties is serializable. This code is called twice. + // At the first time, when the Java application is run, it is called locally to generate + // the SPL file. At this time, the properties file should be accessible + // and we read the data into a properties object. + // At the second time, the Java object is being deserialized. At this time, the properties + // object is sent to the remote host as part of the seraizlied object. We do not need + // to read the file again and the properties attributes will still be available. + if (properties == null) + properties = readProperties(); + } + + protected Properties readProperties() + { + try { + FileInputStream iStream = new FileInputStream("service.properties"); + Properties properties = new Properties(); + properties.load(iStream); + iStream.close(); + + return properties; + + } catch (FileNotFoundException e) { + TRACE.error("Unable to read patientIds.properties"); + } catch (IOException e) { + TRACE.error("Unable to read patientIds.properties"); + } + return null; + } + + public Properties getProperties() { + return properties; + } + + protected void addDependencies(Topology topology) { + + File dir = new File("opt/lib"); + + // only include the required Jar files in classpath + // exclude any IBM Streams jar files and topology jar files + File[] jarFiles = dir.listFiles(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + return name.endsWith("jar") && (name.indexOf("streams.") == -1) + && (name.indexOf("com.ibm.streamsx.topology") == -1); + } + }); + + if (jarFiles != null) { + for (File file : jarFiles) { + topology.addJarDependency(file.getAbsolutePath()); + } + } + } + + protected boolean isDebug() + { + String property = getProperties().getProperty(IServiceConstants.KEY_DEBUG); + if (property!=null && !property.isEmpty()) + { + return Boolean.valueOf(property); + } + return false; + } + + protected String getStreamsContext() { + String property = getProperties().getProperty(IServiceConstants.KEY_STREAMS_CONTEXT); + if (property !=null && !property.isEmpty()) + { + return property.toUpperCase(); + } + return "DISTRIBUTED"; + } + + protected String getVmArgs() + { + return getProperties().getProperty(IServiceConstants.KEY_VMARGS); + } + + protected void submit(Topology topology) { + try { + Map subProperties = new HashMap<>(); + + String vmArgs = getVmArgs(); + + if (vmArgs != null && !vmArgs.isEmpty()) { + // Add addition VM Arguments as specified in properties file + subProperties.put(ContextProperties.VMARGS, vmArgs); + } + + if (isDebug()) + subProperties.put(ContextProperties.TRACING_LEVEL, TraceLevel.DEBUG); + + addSubmssionTimeParams(subProperties); + + StreamsContextFactory.getStreamsContext(getStreamsContext()).submit(topology, subProperties); + } catch (Exception e) { + TRACE.error("Unable to submit topology", e); + } + } + + protected Map addSubmssionTimeParams(Map params) { + return params; + } + + protected void run() { + + Topology topo = createTopology(); + addDependencies(topo); + submit(topo); + + } + + protected abstract Topology createTopology(); + + + +} diff --git a/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/impl/java/src/com/ibm/streamsx/health/analyze/rpeak/cep/service/IServiceConstants.java b/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/impl/java/src/com/ibm/streamsx/health/analyze/rpeak/cep/service/IServiceConstants.java new file mode 100644 index 0000000..658c383 --- /dev/null +++ b/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/impl/java/src/com/ibm/streamsx/health/analyze/rpeak/cep/service/IServiceConstants.java @@ -0,0 +1,21 @@ +//******************************************************************************* +//* Copyright (C) 2017 International Business Machines Corporation +//* All Rights Reserved +//******************************************************************************* + +package com.ibm.streamsx.health.analyze.rpeak.cep.service; + +public interface IServiceConstants { + + // Keys in properties file + String KEY_DEBUG = "debug"; + String KEY_STREAMS_CONTEXT = "streamscontext"; + String KEY_VMARGS = "vmargs"; + + + // Service properties + String KEY_TOPIC="topic"; + String KEY_READINGCODE="readingCode"; + String KEY_PEAKTHRESHOLD="peakThreshold"; + +} diff --git a/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/impl/java/src/com/ibm/streamsx/health/analyze/rpeak/cep/service/RPeakDetectCepService.java b/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/impl/java/src/com/ibm/streamsx/health/analyze/rpeak/cep/service/RPeakDetectCepService.java new file mode 100644 index 0000000..a9c3fbf --- /dev/null +++ b/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/impl/java/src/com/ibm/streamsx/health/analyze/rpeak/cep/service/RPeakDetectCepService.java @@ -0,0 +1,70 @@ +//******************************************************************************* +//* Copyright (C) 2017 International Business Machines Corporation +//* All Rights Reserved +//******************************************************************************* + +package com.ibm.streamsx.health.analyze.rpeak.cep.service; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import com.ibm.streamsx.topology.Topology; +import com.ibm.streamsx.topology.spl.SPL; +import com.ibm.xtq.xslt.runtime.RuntimeError; + +public class RPeakDetectCepService extends AbstractService{ + + /** + * + */ + private static final long serialVersionUID = 1L; + + public static void main(String[] args) { + RPeakDetectCepService service = new RPeakDetectCepService(); + service.run(); + } + + @Override + protected Topology createTopology() { + + Topology topo = new Topology("RPeakDetectCepService"); + + try { + SPL.addToolkit(topo, new File("./")); + SPL.addToolkit(topo, new File("../../../ingest/common/com.ibm.streamsx.health.ingest")); + } catch (IOException e) { + throw new RuntimeError(e); + } + + String topic = getTopic(); + if (topic == null) + throw new RuntimeException("Topic cannot be found in service.properties"); + + String readingCode = getReadingCode(); + Double threshold = getPeakThreshold(); + + Map params = new HashMap<>(); + params.put("subTopic", topic); + params.put("readingCode", readingCode); + params.put("peakThreshold", threshold); + + SPL.invokeOperator(topo, "RPeakCep", "com.ibm.streamsx.health.analyze.rpeak.cep.service::RPeakDetectCep", null, null, params); + + return topo; + } + + private String getTopic() { + return getProperties().getProperty(IServiceConstants.KEY_TOPIC, null); + } + + private Double getPeakThreshold() { + return Double.valueOf(getProperties().getProperty(IServiceConstants.KEY_PEAKTHRESHOLD, "0.8")); + } + + private String getReadingCode() { + return getProperties().getProperty(IServiceConstants.KEY_READINGCODE, "ECG"); + } + +} diff --git a/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/service.properties b/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/service.properties new file mode 100644 index 0000000..3570801 --- /dev/null +++ b/analyze/ecg/com.ibm.streamsx.health.analyze.rpeak.cep/service.properties @@ -0,0 +1,25 @@ +# Topic to ingegst data from +topic=ingest-beacon + +# Reading code of the signal to analyze +readingCode=X100-8 + +# The service detects steep jump in the signal. By default a steep jump is defined as +# 0.8V between the min and max value in a window of data where a peak is detected +peakThreshold=0.8 + +# turn on debugging of service +# - submit the job with logging level set to DEBUG +# - add more print statements to facilitate debugging +debug= + +# Streams Context to submit job for +# Possible values: EMBEDDED, STANDALONE, BUNDLE, DISTRIBUTED, default: DISTRIBUTED +streamscontext=DISTRIBUTED + +# Add VM arguments to JVM, default is empty +# The following is the example vm args to enable debugging for the service +# This argument enable the JVM debug engine, allowing debug client to attach to the process remotely +#vmargs=-agentlib:jdwp=transport=dt_socket,suspend=y,server=y,address=127.0.0.1:7777 +vmargs= +