diff --git a/deployment-common/src/main/java/io/github/cloudiator/deployment/domain/HdfsInterface.java b/deployment-common/src/main/java/io/github/cloudiator/deployment/domain/HdfsInterface.java new file mode 100644 index 0000000..71a9802 --- /dev/null +++ b/deployment-common/src/main/java/io/github/cloudiator/deployment/domain/HdfsInterface.java @@ -0,0 +1,20 @@ +package io.github.cloudiator.deployment.domain; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +public interface HdfsInterface extends TaskInterface { + + String file(); + + Optional className(); + + List arguments(); + + Map hdfsArguments(); + + Map hdfsConfiguration(); + + +} diff --git a/deployment-common/src/main/java/io/github/cloudiator/deployment/domain/HdfsInterfaceBuilder.java b/deployment-common/src/main/java/io/github/cloudiator/deployment/domain/HdfsInterfaceBuilder.java new file mode 100644 index 0000000..f92de11 --- /dev/null +++ b/deployment-common/src/main/java/io/github/cloudiator/deployment/domain/HdfsInterfaceBuilder.java @@ -0,0 +1,110 @@ +package io.github.cloudiator.deployment.domain; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.collect.Lists; +import io.github.cloudiator.deployment.security.VariableContext; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +public class HdfsInterfaceBuilder { + + private String file; + private String className; + private List arguments; + private Map hdfsArguments; + private Map hdfsConfiguration; + private ProcessMapping processMapping; + + private HdfsInterfaceBuilder() { + arguments = new LinkedList<>(); + hdfsArguments = new HashMap<>(); + hdfsConfiguration = new HashMap<>(); + } + + private HdfsInterfaceBuilder(HdfsInterface hdfsInterface) { + file = hdfsInterface.file(); + className = hdfsInterface.className().orElse(null); + arguments = new LinkedList<>(hdfsInterface.arguments()); + hdfsArguments = new HashMap<>(hdfsInterface.hdfsArguments()); + hdfsConfiguration = new HashMap<>(hdfsInterface.hdfsConfiguration()); + processMapping = hdfsInterface.processMapping(); + } + + public static HdfsInterfaceBuilder newBuilder() { + return new HdfsInterfaceBuilder(); + } + + public static HdfsInterfaceBuilder of(HdfsInterface hdfsInterface) { + return new HdfsInterfaceBuilder(hdfsInterface); + } + + public HdfsInterfaceBuilder file(String file) { + this.file = file; + return this; + } + + public HdfsInterfaceBuilder className(String className) { + this.className = className; + return this; + } + + public HdfsInterfaceBuilder arguments(Iterable arguments) { + this.arguments = Lists.newLinkedList(arguments); + return this; + } + + public HdfsInterfaceBuilder addArgument(String argument) { + checkNotNull(argument, "argument is null"); + arguments.add(argument); + return this; + } + + public HdfsInterfaceBuilder hdfsArguments(Map hdfsArguments) { + this.hdfsArguments = hdfsArguments; + return this; + } + + public HdfsInterfaceBuilder putHdfsArguments(String key, String value) { + checkNotNull(key, "key is null"); + checkNotNull(value, "value is null"); + hdfsArguments.put(key, value); + return this; + } + + public HdfsInterfaceBuilder hdfsConfiguration(Map hdfsConfiguration) { + this.hdfsConfiguration = hdfsConfiguration; + return this; + } + + public HdfsInterfaceBuilder putHdfsConfiguration(String key, String value) { + checkNotNull(key, "key is null"); + checkNotNull(value, "value is null"); + hdfsConfiguration.put(key, value); + return 
this; + } + + public HdfsInterfaceBuilder processMapping(ProcessMapping processMapping) { + this.processMapping = processMapping; + return this; + } + + public HdfsInterfaceBuilder decorate(VariableContext variableContext) { + + arguments.replaceAll(variableContext::parse); + hdfsArguments.replaceAll((k, v) -> variableContext.parse(v)); + hdfsConfiguration.replaceAll((k, v) -> variableContext.parse(v)); + + return this; + } + + + public HdfsInterface build() { + return new HdfsInterfaceImpl(file, className, arguments, hdfsArguments, hdfsConfiguration, + processMapping); + } + + +} diff --git a/deployment-common/src/main/java/io/github/cloudiator/deployment/domain/HdfsInterfaceImpl.java b/deployment-common/src/main/java/io/github/cloudiator/deployment/domain/HdfsInterfaceImpl.java new file mode 100644 index 0000000..a2c9278 --- /dev/null +++ b/deployment-common/src/main/java/io/github/cloudiator/deployment/domain/HdfsInterfaceImpl.java @@ -0,0 +1,113 @@ +package io.github.cloudiator.deployment.domain; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.base.MoreObjects; +import io.github.cloudiator.deployment.security.VariableContext; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import javax.annotation.Nullable; + +public class HdfsInterfaceImpl implements HdfsInterface { + + private final String file; + @Nullable + private final String className; + private final List arguments; + private final Map hdfsArguments; + private final Map hdfsConfiguration; + private final ProcessMapping processMapping; + + public HdfsInterfaceImpl(String file, @Nullable String className, + List arguments, Map hdfsArguments, + Map hdfsConfiguration, + ProcessMapping processMapping) { + + checkNotNull(file, "file is null"); + checkArgument(!file.isEmpty(), "file is empty"); + this.file = file; + + if (className != null) { + checkArgument(!className.isEmpty(), "class name is empty"); + } + this.className = className; + checkNotNull(arguments, "arguments is null"); + this.arguments = arguments; + checkNotNull(hdfsArguments, "hdfs arguments is null"); + this.hdfsArguments = hdfsArguments; + checkNotNull(hdfsConfiguration, "hdfs configuration is null"); + this.hdfsConfiguration = hdfsConfiguration; + + checkNotNull(processMapping, "processMapping is null"); + this.processMapping = processMapping; + + } + + + @Override + public String file() { + return file; + } + + @Override + public Optional className() { + return Optional.ofNullable(className); + } + + @Override + public List arguments() { + return arguments; + } + + @Override + public Map hdfsArguments() { + return hdfsArguments; + } + + @Override + public Map hdfsConfiguration() { + return hdfsConfiguration; + } + + @Override + public ProcessMapping processMapping() { + return processMapping; + } + + @Override + public boolean isStaticallyConfigured() { + return true; + } + + @Override + public boolean requiresManualWait(TaskInterface dependency) { + return true; + } + + @Override + public TaskInterface decorateEnvironment(Environment environment) { + + final HdfsInterfaceBuilder hdfsInterfaceBuilder = HdfsInterfaceBuilder.of(this); + environment.forEach((key, value) -> { + hdfsInterfaceBuilder.addArgument("--" + key); + hdfsInterfaceBuilder.addArgument(value); + }); + + return hdfsInterfaceBuilder.build(); + } + + @Override + public TaskInterface decorateVariables(VariableContext variableContext) { + return 
HdfsInterfaceBuilder.of(this).decorate(variableContext).build(); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this).add("file", file).add("className", className) + .add("arguments", arguments).add("hdfsArguments", hdfsArguments) + .add("hdfsConfiguration", hdfsConfiguration).add("processMapping", processMapping) + .toString(); + } +} diff --git a/deployment-common/src/main/java/io/github/cloudiator/deployment/messaging/HdfsInterfaceConverter.java b/deployment-common/src/main/java/io/github/cloudiator/deployment/messaging/HdfsInterfaceConverter.java new file mode 100644 index 0000000..bd0803a --- /dev/null +++ b/deployment-common/src/main/java/io/github/cloudiator/deployment/messaging/HdfsInterfaceConverter.java @@ -0,0 +1,85 @@ +package io.github.cloudiator.deployment.messaging; + +import com.google.common.base.Strings; +import de.uniulm.omi.cloudiator.util.TwoWayConverter; +import io.github.cloudiator.deployment.domain.ProcessMapping; +import io.github.cloudiator.deployment.domain.HdfsInterface; +import io.github.cloudiator.deployment.domain.HdfsInterfaceBuilder; +import org.cloudiator.messages.entities.TaskEntities; +import org.cloudiator.messages.entities.TaskEntities.HdfsInterface.Builder; + +public class HdfsInterfaceConverter implements + TwoWayConverter { + + public static final HdfsInterfaceConverter INSTANCE = new HdfsInterfaceConverter(); + public static final ProcessMappingConverter PROCESS_MAPPING_CONVERTER = new ProcessMappingConverter(); + + private HdfsInterfaceConverter() { + } + + @Override + public TaskEntities.HdfsInterface applyBack(HdfsInterface hdfsInterface) { + Builder builder = TaskEntities.HdfsInterface.newBuilder(); + + if (hdfsInterface.className().isPresent()) { + builder.setClassName(hdfsInterface.className().get()); + } + + builder.setFile(hdfsInterface.file()).addAllArguments(hdfsInterface.arguments()) + .putAllHdfsArguments(hdfsInterface.hdfsArguments()) + .putAllHdfsConfiguration(hdfsInterface.hdfsConfiguration()); + + builder.setProcessMapping(PROCESS_MAPPING_CONVERTER.applyBack(hdfsInterface.processMapping())); + + return builder.build(); + } + + @Override + public HdfsInterface apply(TaskEntities.HdfsInterface hdfsInterface) { + + final HdfsInterfaceBuilder hdfsInterfaceBuilder = HdfsInterfaceBuilder.newBuilder() + .file(hdfsInterface.getFile()) + + .arguments(hdfsInterface.getArgumentsList()) + .hdfsArguments(hdfsInterface.getHdfsArgumentsMap()) + .hdfsConfiguration(hdfsInterface.getHdfsConfigurationMap()); + + if (!Strings.isNullOrEmpty(hdfsInterface.getClassName())) { + hdfsInterfaceBuilder.className(hdfsInterface.getClassName()); + } + + hdfsInterfaceBuilder + .processMapping(PROCESS_MAPPING_CONVERTER.apply(hdfsInterface.getProcessMapping())); + + return hdfsInterfaceBuilder.build(); + } + + private static class ProcessMappingConverter implements + TwoWayConverter { + + @Override + public TaskEntities.ProcessMapping applyBack(ProcessMapping processMapping) { + switch (processMapping) { + case SINGLE: + return TaskEntities.ProcessMapping.SINGLE; + case CLUSTER: + return TaskEntities.ProcessMapping.CLUSTER; + default: + throw new AssertionError("Unknown ProcessMapping type " + processMapping); + } + } + + @Override + public ProcessMapping apply(TaskEntities.ProcessMapping processMapping) { + switch (processMapping) { + case CLUSTER: + return ProcessMapping.CLUSTER; + case SINGLE: + return ProcessMapping.SINGLE; + case UNRECOGNIZED: + default: + throw new AssertionError("Unknown process mapping type " + 
processMapping);
+      }
+    }
+  }
+}
diff --git a/deployment-common/src/main/java/io/github/cloudiator/persistance/HdfsTaskInterfaceModel.java b/deployment-common/src/main/java/io/github/cloudiator/persistance/HdfsTaskInterfaceModel.java
new file mode 100644
index 0000000..b6dae08
--- /dev/null
+++ b/deployment-common/src/main/java/io/github/cloudiator/persistance/HdfsTaskInterfaceModel.java
@@ -0,0 +1,113 @@
+package io.github.cloudiator.persistance;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.github.cloudiator.deployment.domain.ProcessMapping;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import javax.persistence.Column;
+import javax.persistence.ElementCollection;
+import javax.persistence.Entity;
+import javax.persistence.EnumType;
+import javax.persistence.Enumerated;
+import javax.persistence.OrderColumn;
+
+@Entity
+public class HdfsTaskInterfaceModel extends TaskInterfaceModel {
+
+  @Column(nullable = false)
+  private String file;
+
+  @Nullable
+  private String className;
+
+  @OrderColumn
+  @ElementCollection
+  private List<String> arguments = new LinkedList<>();
+
+  @ElementCollection
+  private Map<String, String> hdfsArguments = new HashMap<>();
+
+  @ElementCollection
+  private Map<String, String> hdfsConfiguration = new HashMap<>();
+
+  @Enumerated(EnumType.STRING)
+  @Column(nullable = false)
+  private ProcessMapping processMapping;
+
+  @Nullable
+  public String getClassName() {
+    return className;
+  }
+
+  public List<String> getArguments() {
+    return ImmutableList.copyOf(arguments);
+  }
+
+  public HdfsTaskInterfaceModel addArgument(String argument) {
+    checkNotNull(argument, "argument is null");
+    arguments.add(argument);
+    return this;
+  }
+
+  public Map<String, String> getHdfsArguments() {
+    return ImmutableMap.copyOf(hdfsArguments);
+  }
+
+  public HdfsTaskInterfaceModel putHdfsArgument(String key, String value) {
+    checkNotNull(key, "key is null");
+    checkNotNull(value, "value is null");
+    hdfsArguments.put(key, value);
+    return this;
+  }
+
+  public Map<String, String> getHdfsConfiguration() {
+    return ImmutableMap.copyOf(hdfsConfiguration);
+  }
+
+  public HdfsTaskInterfaceModel putHdfsConfiguration(String key, String value) {
+    checkNotNull(key, "key is null");
+    checkNotNull(value, "value is null");
+    hdfsConfiguration.put(key, value);
+    return this;
+  }
+
+  public String getFile() {
+    return file;
+  }
+
+  public ProcessMapping getProcessMapping() {
+    return processMapping;
+  }
+
+  /**
+   * Empty hibernate constructor
+   */
+  protected HdfsTaskInterfaceModel() {
+  }
+
+  HdfsTaskInterfaceModel(TaskModel taskModel, String file, @Nullable String className,
+      List<String> arguments, Map<String, String> hdfsArguments,
+      Map<String, String> hdfsConfiguration, ProcessMapping processMapping) {
+    super(taskModel);
+    checkNotNull(file, "file is null");
+    checkNotNull(arguments, "arguments is null");
+    checkNotNull(hdfsArguments, "hdfsArguments is null");
+    checkNotNull(hdfsConfiguration, "hdfsConfiguration is null");
+    checkNotNull(processMapping, "processMapping is null");
+
+    this.file = file;
+    this.className = className;
+    this.arguments = arguments;
+    this.hdfsArguments = hdfsArguments;
+    this.hdfsConfiguration = hdfsConfiguration;
+    this.processMapping = processMapping;
+  }
+
+}
diff --git a/hdfs/Dockerfile b/hdfs/Dockerfile
new file mode 100644
index 0000000..fbb15e2
--- /dev/null
+++ b/hdfs/Dockerfile
@@ -0,0 +1,8 @@
+FROM openjdk:8-jre-alpine
+
+WORKDIR /data
+
+ADD
target/hdfs-agent.jar . +ADD entry.sh . + +ENTRYPOINT ["./entry.sh"] diff --git a/hdfs/ci/publish_docker.sh b/hdfs/ci/publish_docker.sh new file mode 100755 index 0000000..384dc0f --- /dev/null +++ b/hdfs/ci/publish_docker.sh @@ -0,0 +1,16 @@ +#!/bin/bash + +# build multiple times +# see https://github.com/GoogleContainerTools/jib/issues/802 + +echo "Publishing hdfs-agent to docker" + +cd hdfs + +export TAG=`if [ "$TRAVIS_BRANCH" == "master" ]; then echo "latest"; else echo ${TRAVIS_BRANCH} ; fi` +mvn -q -Ddocker.tag=${TAG} -Djib.to.auth.username=${DOCKER_USER} -Djib.to.auth.password=${DOCKER_PASS} jib:build + +mvn -q -Ddocker.tag=${COMMIT} -Djib.to.auth.username=${DOCKER_USER} -Djib.to.auth.password=${DOCKER_PASS} jib:build +mvn -q -Ddocker.tag=travis-${TRAVIS_BUILD_NUMBER} -Djib.to.auth.username=${DOCKER_USER} -Djib.to.auth.password=${DOCKER_PASS} jib:build + +cd .. diff --git a/hdfs/entry.sh b/hdfs/entry.sh new file mode 100644 index 0000000..022febd --- /dev/null +++ b/hdfs/entry.sh @@ -0,0 +1,6 @@ +#!/bin/sh + +set -x + +# Run the service +java -jar hdfs-agent.jar diff --git a/hdfs/pom.xml b/hdfs/pom.xml new file mode 100644 index 0000000..2372bce --- /dev/null +++ b/hdfs/pom.xml @@ -0,0 +1,104 @@ + + + + + + project + io.github.cloudiator.deployment + 0.3.0-SNAPSHOT + + 4.0.0 + + hdfs + + + manual + + + + + io.github.cloudiator.common + common-messaging + + + io.github.cloudiator.iaas + iaas-common + 0.3.0-SNAPSHOT + + + io.github.cloudiator.deployment + deployment-common + 0.3.0-SNAPSHOT + + + + org.slf4j + slf4j-api + + + ch.qos.logback + logback-classic + + + + org.apache.httpcomponents + httpclient + 4.5.6 + + + + + + + + + com.google.cloud.tools + jib-maven-plugin + + + gcr.io/distroless/java:debug + + + registry.hub.docker.com/cloudiator/hdfs-agent:${docker.tag} + + + true + + + + + maven-assembly-plugin + + + + io.github.cloudiator.deployment.hdfs.HdfsAgent + + + + jar-with-dependencies + + hdfs-agent + false + + + + + + + diff --git a/hdfs/src/main/java/io/github/cloudiator/deployment/hdfs/CreateHdfsClusterSubscriber.java b/hdfs/src/main/java/io/github/cloudiator/deployment/hdfs/CreateHdfsClusterSubscriber.java new file mode 100644 index 0000000..8f1b38c --- /dev/null +++ b/hdfs/src/main/java/io/github/cloudiator/deployment/hdfs/CreateHdfsClusterSubscriber.java @@ -0,0 +1,76 @@ +package io.github.cloudiator.deployment.hdfs; + +import com.google.inject.Inject; +import io.github.cloudiator.domain.Node; +import io.github.cloudiator.messaging.NodeToNodeMessageConverter; +import java.util.Set; +import java.util.stream.Collectors; +import org.cloudiator.messages.Process.SparkClusterCreatedResponse; +import org.cloudiator.messaging.MessageInterface; +import org.cloudiator.messaging.services.ProcessService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.cloudiator.messages.General.Error; + + +public class CreateHdfsClusterSubscriber implements Runnable { + + private static final Logger LOGGER = LoggerFactory.getLogger(CreateHdfsClusterSubscriber.class); + private final ProcessService processService; + private final MessageInterface messageInterface; + private final CreateHdfsProcessStrategy createHdfsProcessStrategy; + private static final NodeToNodeMessageConverter NODE_MESSAGE_CONVERTER = NodeToNodeMessageConverter.INSTANCE; + + @Inject + public CreateHdfsClusterSubscriber( + ProcessService processService, + CreateHdfsProcessStrategy createHdfsProcessStrategy, + MessageInterface messageInterface) { + this.processService = processService; + 
this.createHdfsProcessStrategy = createHdfsProcessStrategy;
+    this.messageInterface = messageInterface;
+  }
+
+  @Override
+  public void run() {
+
+    LOGGER.debug("CreateHdfsClusterSubscriber started and waiting for requests...");
+
+    processService.subscribeCreateHdfsClusterRequest(
+        (id, content) -> {
+
+          try {
+
+            final String userId = content.getUserId();
+
+            Set<Node> nodes = content.getNodes().getNodesList().stream()
+                .map(NODE_MESSAGE_CONVERTER::applyBack).collect(
+                    Collectors.toSet());
+
+            createHdfsProcessStrategy.executeClusterDeployment(userId, nodes);
+
+            final HdfsClusterCreatedResponse hdfsClusterCreatedResponse = HdfsClusterCreatedResponse
+                .newBuilder().build();
+
+            messageInterface.reply(id, hdfsClusterCreatedResponse);
+
+          } catch (Exception e) {
+            final String errorMessage = String
+                .format("Exception %s while processing request %s with id %s.", e.getMessage(),
+                    content, id);
+
+            LOGGER.error(errorMessage, e);
+
+            messageInterface.reply(HdfsClusterCreatedResponse.class, id,
+                Error.newBuilder().setMessage(errorMessage).setCode(500).build());
+
+          }
+
+        });
+
+  }
+}
diff --git a/hdfs/src/main/java/io/github/cloudiator/deployment/hdfs/CreateHdfsProcessStrategy.java b/hdfs/src/main/java/io/github/cloudiator/deployment/hdfs/CreateHdfsProcessStrategy.java
new file mode 100644
index 0000000..ae22b2e
--- /dev/null
+++ b/hdfs/src/main/java/io/github/cloudiator/deployment/hdfs/CreateHdfsProcessStrategy.java
@@ -0,0 +1,168 @@
+package io.github.cloudiator.deployment.hdfs;
+
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import de.uniulm.omi.cloudiator.domain.Identifiable;
+import de.uniulm.omi.cloudiator.util.configuration.Configuration;
+import io.github.cloudiator.deployment.config.Constants;
+import io.github.cloudiator.deployment.domain.CloudiatorClusterProcessBuilder;
+import io.github.cloudiator.deployment.domain.CloudiatorProcess;
+import io.github.cloudiator.deployment.domain.CloudiatorProcess.ProcessState;
+import io.github.cloudiator.deployment.domain.CloudiatorProcess.Type;
+import io.github.cloudiator.deployment.domain.HdfsInterface;
+import io.github.cloudiator.deployment.domain.Task;
+import io.github.cloudiator.domain.Node;
+import io.github.cloudiator.messaging.NodeToNodeMessageConverter;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import javax.inject.Named;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.ResponseHandler;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.BasicResponseHandler;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.cloudiator.messages.Installation.InstallationRequest;
+import org.cloudiator.messages.Installation.InstallationResponse;
+import org.cloudiator.messages.InstallationEntities.Installation;
+import org.cloudiator.messages.InstallationEntities.Installation.Builder;
+import org.cloudiator.messages.InstallationEntities.Tool;
+import org.cloudiator.messaging.SettableFutureResponseCallback;
+import org.cloudiator.messaging.services.InstallationRequestService;
+import
org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class CreateHdfsProcessStrategy { + + private static final Logger LOGGER = LoggerFactory.getLogger(CreateHdfsProcessStrategy.class); + + private static final NodeToNodeMessageConverter NODE_MESSAGE_CONVERTER = NodeToNodeMessageConverter.INSTANCE; + + private static final String HDFS_ARGUMENT_DELIMITER = ","; + + + /** + * Hdfs Default Settings + * currently adding no default settings + */ + + + private InstallationRequestService installationRequestService; + + @Named(Constants.INSTALL_MELODIC_TOOLS) + @Inject(optional = true) + boolean installMelodicTools = false; + + @Inject + CreateHdfsProcessStrategy(InstallationRequestService installationRequestService) { + this.installationRequestService = installationRequestService; + } + + + private void installHdfsDataNodes(String userId, Set nodes) { + + for (Node node : nodes) { + + LOGGER.debug("Installing Docker and Hdfs data node on node: " + node.id()); + + final Builder builder = Installation.newBuilder() + .setNode(NODE_MESSAGE_CONVERTER.apply(node)) + .addTool(Tool.DOCKER) + .addTool(Tool.HDFS_DATA); + + if (installMelodicTools) { + builder + .addTool(Tool.ALLUXIO_CLIENT) + .addTool(Tool.DLMS_AGENT); + } + + final InstallationRequest installationRequest = InstallationRequest.newBuilder() + .setUserId(userId).setInstallation(builder.build()).build(); + + final SettableFutureResponseCallback futureResponseCallback = SettableFutureResponseCallback + .create(); + + installationRequestService + .createInstallationRequestAsync(installationRequest, futureResponseCallback); + try { + futureResponseCallback.get(); + } catch (InterruptedException e) { + throw new IllegalStateException( + "Docker and Hdfs data node installation was interrupted during installation request.", + e); + } catch (ExecutionException e) { + throw new IllegalStateException("Error during Docker and Hdfd data node installation", + e.getCause()); + } + + LOGGER.debug("Finished Docker and HDFS data node installation on node: " + node.id()); + } + + + } + + + + public void executeClusterDeployment(String userId, Set nodes) { + + LOGGER.info(String + .format("Deploying a new Hdfs cluster for user: %s on nodes %s", + userId, nodes)); + + try { + + LOGGER.debug("Triggering Hdfs data node installations..."); + this.installHdfsDataNodes(userId, nodes); + + + LOGGER.debug("Successfully deployed HDFS cluster!"); + + + + } catch (Exception e) { + throw new IllegalStateException("Could not deploy HDFS cluster on nodes " + nodes, e); + } + + } + + + public CloudiatorProcess executeJobSubmission(String userId, String schedule, Task task, + HdfsInterface hdfsInterface,Set nodes) { + + LOGGER.info(String + .format("Submitting new HdfsJobSubmission for user: %s, schedule %s, task %s on nodes %s", + userId, schedule, task, nodes)); + + try { + + UUID uuid = UUID.randomUUID(); + String temporaryHdfsProcessUid = uuid.toString(); + + return CloudiatorClusterProcessBuilder.create().id(temporaryHdfsProcessUid) + .originId(temporaryHdfsProcessUid) + .userId(userId) + .type(Type.HDFS) + .taskInterface(HdfsInterface.class.getCanonicalName()) + .state(ProcessState.RUNNING) + .addAllNodes(nodes.stream().map(Identifiable::id).collect(Collectors.toList())) + .taskName(task.name()).scheduleId(schedule).startNow().build(); + + } catch (Exception e) { + throw new IllegalStateException("Could not deploy task " + task, e); + } + + } + +} diff --git a/hdfs/src/main/java/io/github/cloudiator/deployment/hdfs/CreateHdfsProcessSubscriber.java 
b/hdfs/src/main/java/io/github/cloudiator/deployment/hdfs/CreateHdfsProcessSubscriber.java
new file mode 100644
index 0000000..1bce06b
--- /dev/null
+++ b/hdfs/src/main/java/io/github/cloudiator/deployment/hdfs/CreateHdfsProcessSubscriber.java
@@ -0,0 +1,108 @@
+package io.github.cloudiator.deployment.hdfs;
+
+import com.google.inject.Inject;
+import io.github.cloudiator.deployment.domain.CloudiatorProcess;
+import io.github.cloudiator.deployment.domain.HdfsInterface;
+import io.github.cloudiator.deployment.domain.Job;
+import io.github.cloudiator.deployment.messaging.HdfsInterfaceConverter;
+import io.github.cloudiator.deployment.messaging.JobConverter;
+import io.github.cloudiator.deployment.messaging.ProcessMessageConverter;
+import io.github.cloudiator.domain.Node;
+import io.github.cloudiator.messaging.NodeToNodeMessageConverter;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.cloudiator.messages.General.Error;
+import org.cloudiator.messages.Process.HdfsProcessCreatedResponse;
+import org.cloudiator.messaging.MessageInterface;
+import org.cloudiator.messaging.services.ProcessService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class CreateHdfsProcessSubscriber implements Runnable {
+
+  private final ProcessService processService;
+  private static final JobConverter JOB_CONVERTER = JobConverter.INSTANCE;
+  private final CreateHdfsProcessStrategy createHdfsProcessStrategy;
+  private static final Logger LOGGER = LoggerFactory.getLogger(CreateHdfsProcessSubscriber.class);
+  private static final ProcessMessageConverter PROCESS_MESSAGE_CONVERTER = ProcessMessageConverter.INSTANCE;
+  private final MessageInterface messageInterface;
+  private static final NodeToNodeMessageConverter NODE_MESSAGE_CONVERTER = NodeToNodeMessageConverter.INSTANCE;
+
+  @Inject
+  public CreateHdfsProcessSubscriber(
+      ProcessService processService,
+      CreateHdfsProcessStrategy createHdfsProcessStrategy,
+      MessageInterface messageInterface) {
+    this.processService = processService;
+    this.createHdfsProcessStrategy = createHdfsProcessStrategy;
+    this.messageInterface = messageInterface;
+  }
+
+
+  @Override
+  public void run() {
+
+    LOGGER.debug("CreateHdfsProcessRequestSubscriber started and waiting for requests...");
+
+    processService.subscribeCreateHdfsProcessRequest(
+        (id, content) -> {
+
+          try {
+
+            final String userId = content.getUserId();
+            final Job job = JOB_CONVERTER.apply(content.getHdfs().getJob());
+            final String task = content.getHdfs().getTask();
+
+            switch (content.getHdfs().getRunsOnCase()) {
+
+              case NODES:
+                break;
+              case NODE:
+                throw new UnsupportedOperationException(
+                    "Running hdfs on single node is currently unsupported.");
+              case RUNSON_NOT_SET:
+              default:
+                throw new AssertionError(
+                    "Illegal RunsOn Case " + content.getHdfs().getRunsOnCase());
+            }
+
+            Set<Node> nodes = content.getHdfs().getNodes().getNodesList().stream()
+                .map(NODE_MESSAGE_CONVERTER::applyBack).collect(
+                    Collectors.toSet());
+
+            final HdfsInterface hdfsInterface = HdfsInterfaceConverter.INSTANCE
+                .apply(content.getHdfs().getHdfsInterface());
+
+            final String schedule = content.getHdfs().getSchedule();
+
+            final CloudiatorProcess cloudiatorProcess = createHdfsProcessStrategy
+                .executeJobSubmission(userId, schedule, job.getTask(task).orElseThrow(
+                    () -> new IllegalStateException(
+                        String.format("Job %s does not contain task %s", job, task))),
+                    hdfsInterface,
+                    nodes);
+
+            final HdfsProcessCreatedResponse hdfsProcessCreatedResponse = HdfsProcessCreatedResponse
.newBuilder() + .setProcess(PROCESS_MESSAGE_CONVERTER.applyBack(cloudiatorProcess)).build(); + + messageInterface.reply(id, hdfsProcessCreatedResponse); + + } catch (Exception e) { + final String errorMessage = String + .format("Exception %s while processing request %s with id %s.", e.getMessage(), + content, id); + + LOGGER.error(errorMessage, e); + + messageInterface.reply(HdfsProcessCreatedResponse.class, id, + Error.newBuilder().setMessage(errorMessage).setCode(500).build()); + + } + + + }); + } + +} diff --git a/hdfs/src/main/java/io/github/cloudiator/deployment/hdfs/HdfsAgent.java b/hdfs/src/main/java/io/github/cloudiator/deployment/hdfs/HdfsAgent.java new file mode 100644 index 0000000..2ca0837 --- /dev/null +++ b/hdfs/src/main/java/io/github/cloudiator/deployment/hdfs/HdfsAgent.java @@ -0,0 +1,26 @@ +package io.github.cloudiator.deployment.hdfs; + +import com.google.inject.Guice; +import com.google.inject.Injector; +import io.github.cloudiator.deployment.config.DeploymentContext; +import io.github.cloudiator.deployment.config.DeploymentModule; +import org.cloudiator.messaging.kafka.KafkaContext; +import org.cloudiator.messaging.kafka.KafkaMessagingModule; +import org.cloudiator.messaging.services.MessageServiceModule; + + +public class HdfsAgent { + + private static final Injector INJECTOR = Guice + .createInjector( + new MessageServiceModule(), + new DeploymentModule(new DeploymentContext()), + new KafkaMessagingModule(new KafkaContext())); + + + public static void main(String[] args) { + INJECTOR.getInstance(CreateHdfsProcessSubscriber.class).run(); + INJECTOR.getInstance(CreateHdfsClusterSubscriber.class).run(); + } + +} diff --git a/hdfs/src/main/resources/.gitignore b/hdfs/src/main/resources/.gitignore new file mode 100644 index 0000000..c218192 --- /dev/null +++ b/hdfs/src/main/resources/.gitignore @@ -0,0 +1 @@ +application.conf diff --git a/hdfs/src/main/resources/logback.xml b/hdfs/src/main/resources/logback.xml new file mode 100644 index 0000000..1b48f71 --- /dev/null +++ b/hdfs/src/main/resources/logback.xml @@ -0,0 +1,37 @@ + + + + + + + + %date{yyyy-MM-dd HH:mm:ss ZZZZ} [%level] from %logger in %thread - + %message%n%xException + + + + + + + + + + + + + diff --git a/hdfs/src/main/resources/reference.conf b/hdfs/src/main/resources/reference.conf new file mode 100644 index 0000000..b09bc2d --- /dev/null +++ b/hdfs/src/main/resources/reference.conf @@ -0,0 +1,12 @@ +kafka.bootstrapServers = "localhost:9092" +kafka.bootstrapServers = ${?KAFKA_BOOTSTRAP_SERVERS} +kafka.groupId = spark +kafka.groupId = ${?KAFKA_GROUP_ID} +kafka.responseTimeout = 50000 +kafka.responseTimeout = ${?KAFKA_RESPONSE_TIMEOUT} + +livy.server = "localhost:8998" +livy.server = ${?LIVY_SERVER} + +deployment.install.melodic.tools = false +deployment.install.melodic.tools = ${?DEPLOYMENT_INSTALL_MELODIC_TOOLS} \ No newline at end of file diff --git a/lance/.checkstyle b/lance/.checkstyle new file mode 100644 index 0000000..b7471b5 --- /dev/null +++ b/lance/.checkstyle @@ -0,0 +1,13 @@ + + + + + + + + + + + + + diff --git a/scheduler/src/main/java/io/github/cloudiator/deployment/scheduler/config/SchedulerModule.java b/scheduler/src/main/java/io/github/cloudiator/deployment/scheduler/config/SchedulerModule.java index 9c806d4..588b62a 100644 --- a/scheduler/src/main/java/io/github/cloudiator/deployment/scheduler/config/SchedulerModule.java +++ b/scheduler/src/main/java/io/github/cloudiator/deployment/scheduler/config/SchedulerModule.java @@ -54,6 +54,9 @@ import 
io.github.cloudiator.deployment.scheduler.processes.SimulationProcessSpawner; import io.github.cloudiator.deployment.scheduler.processes.SparkProcessKillerImpl; import io.github.cloudiator.deployment.scheduler.processes.SparkProcessSpawnerImpl; +import io.github.cloudiator.deployment.scheduler.processes.HdfsProcessKillerImpl; +import io.github.cloudiator.deployment.scheduler.processes.HdfsProcessSpawnerImpl; + import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -113,6 +116,7 @@ protected void configure() { .newSetBinder(binder(), ProcessSpawner.class); processSpawnerMultibinder.addBinding().to(LanceProcessSpawnerImpl.class); processSpawnerMultibinder.addBinding().to(SparkProcessSpawnerImpl.class); + processSpawnerMultibinder.addBinding().to(HdfsProcessSpawnerImpl.class); processSpawnerMultibinder.addBinding().to(FaasProcessSpawnerImpl.class); processSpawnerMultibinder.addBinding().to(SimulationProcessSpawner.class); bind(ProcessSpawner.class).to(CompositeProcessSpawnerImpl.class); @@ -123,6 +127,7 @@ protected void configure() { processKillerMultibinder.addBinding().to(LanceProcessKillerImpl.class); processKillerMultibinder.addBinding().to(SparkProcessKillerImpl.class); processKillerMultibinder.addBinding().to(SimulationProcessKiller.class); + processKillerMultibinder.addBinding().to(HdfsProcessKillerImpl.class); //todo: implement process killer for FaaS. Probably also no-op? bind(ProcessKiller.class).to(CompositeProcessKiller.class); diff --git a/scheduler/src/main/java/io/github/cloudiator/deployment/scheduler/processes/HdfsProcessKillerImpl.java b/scheduler/src/main/java/io/github/cloudiator/deployment/scheduler/processes/HdfsProcessKillerImpl.java new file mode 100644 index 0000000..fab379b --- /dev/null +++ b/scheduler/src/main/java/io/github/cloudiator/deployment/scheduler/processes/HdfsProcessKillerImpl.java @@ -0,0 +1,28 @@ +/* + * Copyright 2014-2019 University of Ulm + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.github.cloudiator.deployment.scheduler.processes; + +import io.github.cloudiator.deployment.domain.CloudiatorProcess; +import io.github.cloudiator.deployment.domain.CloudiatorProcess.Type; + +public class HdfsProcessKillerImpl extends NoOpProcessKiller { + + @Override + public boolean supports(CloudiatorProcess cloudiatorProcess) { + return cloudiatorProcess.type().equals(Type.HDFS); + } +} diff --git a/scheduler/src/main/java/io/github/cloudiator/deployment/scheduler/processes/HdfsProcessSpawnerImpl.java b/scheduler/src/main/java/io/github/cloudiator/deployment/scheduler/processes/HdfsProcessSpawnerImpl.java new file mode 100644 index 0000000..6709d62 --- /dev/null +++ b/scheduler/src/main/java/io/github/cloudiator/deployment/scheduler/processes/HdfsProcessSpawnerImpl.java @@ -0,0 +1,154 @@ +/* + * Copyright 2014-2019 University of Ulm + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.github.cloudiator.deployment.scheduler.processes; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.base.MoreObjects; +import com.google.inject.Inject; +import io.github.cloudiator.deployment.domain.CloudiatorClusterProcess; +import io.github.cloudiator.deployment.domain.CloudiatorProcess; +import io.github.cloudiator.deployment.domain.CloudiatorSingleProcess; +import io.github.cloudiator.deployment.domain.Job; +import io.github.cloudiator.deployment.domain.HdfsInterface; +import io.github.cloudiator.deployment.domain.Task; +import io.github.cloudiator.deployment.domain.TaskInterface; +import io.github.cloudiator.deployment.messaging.JobConverter; +import io.github.cloudiator.deployment.messaging.ProcessMessageConverter; +import io.github.cloudiator.deployment.messaging.HdfsInterfaceConverter; +import io.github.cloudiator.domain.Node; +import io.github.cloudiator.messaging.NodeToNodeMessageConverter; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import org.cloudiator.messages.Process.CreateHdfsClusterRequest; +import org.cloudiator.messages.Process.CreateHdfsProcessRequest; +import org.cloudiator.messages.Process.HdfsClusterCreatedResponse; +import org.cloudiator.messages.Process.HdfsProcessCreatedResponse; +import org.cloudiator.messages.entities.ProcessEntities.Nodes; +import org.cloudiator.messages.entities.ProcessEntities.HdfsProcess; +import org.cloudiator.messages.entities.ProcessEntities.HdfsProcess.Builder; +import org.cloudiator.messaging.SettableFutureResponseCallback; +import org.cloudiator.messaging.services.ProcessService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class HdfsProcessSpawnerImpl implements ProcessSpawner { + + private final ProcessService processService; + private static final JobConverter JOB_CONVERTER = JobConverter.INSTANCE; + private static final NodeToNodeMessageConverter NODE_CONVERTER = NodeToNodeMessageConverter.INSTANCE; + private static final ProcessMessageConverter PROCESS_MESSAGE_CONVERTER = ProcessMessageConverter.INSTANCE; + private static final Logger LOGGER = LoggerFactory + .getLogger(LanceProcessSpawnerImpl.class); + + @Inject + public HdfsProcessSpawnerImpl(ProcessService processService) { + this.processService = processService; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this).toString(); + } + + @Override + public boolean supports(TaskInterface taskInterface) { + return taskInterface instanceof HdfsInterface; + } + + private Builder builder(String schedule, Job job, Task task, + TaskInterface taskInterface) { + + return HdfsProcess.newBuilder() + .setSchedule(schedule) + .setJob(JOB_CONVERTER.applyBack(job)) + .setTask(task.name()) + .setHdfsInterface(HdfsInterfaceConverter.INSTANCE.applyBack( + (HdfsInterface) taskInterface)); + + } + + private CloudiatorProcess executeRequest(String userId, HdfsProcess hdfsProcess) + throws ProcessSpawningException { + + final CreateHdfsClusterRequest clusterRequest = 
CreateHdfsClusterRequest.newBuilder()
+        .setNodes(hdfsProcess.getNodes()).setUserId(userId).build();
+    try {
+
+      SettableFutureResponseCallback<HdfsClusterCreatedResponse, HdfsClusterCreatedResponse> settableFutureResponseCallback = SettableFutureResponseCallback
+          .create();
+
+      processService
+          .createHdfsClusterAsync(clusterRequest, settableFutureResponseCallback);
+
+      settableFutureResponseCallback.get();
+
+    } catch (InterruptedException | ExecutionException e) {
+      LOGGER.error("Error while deploying Hdfs cluster! " + e.getMessage());
+      throw new ProcessSpawningException("Error while deploying Hdfs cluster! " + e.getMessage(),
+          e);
+    }
+
+    final CreateHdfsProcessRequest processRequest = CreateHdfsProcessRequest.newBuilder()
+        .setHdfs(hdfsProcess).setUserId(userId).build();
+
+    SettableFutureResponseCallback<HdfsProcessCreatedResponse, CloudiatorProcess> futureResponseCallback = SettableFutureResponseCallback
+        .create(
+            hdfsProcessCreatedResponse -> PROCESS_MESSAGE_CONVERTER
+                .apply(hdfsProcessCreatedResponse.getProcess()));
+
+    processService.createHdfsProcessAsync(processRequest, futureResponseCallback);
+
+    try {
+      return futureResponseCallback.get();
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          String.format("%s got interrupted while spawning process", this), e);
+    } catch (ExecutionException e) {
+      throw new ProcessSpawningException(e.getCause().getMessage(), e);
+    }
+
+  }
+
+  @Override
+  public CloudiatorSingleProcess spawn(String userId, String schedule, Job job, Task task,
+      TaskInterface taskInterface, Node node) throws ProcessSpawningException {
+
+    checkState(supports(taskInterface), String
+        .format("TaskInterface of type %s is not supported by %s",
+            taskInterface.getClass().getName(), this));
+
+    return (CloudiatorSingleProcess) executeRequest(userId,
+        builder(schedule, job, task, taskInterface).setNode(NODE_CONVERTER.apply(node)).build());
+  }
+
+  @Override
+  public CloudiatorClusterProcess spawn(String userId, String schedule, Job job, Task task,
+      TaskInterface taskInterface, Set<Node> nodes) throws ProcessSpawningException {
+
+    checkState(supports(taskInterface), String
+        .format("TaskInterface of type %s is not supported by %s",
+            taskInterface.getClass().getName(), this));
+
+    return (CloudiatorClusterProcess) executeRequest(userId,
+        builder(schedule, job, task, taskInterface).setNodes(Nodes.newBuilder()
+            .addAllNodes(nodes.stream().map(NODE_CONVERTER).collect(Collectors.toList())).build())
+            .build());
+  }
+}
diff --git a/scheduler/src/main/java/io/github/cloudiator/deployment/scheduler/scaling/ScalingEngine.java b/scheduler/src/main/java/io/github/cloudiator/deployment/scheduler/scaling/ScalingEngine.java
index 9fdadb2..083e741 100644
--- a/scheduler/src/main/java/io/github/cloudiator/deployment/scheduler/scaling/ScalingEngine.java
+++ b/scheduler/src/main/java/io/github/cloudiator/deployment/scheduler/scaling/ScalingEngine.java
@@ -63,6 +63,9 @@
 import org.cloudiator.messages.Process.CreateSparkClusterRequest;
 import org.cloudiator.messages.Process.DeleteProcessRequest;
 import org.cloudiator.messages.Process.SparkClusterCreatedResponse;
+import org.cloudiator.messages.Process.CreateHdfsClusterRequest;
+import org.cloudiator.messages.Process.HdfsClusterCreatedResponse;
+
 import org.cloudiator.messages.entities.ProcessEntities.Nodes;
 import org.cloudiator.messaging.ResponseException;
 import org.cloudiator.messaging.SettableFutureResponseCallback;
@@ -384,6 +387,20 @@ private Collection scaleOutCluster(Schedule schedule, Task ta
     settableFutureResponseCallback.get();
 
+
+    final CreateHdfsClusterRequest hdfsClusterRequest =
+        CreateHdfsClusterRequest.newBuilder()
+            .setUserId(schedule.userId()).setNodes(
+                Nodes.newBuilder().addAllNodes(startedNodes.stream().map(
+                    NodeToNodeMessageConverter.INSTANCE).collect(Collectors.toSet())).build())
+            .build();
+
+    final SettableFutureResponseCallback<HdfsClusterCreatedResponse, HdfsClusterCreatedResponse> hdfsClusterResponseCallback = SettableFutureResponseCallback
+        .create();
+
+    processService.createHdfsClusterAsync(hdfsClusterRequest, hdfsClusterResponseCallback);
+
+    hdfsClusterResponseCallback.get();
+
     final CloudiatorClusterProcess modifiedProcess = CloudiatorClusterProcessBuilder
         .of((CloudiatorClusterProcess) cloudiatorProcess)
         .addAllNodes(startedNodes.stream().map(