java - Apache Flink Streaming window WordCount
I have the following code to count words from socketTextStream. I need both cumulative word counts and time-windowed word counts. The program has an issue: cumulateCounts is always the same as the windowed counts. Why does this issue occur? What is the correct way to calculate cumulative counts based on the windowed counts?
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final HashMap<String, Integer> cumulateCounts = new HashMap<String, Integer>();

final DataStream<Tuple2<String, Integer>> counts = env
        .socketTextStream("localhost", 9999)
        .flatMap(new Splitter())
        .window(Time.of(5, TimeUnit.SECONDS))
        .groupBy(0).sum(1)
        .flatten();

counts.print();

counts.addSink(new SinkFunction<Tuple2<String, Integer>>() {
    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        String word = value.f0;
        Integer delta_count = value.f1;
        Integer count = cumulateCounts.get(word);
        if (count == null)
            count = 0;
        count = count + delta_count;
        cumulateCounts.put(word, count);
        System.out.println("(" + word + "," + count.toString() + ")");
    }
});
```
You should first apply groupBy() and then apply the window on the keyed data stream. Your code works on Flink 0.9.1, but the new API in Flink 0.10.0 is strict about this:
```java
final DataStream<Tuple2<String, Integer>> counts = env
        .socketTextStream("localhost", 9999)
        .flatMap(new Splitter())
        .groupBy(0)
        .window(Time.of(5, TimeUnit.SECONDS)).sum(1)
        .flatten();
```
If you apply a window on a non-keyed data stream, there is only a single-threaded window operator on a single machine (i.e., no parallelism) that builds the window over the whole stream. In Flink 0.9.1, this global window can be split into sub-windows by groupBy(); however, in Flink 0.10.0 this does not work any more. To count words, you want to build a window for each distinct key value, i.e., you first get a sub-stream per key value (via groupBy()) and then apply a window operator on each sub-stream. Thus, each sub-stream can have its own window operator instance, which allows for parallel execution.
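To make the "key first, then window" semantics concrete, here is a plain-Java sketch with no Flink dependency. It assumes each word arrives with a non-negative timestamp in milliseconds and uses 5-second tumbling windows to mirror Time.of(5, TimeUnit.SECONDS). The names `Event`, `windowedCounts` and the sample data are hypothetical. The loop runs sequentially, so it illustrates the per-key windows, not Flink's parallel execution.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class KeyedWindowSketch {

    /** Hypothetical input event: one word observed at a timestamp (milliseconds, non-negative). */
    static final class Event {
        final long timestampMillis;
        final String word;

        Event(long timestampMillis, String word) {
            this.timestampMillis = timestampMillis;
            this.word = word;
        }
    }

    /**
     * Splits the events into one sub-stream per word (the groupBy(0) step), then assigns
     * each event of that sub-stream to a tumbling window and counts per window.
     * Result: word -> (window start -> count).
     */
    static Map<String, Map<Long, Integer>> windowedCounts(List<Event> events, long windowMillis) {
        Map<String, Map<Long, Integer>> result = new TreeMap<>();
        for (Event e : events) {
            // Start of the tumbling window this event falls into (assumes timestamp >= 0).
            long windowStart = e.timestampMillis - (e.timestampMillis % windowMillis);
            result.computeIfAbsent(e.word, k -> new TreeMap<>())
                  .merge(windowStart, 1, Integer::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        events.add(new Event(1_000, "flink"));
        events.add(new Event(2_000, "flink"));
        events.add(new Event(3_000, "stream"));
        events.add(new Event(6_000, "flink"));
        // 5-second windows, matching the window size used in the answer.
        System.out.println(windowedCounts(events, 5_000));
    }
}
```

Running main prints {flink={0=2, 5000=1}, stream={0=1}}: each word gets its own sequence of 5-second windows, and the count starts over at every window boundary, which is why windowed counts alone never accumulate.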
For a global (cumulative) count, you can simply apply a groupBy().sum() construct. First, the stream is split into sub-streams (one for each key value). Second, the sum is computed on each sub-stream. Because the stream is not windowed, the sum is computed cumulatively and updated for each incoming tuple (in more detail, the sum has an initial result value of 0, and the result is updated for each tuple as result += tuple.value). After each invocation of sum, the new current result is emitted.
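As a rough illustration of those rolling-sum semantics, the following plain-Java sketch (not Flink code) keeps one running result per key, starts it at 0, applies result += tuple.value for every tuple, and emits the updated result each time. `WordCount` stands in for Tuple2<String, Integer>, and `rollingSum` and the sample input are hypothetical. The input is shaped like windowed counts from two consecutive windows, to show how the cumulative count builds on them.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RollingSumSketch {

    /** Hypothetical (word, count) pair standing in for Tuple2<String, Integer>. */
    static final class WordCount {
        final String word;
        final int count;

        WordCount(String word, int count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "(" + word + "," + count + ")";
        }
    }

    /**
     * Simulates groupBy(0).sum(1) on an unwindowed stream: one running result per key,
     * initially 0, updated with result += tuple.value and emitted after every tuple.
     */
    static List<WordCount> rollingSum(List<WordCount> input) {
        Map<String, Integer> state = new HashMap<>();
        List<WordCount> emitted = new ArrayList<>();
        for (WordCount t : input) {
            int updated = state.getOrDefault(t.word, 0) + t.count;
            state.put(t.word, updated);
            emitted.add(new WordCount(t.word, updated));
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Windowed counts as they might arrive from two consecutive 5-second windows.
        List<WordCount> windowed = new ArrayList<>();
        windowed.add(new WordCount("flink", 2));
        windowed.add(new WordCount("stream", 1));
        windowed.add(new WordCount("flink", 1));
        System.out.println(rollingSum(windowed));
    }
}
```

Running main prints [(flink,2), (stream,1), (flink,3)]: the second update for flink is the cumulative 2 + 1, not the windowed value 1.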
In your code, you should not use a special sink function; instead, do the following:
```java
counts.groupBy(0).sum(1).print();
```