Java - Apache Flink Streaming window WordCount


I have the following code to count words from a socketTextStream. Both cumulative word counts and time-windowed word counts are needed. The program has an issue: cumulateCounts is always the same as the windowed counts. Why does this issue occur? What is the correct way to calculate cumulative counts based on the windowed counts?

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    final HashMap<String, Integer> cumulateCounts = new HashMap<String, Integer>();

    final DataStream<Tuple2<String, Integer>> counts = env
            .socketTextStream("localhost", 9999)
            .flatMap(new Splitter())
            .window(Time.of(5, TimeUnit.SECONDS))
            .groupBy(0).sum(1)
            .flatten();

    counts.print();

    counts.addSink(new SinkFunction<Tuple2<String, Integer>>() {
        @Override
        public void invoke(Tuple2<String, Integer> value) throws Exception {
            String word = value.f0;
            Integer delta_count = value.f1;
            Integer count = cumulateCounts.get(word);
            if (count == null)
                count = 0;
            count = count + delta_count;
            cumulateCounts.put(word, count);
            System.out.println("(" + word + "," + count.toString() + ")");
        }
    });

You should first group by key, and then apply the window on the keyed data stream (your code works on Flink 0.9.1, but the new API in Flink 0.10.0 is strict about this):

    final DataStream<Tuple2<String, Integer>> counts = env
            .socketTextStream("localhost", 9999)
            .flatMap(new Splitter())
            .groupBy(0)
            .window(Time.of(5, TimeUnit.SECONDS)).sum(1)
            .flatten();

If you apply a window on a non-keyed data stream, there will be only a single-threaded window operator on a single machine (i.e., no parallelism) that builds the window on the whole stream. In Flink 0.9.1, this global window can be split into sub-windows by groupBy(); in Flink 0.10.0, however, this no longer works. To count words, you want to build a window for each distinct key value. That is, you first get a sub-stream per key value (via groupBy()) and then apply a window operator on each sub-stream. Each sub-stream can thus have its own window operator instance, which allows for parallel execution.
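The group-first, window-second order can be illustrated without Flink. The following is only a plain-Java sketch of the idea, not Flink's implementation: the class `KeyedWindowSketch`, the `Event` type, and the explicit timestamps are assumptions made for illustration. Each word gets its own map of tumbling windows, which mirrors having one window per key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class KeyedWindowSketch {

    /** Hypothetical event type: a word plus its arrival time in milliseconds. */
    static final class Event {
        final String word;
        final long timestampMs;

        Event(String word, long timestampMs) {
            this.word = word;
            this.timestampMs = timestampMs;
        }
    }

    /**
     * Groups events by word first, then assigns each event to a tumbling
     * window of the given size and counts per (word, window) pair.
     * Returns lines such as "a@0=2": word "a", window starting at 0 ms, count 2.
     */
    static List<String> countPerKeyAndWindow(List<Event> events, long windowSizeMs) {
        // Outer map: one entry per key (the "sub-stream").
        // Inner map: window start time -> count for that key.
        Map<String, Map<Long, Integer>> perKey = new TreeMap<>();
        for (Event e : events) {
            long windowStart = (e.timestampMs / windowSizeMs) * windowSizeMs;
            perKey.computeIfAbsent(e.word, k -> new TreeMap<>())
                  .merge(windowStart, 1, Integer::sum);
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Map<Long, Integer>> key : perKey.entrySet()) {
            for (Map.Entry<Long, Integer> window : key.getValue().entrySet()) {
                out.add(key.getKey() + "@" + window.getKey() + "=" + window.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("a", 1000), new Event("b", 2000),
                new Event("a", 4000), new Event("a", 6000));
        // 5-second tumbling windows, as in the question.
        for (String line : countPerKeyAndWindow(events, 5000)) {
            System.out.println(line);
        }
    }
}
```

Because each key owns its own window state here, the per-key work could be distributed across parallel workers, which is the property the keyed window provides.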

For a global (cumulative) count, you can simply apply a groupBy().sum() construct. First, the stream is split into sub-streams (one for each key value). Second, you compute the sum over each sub-stream. Because the stream is not windowed, the sum is computed cumulatively and updated for each incoming tuple. In more detail, the sum has an initial result value of 0, and the result is updated for each tuple as result += tuple.value. After each invocation of sum, the new current result is emitted.
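The update rule described above (start at 0, add each tuple's value, emit after every update) can be sketched in plain Java. This is an illustration of the semantics only, not Flink code; the class `RunningSumSketch` and the method `runningSum` are hypothetical names, and the input arrays stand in for a stream of (word, value) tuples.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RunningSumSketch {

    /**
     * Applies a per-key running sum to (word, value) pairs and returns one
     * emitted result per input pair, formatted as "(word,sum)".
     */
    static List<String> runningSum(String[] words, int[] values) {
        Map<String, Integer> state = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (int i = 0; i < words.length; i++) {
            // A missing key behaves like an initial result of 0;
            // otherwise result += tuple.value.
            int result = state.merge(words[i], values[i], Integer::sum);
            // The new current result is emitted after every update.
            emitted.add("(" + words[i] + "," + result + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        // The values could be windowed counts arriving one after another.
        String[] words = {"a", "b", "a"};
        int[] values = {2, 1, 3};
        for (String line : runningSum(words, values)) {
            System.out.println(line); // prints (a,2), (b,1), (a,5) on separate lines
        }
    }
}
```

Feeding windowed counts into such a running sum yields cumulative counts that grow across windows, while the windowed counts themselves reset with each window.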

In your code, you should not use your special sink function, but instead do the following:

    counts.groupBy(0).sum(1).print();
