Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.github.cloudiator.deployment.domain;

import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
 * A {@link TaskInterface} describing a task executed by submitting a file to an
 * HDFS / Hadoop environment.
 *
 * <p>Instances are created via {@code HdfsInterfaceBuilder}; the default
 * implementation is {@code HdfsInterfaceImpl}.
 */
public interface HdfsInterface extends TaskInterface {

/**
 * The file to submit (e.g. a jar). Never null or empty (enforced by
 * {@code HdfsInterfaceImpl}).
 */
String file();

/**
 * The optional main class to execute within {@link #file()}. When present, it is
 * never empty (enforced by {@code HdfsInterfaceImpl}).
 */
Optional<String> className();

/**
 * Positional arguments for the submitted application; never null, may be empty.
 */
List<String> arguments();

/**
 * Key/value arguments for the submission; never null, may be empty.
 * NOTE(review): exact submission semantics not visible here — presumably passed
 * to the submit command; confirm against the executing agent.
 */
Map<String, String> hdfsArguments();

/**
 * Key/value configuration for the HDFS environment; never null, may be empty.
 */
Map<String, String> hdfsConfiguration();


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package io.github.cloudiator.deployment.domain;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.collect.Lists;
import io.github.cloudiator.deployment.security.VariableContext;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/**
 * Builder for {@link HdfsInterface} instances.
 *
 * <p>All collection-valued setters take a defensive copy. This keeps the builder
 * mutable — required by the {@code add*}/{@code put*} methods and by
 * {@link #decorate(VariableContext)} — even when the caller supplies an
 * unmodifiable collection (e.g. a protobuf map view), and guarantees that later
 * modifications never leak back into the caller's collection.
 */
public class HdfsInterfaceBuilder {

  private String file;
  private String className;
  private List<String> arguments;
  private Map<String, String> hdfsArguments;
  private Map<String, String> hdfsConfiguration;
  private ProcessMapping processMapping;

  private HdfsInterfaceBuilder() {
    arguments = new LinkedList<>();
    hdfsArguments = new HashMap<>();
    hdfsConfiguration = new HashMap<>();
  }

  /** Copy constructor: initializes this builder from an existing interface. */
  private HdfsInterfaceBuilder(HdfsInterface hdfsInterface) {
    file = hdfsInterface.file();
    className = hdfsInterface.className().orElse(null);
    arguments = new LinkedList<>(hdfsInterface.arguments());
    hdfsArguments = new HashMap<>(hdfsInterface.hdfsArguments());
    hdfsConfiguration = new HashMap<>(hdfsInterface.hdfsConfiguration());
    processMapping = hdfsInterface.processMapping();
  }

  /** Returns a new, empty builder. */
  public static HdfsInterfaceBuilder newBuilder() {
    return new HdfsInterfaceBuilder();
  }

  /** Returns a builder pre-populated with the state of the given interface. */
  public static HdfsInterfaceBuilder of(HdfsInterface hdfsInterface) {
    checkNotNull(hdfsInterface, "hdfsInterface is null");
    return new HdfsInterfaceBuilder(hdfsInterface);
  }

  /** Sets the file to submit. Validated (non-null, non-empty) in {@link #build()}. */
  public HdfsInterfaceBuilder file(String file) {
    this.file = file;
    return this;
  }

  /** Sets the optional main class; {@code null} means "not set". */
  public HdfsInterfaceBuilder className(String className) {
    this.className = className;
    return this;
  }

  /** Replaces the positional arguments with a copy of the given iterable. */
  public HdfsInterfaceBuilder arguments(Iterable<String> arguments) {
    checkNotNull(arguments, "arguments is null");
    this.arguments = Lists.newLinkedList(arguments);
    return this;
  }

  /** Appends a single positional argument. */
  public HdfsInterfaceBuilder addArgument(String argument) {
    checkNotNull(argument, "argument is null");
    arguments.add(argument);
    return this;
  }

  /** Replaces the hdfs arguments with a copy of the given map. */
  public HdfsInterfaceBuilder hdfsArguments(Map<String, String> hdfsArguments) {
    checkNotNull(hdfsArguments, "hdfsArguments is null");
    // Defensive copy. Storing the caller's map directly broke putHdfsArguments()
    // and decorate() with UnsupportedOperationException when an unmodifiable map
    // (e.g. protobuf's getHdfsArgumentsMap()) was supplied, and let later puts
    // leak into the caller's map.
    this.hdfsArguments = new HashMap<>(hdfsArguments);
    return this;
  }

  /** Adds a single hdfs argument. */
  public HdfsInterfaceBuilder putHdfsArguments(String key, String value) {
    checkNotNull(key, "key is null");
    checkNotNull(value, "value is null");
    hdfsArguments.put(key, value);
    return this;
  }

  /** Replaces the hdfs configuration with a copy of the given map. */
  public HdfsInterfaceBuilder hdfsConfiguration(Map<String, String> hdfsConfiguration) {
    checkNotNull(hdfsConfiguration, "hdfsConfiguration is null");
    // Defensive copy, for the same reasons as hdfsArguments(Map).
    this.hdfsConfiguration = new HashMap<>(hdfsConfiguration);
    return this;
  }

  /** Adds a single hdfs configuration entry. */
  public HdfsInterfaceBuilder putHdfsConfiguration(String key, String value) {
    checkNotNull(key, "key is null");
    checkNotNull(value, "value is null");
    hdfsConfiguration.put(key, value);
    return this;
  }

  /** Sets the process mapping. */
  public HdfsInterfaceBuilder processMapping(ProcessMapping processMapping) {
    this.processMapping = processMapping;
    return this;
  }

  /**
   * Resolves variables in all argument and configuration values (in place) using
   * the given context. Keys are left untouched.
   */
  public HdfsInterfaceBuilder decorate(VariableContext variableContext) {
    checkNotNull(variableContext, "variableContext is null");

    arguments.replaceAll(variableContext::parse);
    hdfsArguments.replaceAll((k, v) -> variableContext.parse(v));
    hdfsConfiguration.replaceAll((k, v) -> variableContext.parse(v));

    return this;
  }

  /**
   * Builds the {@link HdfsInterface}. Validation of the accumulated state is
   * performed by {@code HdfsInterfaceImpl}'s constructor.
   */
  public HdfsInterface build() {
    return new HdfsInterfaceImpl(file, className, arguments, hdfsArguments, hdfsConfiguration,
        processMapping);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package io.github.cloudiator.deployment.domain;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.base.MoreObjects;
import io.github.cloudiator.deployment.security.VariableContext;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;

/**
 * Default implementation of {@link HdfsInterface}.
 *
 * <p>All collections passed to the constructor are defensively copied, so later
 * changes to the supplied collections (e.g. continued use of a builder after
 * {@code build()}) cannot mutate this instance.
 */
public class HdfsInterfaceImpl implements HdfsInterface {

  private final String file;
  @Nullable
  private final String className;
  private final List<String> arguments;
  private final Map<String, String> hdfsArguments;
  private final Map<String, String> hdfsConfiguration;
  private final ProcessMapping processMapping;

  /**
   * @param file              the file to submit; non-null and non-empty
   * @param className         optional main class; {@code null} allowed, empty forbidden
   * @param arguments         positional arguments; non-null
   * @param hdfsArguments     submission arguments; non-null
   * @param hdfsConfiguration configuration options; non-null
   * @param processMapping    the process mapping; non-null
   * @throws NullPointerException     if a required argument is null
   * @throws IllegalArgumentException if file or className is empty
   */
  public HdfsInterfaceImpl(String file, @Nullable String className,
      List<String> arguments, Map<String, String> hdfsArguments,
      Map<String, String> hdfsConfiguration,
      ProcessMapping processMapping) {

    checkNotNull(file, "file is null");
    checkArgument(!file.isEmpty(), "file is empty");
    this.file = file;

    // className may be absent (null), but must not be blank when present
    if (className != null) {
      checkArgument(!className.isEmpty(), "class name is empty");
    }
    this.className = className;

    // Defensive copies: the original stored the caller's collections by
    // reference, so a reused builder could mutate this instance after build().
    checkNotNull(arguments, "arguments is null");
    this.arguments = new ArrayList<>(arguments);
    checkNotNull(hdfsArguments, "hdfs arguments is null");
    this.hdfsArguments = new HashMap<>(hdfsArguments);
    checkNotNull(hdfsConfiguration, "hdfs configuration is null");
    this.hdfsConfiguration = new HashMap<>(hdfsConfiguration);

    checkNotNull(processMapping, "processMapping is null");
    this.processMapping = processMapping;
  }


  @Override
  public String file() {
    return file;
  }

  @Override
  public Optional<String> className() {
    return Optional.ofNullable(className);
  }

  @Override
  public List<String> arguments() {
    return arguments;
  }

  @Override
  public Map<String, String> hdfsArguments() {
    return hdfsArguments;
  }

  @Override
  public Map<String, String> hdfsConfiguration() {
    return hdfsConfiguration;
  }

  @Override
  public ProcessMapping processMapping() {
    return processMapping;
  }

  @Override
  public boolean isStaticallyConfigured() {
    // All configuration is known at construction time; no dynamic wiring needed.
    return true;
  }

  @Override
  public boolean requiresManualWait(TaskInterface dependency) {
    // Always waits manually, regardless of the dependency's interface type.
    return true;
  }

  /**
   * Returns a copy of this interface with each environment entry appended to the
   * arguments as {@code --key value}.
   */
  @Override
  public TaskInterface decorateEnvironment(Environment environment) {

    final HdfsInterfaceBuilder hdfsInterfaceBuilder = HdfsInterfaceBuilder.of(this);
    environment.forEach((key, value) -> {
      hdfsInterfaceBuilder.addArgument("--" + key);
      hdfsInterfaceBuilder.addArgument(value);
    });

    return hdfsInterfaceBuilder.build();
  }

  /**
   * Returns a copy of this interface with variables in all argument and
   * configuration values resolved against the given context.
   */
  @Override
  public TaskInterface decorateVariables(VariableContext variableContext) {
    return HdfsInterfaceBuilder.of(this).decorate(variableContext).build();
  }

  @Override
  public String toString() {
    return MoreObjects.toStringHelper(this).add("file", file).add("className", className)
        .add("arguments", arguments).add("hdfsArguments", hdfsArguments)
        .add("hdfsConfiguration", hdfsConfiguration).add("processMapping", processMapping)
        .toString();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package io.github.cloudiator.deployment.messaging;

import com.google.common.base.Strings;
import de.uniulm.omi.cloudiator.util.TwoWayConverter;
import io.github.cloudiator.deployment.domain.ProcessMapping;
import io.github.cloudiator.deployment.domain.HdfsInterface;
import io.github.cloudiator.deployment.domain.HdfsInterfaceBuilder;
import org.cloudiator.messages.entities.TaskEntities;
import org.cloudiator.messages.entities.TaskEntities.HdfsInterface.Builder;

/**
 * Two-way converter between the protobuf message
 * {@link TaskEntities.HdfsInterface} and the domain object {@link HdfsInterface}.
 */
public class HdfsInterfaceConverter implements
    TwoWayConverter<TaskEntities.HdfsInterface, HdfsInterface> {

  public static final HdfsInterfaceConverter INSTANCE = new HdfsInterfaceConverter();
  public static final ProcessMappingConverter PROCESS_MAPPING_CONVERTER = new ProcessMappingConverter();

  private HdfsInterfaceConverter() {
  }

  /** Converts the domain object into its protobuf message representation. */
  @Override
  public TaskEntities.HdfsInterface applyBack(HdfsInterface hdfsInterface) {
    final Builder messageBuilder = TaskEntities.HdfsInterface.newBuilder()
        .setFile(hdfsInterface.file())
        .addAllArguments(hdfsInterface.arguments())
        .putAllHdfsArguments(hdfsInterface.hdfsArguments())
        .putAllHdfsConfiguration(hdfsInterface.hdfsConfiguration())
        .setProcessMapping(PROCESS_MAPPING_CONVERTER.applyBack(hdfsInterface.processMapping()));

    // className is optional on the domain side; only set when present
    hdfsInterface.className().ifPresent(messageBuilder::setClassName);

    return messageBuilder.build();
  }

  /** Converts the protobuf message into its domain object representation. */
  @Override
  public HdfsInterface apply(TaskEntities.HdfsInterface hdfsInterface) {

    final HdfsInterfaceBuilder domainBuilder = HdfsInterfaceBuilder.newBuilder();
    domainBuilder.file(hdfsInterface.getFile());
    domainBuilder.arguments(hdfsInterface.getArgumentsList());
    domainBuilder.hdfsArguments(hdfsInterface.getHdfsArgumentsMap());
    domainBuilder.hdfsConfiguration(hdfsInterface.getHdfsConfigurationMap());
    domainBuilder
        .processMapping(PROCESS_MAPPING_CONVERTER.apply(hdfsInterface.getProcessMapping()));

    // protobuf returns "" for an unset string field; treat that as absent
    final String className = hdfsInterface.getClassName();
    if (!Strings.isNullOrEmpty(className)) {
      domainBuilder.className(className);
    }

    return domainBuilder.build();
  }

  /** Converts between the protobuf and domain {@code ProcessMapping} enums. */
  private static class ProcessMappingConverter implements
      TwoWayConverter<TaskEntities.ProcessMapping, ProcessMapping> {

    @Override
    public TaskEntities.ProcessMapping applyBack(ProcessMapping processMapping) {
      switch (processMapping) {
        case CLUSTER:
          return TaskEntities.ProcessMapping.CLUSTER;
        case SINGLE:
          return TaskEntities.ProcessMapping.SINGLE;
        default:
          throw new AssertionError("Unknown ProcessMapping type " + processMapping);
      }
    }

    @Override
    public ProcessMapping apply(TaskEntities.ProcessMapping processMapping) {
      switch (processMapping) {
        case SINGLE:
          return ProcessMapping.SINGLE;
        case CLUSTER:
          return ProcessMapping.CLUSTER;
        case UNRECOGNIZED:
        default:
          throw new AssertionError("Unknown process mapping type " + processMapping);
      }
    }
  }
}
Loading