Skip to content

Commit

Permalink
feat: Add connector, define data model
Browse files Browse the repository at this point in the history
  • Loading branch information
tzebrowski committed Jan 26, 2024
1 parent 0cc7cfb commit 0cdde06
Show file tree
Hide file tree
Showing 8 changed files with 371 additions and 10 deletions.
29 changes: 23 additions & 6 deletions src/main/java/org/obd/metrics/command/Command.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
**/
package org.obd.metrics.command;

import java.nio.ByteBuffer;
import java.util.UUID;

import lombok.EqualsAndHashCode;
Expand All @@ -40,25 +41,41 @@ public abstract class Command {

@Getter
protected final String mode;

@Getter
protected final String canMode;



protected Command(final String query, final String mode, final String label) {
this(query,mode,label,"");
this(query, mode, label, "");
}


protected Command(final byte[] canId, final byte[] data) {
this.data = merge(canId, data).array();
this.query = new String(this.data);
this.label = null;
this.mode = null;
this.canMode = null;
}

protected Command(final String query, final String mode, final String label, final String canMode) {
this.query = query;
this.label = label;
this.mode = mode;
this.data = (query + "\r").getBytes();
this.data = (query).getBytes();
this.canMode = canMode;
}

@Override
public String toString() {
return "[query=" + query + "]";
}

private ByteBuffer merge(final byte[] canId, final byte[] data) {
final byte[] allByteArray = new byte[canId.length + 1 + data.length];
final ByteBuffer buff = ByteBuffer.wrap(allByteArray);
buff.put(canId);
buff.put((byte) data.length);
buff.put(data);
return buff;
}
}
33 changes: 33 additions & 0 deletions src/main/java/org/obd/metrics/command/obd/CannelloniCommand.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/**
* Copyright 2019-2024, Tomasz Żebrowski
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.obd.metrics.command.obd;

import org.obd.metrics.command.Command;

public final class CannelloniCommand extends Command {
private final static String INIT_MESSAGE = "CANNELLONIv1";

public CannelloniCommand() {
super(INIT_MESSAGE, null, null);
}

public CannelloniCommand(final byte[] canId, final byte[] data) {
super(canId, data);
}
}
194 changes: 194 additions & 0 deletions src/main/java/org/obd/metrics/transport/CannelloniConnector.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
/**
* Copyright 2019-2024, Tomasz Żebrowski
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.obd.metrics.transport;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;

import org.obd.metrics.api.model.Adjustments;
import org.obd.metrics.command.Command;
import org.obd.metrics.command.obd.CannelloniCommand;
import org.obd.metrics.transport.message.ConnectorResponse;
import org.obd.metrics.transport.message.ConnectorResponseFactory;

import lombok.Getter;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

@Slf4j
final class CannelloniConnector implements Connector {

private static final char NEXT_MESSAGE_SIGNAL = '\n';
private static final ConnectorResponse EMPTY_MESSAGE = ConnectorResponseFactory.wrap(new byte[] {}, 0, 0);

@Getter
private boolean faulty;

@NonNull
private OutputStream out;

@NonNull
private InputStream in;

@NonNull
private final AdapterConnection connection;
private final Adjustments adjustments;

private final byte[] buffer = new byte[BUFFER_SIZE];
private long tts = 0;
private boolean closed = false;

CannelloniConnector(final AdapterConnection connection, final Adjustments adjustments) throws IOException {
this.connection = connection;
this.adjustments = adjustments;
this.out = connection.openOutputStream();
this.in = connection.openInputStream();
reset();
}

@Override
public void close() {
log.info("Closing streams.");
closed = true;
faulty = false;
try {
if (out != null) {
out.close();
out = null;
}
} catch (final IOException e) {
}
try {
if (in != null) {
in.close();
in = null;
}
} catch (final IOException e) {
}

try {
connection.close();
} catch (final IOException e) {
}

}

@Override
public synchronized void transmit(@NonNull final Command command) {
tts = System.currentTimeMillis();
if (isFaulty()) {
log.warn("Previous IO failed. Cannot perform another IO operation");
} else {
try {
if (adjustments != null && adjustments.isDebugEnabled()) {
if ( command instanceof CannelloniCommand) {
log.info("TX: {}", printMessage((CannelloniCommand) command));
} else {
log.info("TX: {}", command.getQuery());
}
}
out.write(command.getData());
} catch (final IOException e) {
log.error("Failed to transmit command: {}", command, e);
reconnect();
}
}
}

@Override
public synchronized ConnectorResponse receive() {
if (isFaulty()) {
log.warn("Previous IO failed. Cannot perform another IO operation");
} else {
try {
if (in != null) {
short cnt = 0;
int nextByte;
char characterRead;

while ((nextByte = in.read()) > -1 && (characterRead = (char) nextByte) != NEXT_MESSAGE_SIGNAL
&& cnt != buffer.length) {
if (Characters.isCharacterAllowed(characterRead)) {
buffer[cnt++] = (byte) Character.toUpperCase(characterRead);
}
// CANNELLONIv1
if (buffer[0] == 'C' && buffer[1] == 'A' && buffer[2] == 'N' && buffer[3] == 'N'
&& buffer[4] == 'E' && buffer[5] == 'L' && buffer[6] == 'L' && buffer[7] == 'O'
&& buffer[8] == 'N' && buffer[9] == 'I' && buffer[10] == 'V' && buffer[11] == '1') {
break;
}
}

short start = 0;
if ((char) buffer[0] == 'S' && (char) buffer[1] == 'E' && (char) buffer[2] == 'A'
&& (char) buffer[3] == 'R') {
// SEARCHING...
start = 12;
cnt = (short) (cnt - start);
}

final ConnectorResponse response = ConnectorResponseFactory.wrap(buffer, start, start + cnt);

reset();

tts = System.currentTimeMillis() - tts;
if (adjustments != null && adjustments.isDebugEnabled()) {
log.info("RX: {}, processing time: {}ms", response.getMessage(), tts);
}

return response;
}
} catch (final IOException e) {
log.error("Failed to receive data", e);
reconnect();
}
}
return EMPTY_MESSAGE;
}

void reconnect() {
if (closed) {
log.error("Connection is closed. Do not try to reconnect.");
} else {
log.error("Connection is broken. Reconnecting...");
try {
connection.reconnect();
in = connection.openInputStream();
out = connection.openOutputStream();
faulty = false;
} catch (final IOException e) {
faulty = true;
}
}
}

private void reset() {
Arrays.fill(buffer, 0, buffer.length, (byte) 0);
}

private String printMessage(CannelloniCommand message) {
final StringBuilder buffer = new StringBuilder();
for (byte b : message.getData()) {
buffer.append(String.format("%02X ", b));
}
return buffer.toString();
}
}
13 changes: 11 additions & 2 deletions src/main/java/org/obd/metrics/transport/Connector.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
import lombok.Builder;

public interface Connector extends Closeable, Service {
public static enum Type {
CANNELLONI, STREAM
}

static final int BUFFER_SIZE = 2 * 96;

boolean isFaulty();
Expand All @@ -38,8 +42,13 @@ public interface Connector extends Closeable, Service {
ConnectorResponse receive();

@Builder
static Connector create(final AdapterConnection connection, final Adjustments adjustments) throws IOException {
static Connector create(final AdapterConnection connection, final Adjustments adjustments, final Type type)
throws IOException {
connection.connect();
return new StreamConnector(connection, adjustments);
if (type == null || type == Type.STREAM) {
return new StreamConnector(connection, adjustments);
} else {
return new CannelloniConnector(connection, adjustments);
}
}
}
6 changes: 5 additions & 1 deletion src/main/java/org/obd/metrics/transport/StreamConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,11 @@ public synchronized void transmit(@NonNull final Command command) {
log.info("TX: {}", command.getQuery());
}
if (out != null) {
out.write(command.getData());
for (byte b : command.getData()) {
System.out.println(("b: " + (b & 0xFF)));
out.write(b & 0xFF);
}

}
} catch (final IOException e) {
log.error("Failed to transmit command: {}", command, e);
Expand Down
59 changes: 59 additions & 0 deletions src/test/java/org/obd/metrics/api/integration/Med17Test.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,65 @@ public class Med17Test {
// [01, 03, 04, 05, 06, 07, 0b, 0c, 0d, 0e, 0f, 10, 11, 13, 15, 1c],
// raw=4100be3fa811]


public static void main(String[] args) throws InterruptedException, IOException {
final AdapterConnection connection = BluetoothConnection.openConnection();

final DataCollector collector = new DataCollector();

int commandFrequency = 6;
final Workflow workflow = Workflow
.instance()
.pids(Pids.DEFAULT)
.observer(collector)
.initialize();

final PIDsRegistry registry = PIDsRegistryFactory.get("mode01.json");
final Query query = Query.builder()
.pid(registry.findBy("15").getId())
.pid(registry.findBy("0D").getId())
.build();

final Adjustments optional = Adjustments
.builder()
.debugEnabled(Boolean.TRUE)
.stNxx(STNxxExtensions.builder().enabled(Boolean.FALSE).build())
.vehicleCapabilitiesReadingEnabled(Boolean.TRUE)
.adaptiveTimeoutPolicy(AdaptiveTimeoutPolicy
.builder()
.enabled(Boolean.TRUE)
.checkInterval(1)
.commandFrequency(commandFrequency)
.build())
.producerPolicy(ProducerPolicy.builder()
.priorityQueueEnabled(Boolean.TRUE)
.build())
.cachePolicy(CachePolicy.builder().resultCacheEnabled(false).build())
.batchPolicy(BatchPolicy.builder().enabled(Boolean.TRUE).build())
.build();

final Init init = Init.builder()
.header(Header.builder().header("7DF").mode("01").build())
.delayAfterInit(0)
.protocol(Protocol.CAN_11)
.sequence(DefaultCommandGroup.INIT)
.build();

workflow.start(connection, query, init, optional);

WorkflowFinalizer.finalizeAfter(workflow, 15000, () -> false);

final PidDefinitionRegistry rpm = workflow.getPidRegistry();

PidDefinition measuredPID = rpm.findBy(13l);
double ratePerSec = workflow.getDiagnostics().rate().findBy(RateType.MEAN, measuredPID).get().getValue();

log.info("Rate:{} -> {}", measuredPID.getPid(), ratePerSec);

Assertions.assertThat(ratePerSec).isGreaterThanOrEqualTo(commandFrequency);
}


@Test
public void stnTest() throws IOException, InterruptedException, ExecutionException {
final Logger logger = (Logger) LoggerFactory.getLogger("org.obd.metrics.transport.DefaultConnector");
Expand Down
Loading

0 comments on commit 0cdde06

Please sign in to comment.