@rickyChen
2017-03-03T17:23:40.000000Z
字数 2650
阅读 2699
Java
JavaDStream<String> lines = messages.map(s -> s.substring(0, 5))
// Function[T1, R]
JavaDStream<String> lines = messages.map(new Function<String, String>() {
// call(v1: T1): R
public String call(String s) {
return s.substring(0, 5);
}
});
class GetLength implements Function<String, int> {
public Inter call(String s) { return s.length(); }
}
JavaDStream<String> lineLengths = lines.map(new GetLength())
int totalLength = lineLengths.reduce((a, b) -> a + b)
// Function2[T1, T2, R]
int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer a, Integer b) { return a + b; }
});
class Sum implements Function2<Integer, Integer, Integer> {
public Integer call(Integer a, Integer b) { return a + b; }
}
int totalLength = lineLengths.reduce(new Sum());
JavaPairDStream<String, Integer> wordCounts = wordCount.reduceByKey((a, b) -> a + b);
// Function2[T1, T2, R]
JavaPairDStream<String, Integer> wordCounts = wordCount.reduceByKey(
new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
JavaDStream<String> words = lines.flatMap(x -> Lists.newArrayList(x.split(" ")));
// FlatMapFunction[T, R]
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
// call(t: T): Iterator[R]
public Iterable<String> call(String x) {
return Lists.newArrayList(SPACE.split(x));
}
});
JavaDStream<Tuple2<String, String>> mapLines = lines.mapPartitions(parts -> {
List<Tuple2<String, String>> list = new ArrayList<Tuple2<String, String>>();
while(parts.hasNext()){
String msg = parts.next();
String ip = msg.split(" ")[0];
String domain = msg.split(" ")[1];
list.add(new Tuple2<String, String>(ip, domain));
};
return list;
});
// FlatMapFunction[T, R]
JavaDStream<Tuple2<String, String>> mapLines = lines.mapPartitions(
new FlatMapFunction<Iterator<String>, Tuple2<String, String>>() {
List<Tuple2<String, String>> list = new ArrayList<Tuple2<String, String>>();
// call(t: T): Iterator[R]
public Iterable<Tuple2<String, String>> call(Iterator<String> s){
while(s.hasNext()){
String msg = s.next();
String ip = msg.split(" ")[0];
String domain = msg.split(" ")[1];
list.add(new Tuple2<String, String>(ip, domain));
}
return list;
}
}
);
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2(s, 1));
// PairFunction[T, K, V]
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
new PairFunction<String, String, Integer>() {
// call(t: T): (K, V)
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
});