Summary: This article examines Flink's ParameterTool, using the flink-core 1.7.1 source (org/apache/flink/api/common/ExecutionConfig.java) and the flink-java 1.7.1 source (org/apache/flink/api/java/utils/ParameterTool.java).
Preface
This article takes a look at Flink's ParameterTool.
Examples
fromPropertiesFile
String propertiesFilePath = "/home/sam/flink/myjob.properties";

// Option 1: pass the path as a String
ParameterTool fromPath = ParameterTool.fromPropertiesFile(propertiesFilePath);

// Option 2: pass a File
File propertiesFile = new File(propertiesFilePath);
ParameterTool fromFile = ParameterTool.fromPropertiesFile(propertiesFile);

// Option 3: pass an InputStream
InputStream propertiesFileInputStream = new FileInputStream(propertiesFile);
ParameterTool fromStream = ParameterTool.fromPropertiesFile(propertiesFileInputStream);
- Use ParameterTool.fromPropertiesFile to create a ParameterTool from a .properties file; it accepts a path String, a File, or an InputStream
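To illustrate what happens to the file contents, here is a minimal JDK-only sketch. It assumes the .properties content is parsed by java.util.Properties into string pairs, which is what the InputStream variant in the ParameterTool source listed later does. PropertiesSketch and load are hypothetical names used only for illustration; they are not part of Flink.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper (not part of Flink): loads a .properties stream
// into a String-to-String map using only the JDK.
class PropertiesSketch {

    static Map<String, String> load(InputStream in) {
        Properties props = new Properties();
        try {
            props.load(in); // parses key=value lines and skips '#' comment lines
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> result = new HashMap<>();
        for (String name : props.stringPropertyNames()) {
            result.put(name, props.getProperty(name));
        }
        return result;
    }

    public static void main(String[] args) {
        // In-memory stand-in for the contents of myjob.properties
        String content = "# sample job configuration\ninput=hdfs:///mydata\nelements=42\n";
        InputStream in = new ByteArrayInputStream(content.getBytes(StandardCharsets.ISO_8859_1));
        Map<String, String> params = load(in);
        System.out.println(params.get("input"));
        System.out.println(params.get("elements"));
    }
}
```

Every value arrives as a String, so a numeric setting such as elements still has to be converted by the caller.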
fromArgs
public static void main(String[] args) {
    ParameterTool parameter = ParameterTool.fromArgs(args);
    // .. regular code ..
}
- Use ParameterTool.fromArgs to create a ParameterTool from command-line arguments (for example, --input hdfs:///mydata --elements 42)
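The pairing of keys and values can be sketched without Flink on the classpath. The following is a simplified approximation of the rule used by fromArgs (its source is listed later in this article), not the Flink implementation itself. ArgsSketch, parse, and looksNumeric are hypothetical names, and the Double.parseDouble check is an assumed stand-in for the NumberUtils.isNumber call in the original, so edge cases such as hexadecimal literals may be classified differently.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified sketch of the key/value pairing rule; not the Flink implementation.
class ArgsSketch {
    // Same marker string that ParameterTool stores for keys without a value
    static final String NO_VALUE = "__NO_VALUE_KEY";

    // Assumed stand-in for NumberUtils.isNumber (commons-lang3)
    static boolean looksNumeric(String s) {
        try {
            Double.parseDouble(s);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    static Map<String, String> parse(String[] args) {
        Map<String, String> map = new LinkedHashMap<>();
        int i = 0;
        while (i < args.length) {
            String arg = args[i];
            String key;
            if (arg.startsWith("--")) {
                key = arg.substring(2);
            } else if (arg.startsWith("-")) {
                key = arg.substring(1);
            } else {
                throw new IllegalArgumentException("Keys must be prefixed with -- or -: " + arg);
            }
            if (key.isEmpty()) {
                throw new IllegalArgumentException("Empty key in arguments");
            }
            i++; // look at the next token as a candidate value
            // A numeric token (even a negative one) or any token without a
            // leading '-' is taken as the value of the current key.
            if (i < args.length && (looksNumeric(args[i]) || !args[i].startsWith("-"))) {
                map.put(key, args[i]);
                i++;
            } else {
                map.put(key, NO_VALUE); // flag-style key with no value
            }
        }
        return map;
    }

    public static void main(String[] args) {
        System.out.println(parse(new String[] {"--input", "hdfs:///mydata", "--elements", "42"}));
        System.out.println(parse(new String[] {"--verbose", "--offset", "-5"}));
    }
}
```

The numeric check runs before the dash check so that a negative number such as -5 is read as a value rather than as the next key.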
fromSystemProperties
ParameterTool parameter = ParameterTool.fromSystemProperties();
- Use ParameterTool.fromSystemProperties to create a ParameterTool from the JVM system properties (for example, -Dinput=hdfs:///mydata)
Getting parameter values
ParameterTool parameters = // ...
parameters.getRequired("input");
parameters.get("output", "myDefaultValue");
parameters.getLong("expectedCount", -1L);
parameters.getNumberOfParameters();
// .. there are more methods available.
- Use ParameterTool methods such as get, getRequired, and getLong to read parameter values
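The difference between a required lookup and a lookup with a default can be sketched over a plain map. This is a hypothetical stand-in written only for illustration (LookupSketch is not a Flink class); it mirrors the behavior visible in the ParameterTool source listed later: a missing required key raises a RuntimeException, while a missing optional key falls back to the supplied default.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the lookup behavior; not Flink's ParameterTool.
class LookupSketch {
    private final Map<String, String> data;

    LookupSketch(Map<String, String> data) {
        this.data = new HashMap<>(data);
    }

    // Returns the stored value, or defaultValue when the key is absent
    String get(String key, String defaultValue) {
        String value = data.get(key);
        return value == null ? defaultValue : value;
    }

    // Fails fast when the key is absent
    String getRequired(String key) {
        String value = data.get(key);
        if (value == null) {
            throw new RuntimeException("No data for required key '" + key + "'");
        }
        return value;
    }

    // Parses the stored string; a malformed number throws NumberFormatException
    long getLong(String key, long defaultValue) {
        String value = data.get(key);
        return value == null ? defaultValue : Long.parseLong(value);
    }

    int getNumberOfParameters() {
        return data.size();
    }

    public static void main(String[] args) {
        Map<String, String> raw = new HashMap<>();
        raw.put("input", "hdfs:///mydata");
        raw.put("expectedCount", "42");
        LookupSketch parameters = new LookupSketch(raw);
        System.out.println(parameters.getRequired("input"));
        System.out.println(parameters.get("output", "myDefaultValue"));
        System.out.println(parameters.getLong("expectedCount", -1L));
        System.out.println(parameters.getNumberOfParameters());
    }
}
```

Note that the default only covers a missing key. A key that is present but holds a non-numeric string still makes the typed lookup fail.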
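Registering as global job parameters
env.getConfig().setGlobalJobParameters(parameters);
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // Cast back to ParameterTool, the type that was registered on the ExecutionConfig
        ParameterTool parameters = (ParameterTool)
            getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        parameters.getRequired("input");
        // ... do more ...
    }
}
- Use env.getConfig().setGlobalJobParameters to register the ParameterTool as global job parameters; a rich function can then read it back through getRuntimeContext().getExecutionConfig().getGlobalJobParameters()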
GlobalJobParameters
flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/ExecutionConfig.java
public static class GlobalJobParameters implements Serializable {
    private static final long serialVersionUID = 1L;

    /**
     * Convert UserConfig into a {@code Map<String, String>} representation.
     * This can be used by the runtime, for example for presenting the user config in the web frontend.
     *
     * @return Key/Value representation of the UserConfig
     */
    public Map<String, String> toMap() {
        return Collections.emptyMap();
    }
}
- GlobalJobParameters declares a toMap method whose default implementation returns Collections.emptyMap()
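The empty default means a subclass has to override toMap if it wants its settings to be visible to the runtime. The sketch below shows that override pattern with JDK-only stand-ins. BaseParameters and MapBackedParameters are hypothetical classes that only imitate the shape of GlobalJobParameters and a map-backed subclass; they are not Flink types.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// JDK-only imitation of the override pattern; these are not Flink classes.
class GlobalParamsSketch {

    // Stand-in for the base type: exposes nothing by default
    static class BaseParameters {
        Map<String, String> toMap() {
            return Collections.emptyMap();
        }
    }

    // Stand-in for a map-backed subclass: exposes a read-only copy of its data
    static class MapBackedParameters extends BaseParameters {
        private final Map<String, String> data;

        MapBackedParameters(Map<String, String> data) {
            this.data = Collections.unmodifiableMap(new HashMap<>(data));
        }

        @Override
        Map<String, String> toMap() {
            return data;
        }
    }

    public static void main(String[] args) {
        Map<String, String> raw = new HashMap<>();
        raw.put("input", "hdfs:///mydata");

        // Held through the base type, the way a config object would store it
        BaseParameters registered = new MapBackedParameters(raw);
        System.out.println(new BaseParameters().toMap().isEmpty());
        System.out.println(registered.toMap().get("input"));

        // A consumer that needs the concrete type casts it back
        MapBackedParameters concrete = (MapBackedParameters) registered;
        System.out.println(concrete.toMap().size());
    }
}
```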
ParameterTool
flink-java-1.7.1-sources.jar!/org/apache/flink/api/java/utils/ParameterTool.java
@Public
public class ParameterTool extends ExecutionConfig.GlobalJobParameters implements Serializable, Cloneable {
    private static final long serialVersionUID = 1L;

    protected static final String NO_VALUE_KEY = "__NO_VALUE_KEY";
    protected static final String DEFAULT_UNDEFINED = "<undefined>";

    //......

    // ------------------ ParameterUtil ------------------------
    protected final Map<String, String> data;

    // data which is only used on the client and does not need to be transmitted
    protected transient Map<String, String> defaultData;
    protected transient Set<String> unrequestedParameters;

    private ParameterTool(Map<String, String> data) {
        this.data = Collections.unmodifiableMap(new HashMap<>(data));
        this.defaultData = new ConcurrentHashMap<>(data.size());
        this.unrequestedParameters = Collections.newSetFromMap(new ConcurrentHashMap<>(data.size()));
        unrequestedParameters.addAll(data.keySet());
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        ParameterTool that = (ParameterTool) o;
        return Objects.equals(data, that.data) &&
            Objects.equals(defaultData, that.defaultData) &&
            Objects.equals(unrequestedParameters, that.unrequestedParameters);
    }

    @Override
    public int hashCode() {
        return Objects.hash(data, defaultData, unrequestedParameters);
    }

    @Override
    public Map<String, String> toMap() {
        return data;
    }

    //......
    /**
     * Returns {@link ParameterTool} for the given arguments. The arguments are keys followed by values.
     * Keys have to start with '-' or '--'
     *
     * <p><strong>Example arguments:</strong>
     * --key1 value1 --key2 value2 -key3 value3
     *
     * @param args Input array arguments
     * @return A {@link ParameterTool}
     */
    public static ParameterTool fromArgs(String[] args) {
        final Map<String, String> map = new HashMap<>(args.length / 2);
        int i = 0;
        while (i < args.length) {
            final String key;
            if (args[i].startsWith("--")) {
                key = args[i].substring(2);
            } else if (args[i].startsWith("-")) {
                key = args[i].substring(1);
            } else {
                throw new IllegalArgumentException(
                    String.format("Error parsing arguments '%s' on '%s'. Please prefix keys with -- or -.",
                        Arrays.toString(args), args[i]));
            }
            if (key.isEmpty()) {
                throw new IllegalArgumentException(
                    "The input " + Arrays.toString(args) + " contains an empty argument");
            }
            i += 1; // try to find the value
            if (i >= args.length) {
                map.put(key, NO_VALUE_KEY);
            } else if (NumberUtils.isNumber(args[i])) {
                map.put(key, args[i]);
                i += 1;
            } else if (args[i].startsWith("--") || args[i].startsWith("-")) {
                // the argument cannot be a negative number because we checked earlier
                // -> the next argument is a parameter name
                map.put(key, NO_VALUE_KEY);
            } else {
                map.put(key, args[i]);
                i += 1;
            }
        }
        return fromMap(map);
    }
    /**
     * Returns {@link ParameterTool} for the given {@link Properties} file.
     *
     * @param path Path to the properties file
     * @return A {@link ParameterTool}
     * @throws IOException If the file does not exist
     * @see Properties
     */
    public static ParameterTool fromPropertiesFile(String path) throws IOException {
        File propertiesFile = new File(path);
        return fromPropertiesFile(propertiesFile);
    }

    /**
     * Returns {@link ParameterTool} for the given {@link Properties} file.
     *
     * @param file File object to the properties file
     * @return A {@link ParameterTool}
     * @throws IOException If the file does not exist
     * @see Properties
     */
    public static ParameterTool fromPropertiesFile(File file) throws IOException {
        if (!file.exists()) {
            throw new FileNotFoundException("Properties file " + file.getAbsolutePath() + " does not exist");
        }
        try (FileInputStream fis = new FileInputStream(file)) {
            return fromPropertiesFile(fis);
        }
    }

    /**
     * Returns {@link ParameterTool} for the given InputStream from {@link Properties} file.
     *
     * @param inputStream InputStream from the properties file
     * @return A {@link ParameterTool}
     * @throws IOException If the file does not exist
     * @see Properties
     */
    public static ParameterTool fromPropertiesFile(InputStream inputStream) throws IOException {
        Properties props = new Properties();
        props.load(inputStream);
        return fromMap((Map) props);
    }

    /**
     * Returns {@link ParameterTool} for the given map.
     *
     * @param map A map of arguments. Both Key and Value have to be Strings
     * @return A {@link ParameterTool}
     */
    public static ParameterTool fromMap(Map<String, String> map) {
        Preconditions.checkNotNull(map, "Unable to initialize from empty map");
        return new ParameterTool(map);
    }

    /**
     * Returns {@link ParameterTool} from the system properties.
     * Example on how to pass system properties:
     * -Dkey1=value1 -Dkey2=value2
     *
     * @return A {@link ParameterTool}
     */
    public static ParameterTool fromSystemProperties() {
        return fromMap((Map) System.getProperties());
    }

    //......
    /**
     * Returns the String value for the given key.
     * If the key does not exist it will return null.
     */
    public String get(String key) {
        addToDefaults(key, null);
        unrequestedParameters.remove(key);
        return data.get(key);
    }

    /**
     * Returns the String value for the given key.
     * If the key does not exist it will throw a {@link RuntimeException}.
     */
    public String getRequired(String key) {
        addToDefaults(key, null);
        String value = get(key);
        if (value == null) {
            throw new RuntimeException("No data for required key '" + key + "'");
        }
        return value;
    }

    /**
     * Returns the String value for the given key.
     * If the key does not exist it will return the given default value.
     */
    public String get(String key, String defaultValue) {
        addToDefaults(key, defaultValue);
        String value = get(key);
        if (value == null) {
            return defaultValue;
        } else {
            return value;
        }
    }

    /**
     * Check if value is set.
     */
    public boolean has(String value) {
        addToDefaults(value, null);
        unrequestedParameters.remove(value);
        return data.containsKey(value);
    }
    // -------------- Integer

    /**
     * Returns the Integer value for the given key.
     * The method fails if the key does not exist or the value is not an Integer.
     */
    public int getInt(String key) {
        addToDefaults(key, null);
        String value = getRequired(key);
        return Integer.parseInt(value);
    }

    /**
     * Returns the Integer value for the given key. If the key does not exists it will return the default value given.
     * The method fails if the value is not an Integer.
     */
    public int getInt(String key, int defaultValue) {
        addToDefaults(key, Integer.toString(defaultValue));
        String value = get(key);
        if (value == null) {
            return defaultValue;
        }
        return Integer.parseInt(value);
    }

    // -------------- LONG

    /**
     * Returns the Long value for the given key.
     * The method fails if the key does not exist.
     */
    public long getLong(String key) {
        addToDefaults(key, null);
        String value = getRequired(key);
        return Long.parseLong(value);
    }

    /**
     * Returns the Long value for the given key. If the key does not exists it will return the default value given.
     * The method fails if the value is not a Long.
     */
    public long getLong(String key, long defaultValue) {
        addToDefaults(key, Long.toString(defaultValue));
        String value = get(key);
        if (value == null) {
            return defaultValue;
        }
        return Long.parseLong(value);
    }

    // -------------- FLOAT

    /**
     * Returns the Float value for the given key.
     * The method fails if the key does not exist.
     */
    public float getFloat(String key) {
        addToDefaults(key, null);
        String value = getRequired(key);
        return Float.valueOf(value);
    }

    /**
     * Returns the Float value for the given key. If the key does not exists it will return the default value given.
     * The method fails if the value is not a Float.
     */
    public float getFloat(String key, float defaultValue) {
        addToDefaults(key, Float.toString(defaultValue));
        String value = get(key);
        if (value == null) {
            return defaultValue;
        } else {
            return Float.valueOf(value);
        }
    }
    // -------------- DOUBLE

    /**
     * Returns the Double value for the given key.
     * The method fails if the key does not exist.
     */
    public double getDouble(String key) {
        addToDefaults(key, null);
        String value = getRequired(key);
        return Double.valueOf(value);
    }

    /**
     * Returns the Double value for the given key. If the key does not exists it will return the default value given.
     * The method fails if the value is not a Double.
     */
    public double getDouble(String key, double defaultValue) {
        addToDefaults(key, Double.toString(defaultValue));
        String value = get(key);
        if (value == null) {
            return defaultValue;
        } else {
            return Double.valueOf(value);
        }
    }

    // -------------- BOOLEAN

    /**
     * Returns the Boolean value for the given key.
     * The method fails if the key does not exist.
     */
    public boolean getBoolean(String key) {
        addToDefaults(key, null);
        String value = getRequired(key);
        return Boolean.valueOf(value);
    }

    /**
     * Returns the Boolean value for the given key. If the key does not exists it will return the default value given.
     * The method returns whether the string of the value is "true" ignoring cases.
     */
    public boolean getBoolean(String key, boolean defaultValue) {
        addToDefaults(key, Boolean.toString(defaultValue));
        String value = get(key);
        if (value == null) {
            return defaultValue;
        } else {
            return Boolean.valueOf(value);
        }
    }

    // -------------- SHORT

    /**
     * Returns the Short value for the given key.
     * The method fails if the key does not exist.
     */
    public short getShort(String key) {
        addToDefaults(key, null);
        String value = getRequired(key);
        return Short.valueOf(value);
    }

    /**
     * Returns the Short value for the given key. If the key does not exists it will return the default value given.
     * The method fails if the value is not a Short.
     */
    public short getShort(String key, short defaultValue) {
        addToDefaults(key, Short.toString(defaultValue));
        String value = get(key);
        if (value == null) {
            return defaultValue;
        } else {
            return Short.valueOf(value);
        }
    }

    // -------------- BYTE

    /**
     * Returns the Byte value for the given key.
     * The method fails if the key does not exist.
     */
    public byte getByte(String key) {
        addToDefaults(key, null);
        String value = getRequired(key);
        return Byte.valueOf(value);
    }

    /**
     * Returns the Byte value for the given key. If the key does not exists it will return the default value given.
     * The method fails if the value is not a Byte.
     */
    public byte getByte(String key, byte defaultValue) {
        addToDefaults(key, Byte.toString(defaultValue));
        String value = get(key);
        if (value == null) {
            return defaultValue;
        } else {
            return Byte.valueOf(value);
        }
    }

    //......
}
- ParameterTool holds the fields data, defaultData, and unrequestedParameters; its toMap method returns the data field
- ParameterTool provides the static factory methods fromPropertiesFile, fromArgs, fromSystemProperties, and fromMap
- ParameterTool provides get, getRequired, getInt, getLong, getFloat, getDouble, getBoolean, getShort, and getByte; each typed getter also has an overload that accepts a defaultValue
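The interplay between the read-only data map and the unrequestedParameters set can be sketched in isolation. The class below is a hypothetical, JDK-only imitation of the constructor and get logic shown in the listing above (TrackingSketch and getUnrequested are illustrative names, not Flink API): every key starts out as unrequested and is removed from the set the first time it is looked up.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical imitation of the bookkeeping; not Flink's ParameterTool.
class TrackingSketch {
    private final Map<String, String> data;
    private final Set<String> unrequested;

    TrackingSketch(Map<String, String> data) {
        // Defensive copy wrapped as read-only, as in the constructor above
        this.data = Collections.unmodifiableMap(new HashMap<>(data));
        // Concurrent set seeded with every known key
        this.unrequested = Collections.newSetFromMap(new ConcurrentHashMap<>());
        this.unrequested.addAll(data.keySet());
    }

    String get(String key) {
        unrequested.remove(key); // mark the key as requested
        return data.get(key);    // null when the key is absent
    }

    // Read-only view of the keys that were supplied but never looked up
    Set<String> getUnrequested() {
        return Collections.unmodifiableSet(unrequested);
    }

    public static void main(String[] args) {
        Map<String, String> raw = new HashMap<>();
        raw.put("input", "hdfs:///mydata");
        raw.put("typoedOption", "1");
        TrackingSketch parameters = new TrackingSketch(raw);
        parameters.get("input");
        System.out.println(parameters.getUnrequested());
    }
}
```

Tracking of this kind makes it possible to spot supplied parameters that the job never reads, such as a misspelled option name.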
Summary
- ParameterTool provides the static factory methods fromPropertiesFile, fromArgs, fromSystemProperties, and fromMap
- ParameterTool provides get, getRequired, getInt, getLong, getFloat, getDouble, getBoolean, getShort, and getByte; each typed getter also has an overload that accepts a defaultValue
- ParameterTool extends ExecutionConfig.GlobalJobParameters, and its toMap method returns the data field; env.getConfig().setGlobalJobParameters registers a ParameterTool as global job parameters so that it can be accessed from within functions