Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package net.heberling.ismart.mqtt;

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import org.eclipse.paho.client.mqttv3.*;

public class GatewayMqttClient {

private final IMqttClient client;

public GatewayMqttClient(URI mqttUri, String mqttUser, char[] mqttPassword) {
String publisherId = UUID.randomUUID().toString();
try {
client =
new MqttClient(mqttUri.toString(), publisherId, null) {
@Override
public void close() throws MqttException {
Thread.dumpStack();
disconnect();
super.close(true);
}
};
MqttConnectOptions options = new MqttConnectOptions();
options.setAutomaticReconnect(true);
options.setCleanSession(true);
options.setConnectionTimeout(10);
if (mqttUser != null) {
options.setUserName(mqttUser);
}
if (mqttPassword != null) {
options.setPassword(mqttPassword);
}
client.connect(options);
} catch (MqttException e) {
throw new MqttGatewayException("Error initializing mqtt client.", e);
}
}

public void publish(String topic, String message) throws MqttException {
MqttMessage msg = new MqttMessage(message.getBytes(StandardCharsets.UTF_8));
msg.setQos(0);
msg.setRetained(false);
client.publish(topic, msg);
}

public void publishRetained(String topic, String message) throws MqttException {
MqttMessage msg = new MqttMessage(message.getBytes(StandardCharsets.UTF_8));
msg.setQos(0);
msg.setRetained(true);
client.publish(topic, msg);
}

public void setCallback(MqttCallback callback) {
client.setCallback(callback);
}

public void subscribe(String topic) throws MqttException {
client.subscribe(topic);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ public class MqttGatewayTopics {
public static final String CLIMATE_EXTERIOR_TEMPERATURE = CLIMATE + "/exteriorTemperature";
public static final String CLIMATE_INTERIOR_TEMPERATURE = CLIMATE + "/interiorTemperature";
public static final String CLIMATE_REMOTE_CLIMATE_STATE = CLIMATE + "/remoteClimateState";
public static final String CLIMATE_REMOTE_TEMPERATURE = CLIMATE + "/remoteTemperature";
public static final String DOORS = "doors";
public static final String DOORS_BONNET = DOORS + "/bonnet";
public static final String DOORS_BOOT = DOORS + "/boot";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand All @@ -53,11 +52,8 @@
import org.bn.annotations.ASN1Enum;
import org.bn.annotations.ASN1Sequence;
import org.bn.coders.IASN1PreparedElement;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
Expand Down Expand Up @@ -165,7 +161,7 @@ public File convert(String value) throws Exception {
split = ",")
private Map<String, String> vinAbrpTokenMap = new HashMap<>();

private IMqttClient client;
private GatewayMqttClient client;

private final Map<String, VehicleHandler> vehicleHandlerMap = new HashMap<>();

Expand All @@ -174,149 +170,129 @@ public File convert(String value) throws Exception {

@Override
public Integer call() throws Exception { // your business logic goes here...
String publisherId = UUID.randomUUID().toString();
try (IMqttClient client =
new MqttClient(mqttUri.toString(), publisherId, null) {

var mqttAccountPrefix = "saic/" + saicUser;

this.client = new GatewayMqttClient(mqttUri, mqttUser, mqttPassword);

client.setCallback(
new MqttCallback() {
@Override
public void close() throws MqttException {
disconnect();
super.close(true);
}
}) {
this.client = client;
MqttConnectOptions options = new MqttConnectOptions();
options.setAutomaticReconnect(true);
options.setCleanSession(true);
options.setConnectionTimeout(10);
if (mqttUser != null) {
options.setUserName(mqttUser);
}
if (mqttPassword != null) {
options.setPassword(mqttPassword);
}
client.connect(options);

var mqttAccountPrefix = "saic/" + saicUser;

client.setCallback(
new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
LOGGER.info("Got message for topic {}: {}", topic, message);
final Matcher setValueMatcher =
Pattern.compile(".*/vehicles/([^/]*)/(.*)/set").matcher(topic);
final Matcher configurationValueMatcher =
Pattern.compile(".*/vehicles/([^/]*)/(.*)").matcher(topic);
if (setValueMatcher.matches()) {
new Thread(
() -> {
try {
vehicleHandlerMap
.get(setValueMatcher.group(1))
.handleMQTTCommand(setValueMatcher.group(2), message);
} catch (MqttException e) {
LOGGER.error(
"Could not handle message for topic {}: {}", topic, message, e);
}
})
.start();
} else if (configurationValueMatcher.matches()) {
VehicleState vehicleState =
getVehicleState(mqttAccountPrefix, configurationValueMatcher.group(1));
if (!vehicleState.isComplete()) {
vehicleState.configure(configurationValueMatcher.group(2), message);
}
public void connectionLost(Throwable cause) {}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
LOGGER.info("Got message for topic {}: {}", topic, message);
final Matcher setValueMatcher =
Pattern.compile(".*/vehicles/([^/]*)/(.*)/set").matcher(topic);
final Matcher configurationValueMatcher =
Pattern.compile(".*/vehicles/([^/]*)/(.*)").matcher(topic);
if (setValueMatcher.matches()) {
new Thread(
() -> {
try {
vehicleHandlerMap
.get(setValueMatcher.group(1))
.handleMQTTCommand(setValueMatcher.group(2), message);
} catch (MqttException e) {
LOGGER.error(
"Could not handle message for topic {}: {}", topic, message, e);
}
})
.start();
} else if (configurationValueMatcher.matches()) {
VehicleState vehicleState =
getVehicleState(mqttAccountPrefix, configurationValueMatcher.group(1));
if (!vehicleState.isComplete()) {
vehicleState.configure(configurationValueMatcher.group(2), message);
}
}
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {}
});
@Override
public void deliveryComplete(IMqttDeliveryToken token) {}
});

client.subscribe(mqttAccountPrefix + "/vehicles/+/+/+/set");
client.subscribe(mqttAccountPrefix + "/vehicles/+/+/+/+/set");
client.subscribe(mqttAccountPrefix + "/vehicles/+/" + REFRESH_MODE);
client.subscribe(mqttAccountPrefix + "/vehicles/+/" + REFRESH_PERIOD + "/+");
client.subscribe(mqttAccountPrefix + "/vehicles/+/+/+/set");
client.subscribe(mqttAccountPrefix + "/vehicles/+/+/+/+/set");
client.subscribe(mqttAccountPrefix + "/vehicles/+/" + REFRESH_MODE);
client.subscribe(mqttAccountPrefix + "/vehicles/+/" + REFRESH_PERIOD + "/+");

MessageCoder<MP_UserLoggingInReq> loginRequestMessageCoder =
new MessageCoder<>(MP_UserLoggingInReq.class);

MP_UserLoggingInReq applicationData = new MP_UserLoggingInReq();
applicationData.setPassword(saicPassword);
Message<MP_UserLoggingInReq> loginRequestMessage =
loginRequestMessageCoder.initializeMessage(
"0000000000000000000000000000000000000000000000000#".substring(saicUser.length())
+ saicUser,
null,
null,
"501",
513,
1,
applicationData);

MessageCoder<MP_UserLoggingInReq> loginRequestMessageCoder =
new MessageCoder<>(MP_UserLoggingInReq.class);
String loginRequest = loginRequestMessageCoder.encodeRequest(loginRequestMessage);

MP_UserLoggingInReq applicationData = new MP_UserLoggingInReq();
applicationData.setPassword(saicPassword);
Message<MP_UserLoggingInReq> loginRequestMessage =
loginRequestMessageCoder.initializeMessage(
"0000000000000000000000000000000000000000000000000#".substring(saicUser.length())
+ saicUser,
null,
null,
"501",
513,
1,
applicationData);
LOGGER.debug(toJSON(anonymized(loginRequestMessageCoder, loginRequestMessage)));

String loginRequest = loginRequestMessageCoder.encodeRequest(loginRequestMessage);
String loginResponse = Client.sendRequest(saicUri.resolve("/TAP.Web/ota.mp"), loginRequest);

LOGGER.debug(toJSON(anonymized(loginRequestMessageCoder, loginRequestMessage)));
Message<MP_UserLoggingInResp> loginResponseMessage =
new MessageCoder<>(MP_UserLoggingInResp.class).decodeResponse(loginResponse);

String loginResponse = Client.sendRequest(saicUri.resolve("/TAP.Web/ota.mp"), loginRequest);
// register for all known alarm types (not all might be actually delivered)
for (MP_AlarmSettingType.EnumType type : MP_AlarmSettingType.EnumType.values()) {
registerAlarmMessage(
loginResponseMessage.getBody().getUid(),
loginResponseMessage.getApplicationData().getToken(),
type);
}

Message<MP_UserLoggingInResp> loginResponseMessage =
new MessageCoder<>(MP_UserLoggingInResp.class).decodeResponse(loginResponse);
LOGGER.debug(
toJSON(anonymized(new MessageCoder<>(MP_UserLoggingInResp.class), loginResponseMessage)));
List<Future<?>> futures =
loginResponseMessage.getApplicationData().getVinList().stream()
.map(
vin -> {
VehicleHandler handler =
new VehicleHandler(
this,
client,
saicUri,
loginResponseMessage.getBody().getUid(),
loginResponseMessage.getApplicationData().getToken(),
mqttAccountPrefix,
vin,
getVehicleState(mqttAccountPrefix, vin.getVin()));
vehicleHandlerMap.put(vin.getVin(), handler);
return handler;
})
.map(
handler ->
(Callable<Object>)
() -> {
handler.handleVehicle();
return null;
})
.map(Executors.newSingleThreadExecutor()::submit)
.collect(Collectors.toList());

// register for all known alarm types (not all might be actually delivered)
for (MP_AlarmSettingType.EnumType type : MP_AlarmSettingType.EnumType.values()) {
registerAlarmMessage(
ScheduledFuture<?> pollingJob =
createMessagePoller(
loginResponseMessage.getBody().getUid(),
loginResponseMessage.getApplicationData().getToken(),
type);
}
mqttAccountPrefix);

LOGGER.debug(
toJSON(anonymized(new MessageCoder<>(MP_UserLoggingInResp.class), loginResponseMessage)));
List<Future<?>> futures =
loginResponseMessage.getApplicationData().getVinList().stream()
.map(
vin -> {
VehicleHandler handler =
new VehicleHandler(
this,
client,
saicUri,
loginResponseMessage.getBody().getUid(),
loginResponseMessage.getApplicationData().getToken(),
mqttAccountPrefix,
vin,
getVehicleState(mqttAccountPrefix, vin.getVin()));
vehicleHandlerMap.put(vin.getVin(), handler);
return handler;
})
.map(
handler ->
(Callable<Object>)
() -> {
handler.handleVehicle();
return null;
})
.map(Executors.newSingleThreadExecutor()::submit)
.collect(Collectors.toList());

ScheduledFuture<?> pollingJob =
createMessagePoller(
loginResponseMessage.getBody().getUid(),
loginResponseMessage.getApplicationData().getToken(),
mqttAccountPrefix);

futures.add(pollingJob);

for (Future<?> future : futures) {
// make sure we wait on all futures before exiting
future.get();
}
return 0;
futures.add(pollingJob);

for (Future<?> future : futures) {
// make sure we wait on all futures before exiting
future.get();
}
return 0;
}

private VehicleState getVehicleState(String mqttAccountPrefix, String vin) {
Expand Down Expand Up @@ -526,14 +502,8 @@ public String getAbrpUserToken(String vin) {
}

public void notifyMessage(String mqttMessagePrefix, SaicMessage message) throws MqttException {
MqttMessage msg =
new MqttMessage(SaicMqttGateway.toJSON(message).getBytes(StandardCharsets.UTF_8));
msg.setQos(0);
// Don't retain, so deleted messages are removed
// automatically from the broker
msg.setRetained(false);
client.publish(mqttMessagePrefix + "/" + message.getMessageId(), msg);

client.publish(
mqttMessagePrefix + "/" + message.getMessageId(), SaicMqttGateway.toJSON(message));
if (message.getVin() != null) {
vehicleHandlerMap.get(message.getVin()).notifyMessage(message);
}
Expand Down
Loading