一、Flink概述
Apache Flink是一個分散式計算引擎,能夠處理有限數據流和無限數據流。 它可以為Spark,Hadoop和Storm等現有框架提供替代方案,並提供更好的性能和可擴展性。
Flink提供了一種稱為流處理的新型計算模式,這種計算模式可以處理事件流,並支持非常低的延遲和高的吞吐量。 Flink還提供了存儲和處理批處理數據的能力。 因此,Flink可以處理所有類型的數據流,從簡單的請求響應到數據挖掘和複雜的事件處理。
二、WordCount的概念
WordCount是一個可以統計文檔中每個單詞出現次數的程序。 它是 分散式系統的”Hello, World!”。 在分散式系統中,您需要處理文件或流,並將其劃分為片段,以在多個節點上並行處理。 在WordCount示例中,文件被劃分為「行」(每行文本),並且每個節點處理一部分數據(各行文本的單詞計數)。
三、Flink實現WordCount
1、環境設置
首先,在您的本地機器上安裝Java,然後下載並安裝Flink。 flinkwordcount程序可以在本地或分散式集群上運行。
2、創建Flink項目
使用Eclipse或IntelliJ等任何一個Java IDE,創建一個Maven項目,該項目包含所需的Flink依賴項。 您可以使用以下依賴關係在POM.xml中獲取所需的庫:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parent</artifactId>
<version>1.14.2</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
</dependency>
3、實現Flink WordCount代碼
下面展示了Flink WordCount的代碼:
package com.flink.example;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class FlinkWordCount {
public static void main(String[] args) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(args);
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(params);
DataStream text;
final StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
text = see.readTextFile(params.get("input"));
DataStream counts = text
.flatMap(new FlatMapFunction() {
@Override
public void flatMap(String value, Collector out) {
for (String word : value.split("\\s")) {
out.collect(new WordCount(word, 1));
}
}
})
.keyBy("word")
.timeWindow(Time.seconds(5))
.sum("count");
counts.writeAsText(params.get("output"), WriteMode.OVERWRITE);
see.execute();
}
public static class WordCount {
public String word;
public int count;
public WordCount() {}
public WordCount(String word, int count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return "WordCount{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
}
}
4、代碼解析
首先,我們從參數工具創建帶有ParameterTool的ExecutionEnvironment對象。 然後,我們讀取指定的輸入文件並使用flatMap函數將其轉換為鍵值對。 我們使用keyBy函數對WordCount(word,count)鍵入過的流”word”進行分組,並使累加器累加值。 該代碼使用窗口運算符,它將一定量的數據視為一個數據集,並在整個數據集上應用函數計算。
5、編譯和運行
編譯和運行flinkwordcount程序,使用以下命令:
./bin/flink run /path/to/flinkwordcount-0.0.1.jar --input /path/to/input/file --output /path/to/output/directory
6、結果輸出
運行flinkwordcount程序後,它將在指定的輸出目錄中生成一個文本文件,並在其中提供WordCount結果。以下是輸出文件的示例:
(hello, 1)
(world, 1)
(hello, 1)
四、總結
在本文中,我們學習了Apache Flink,並看到如何使用它來實現分散式系統中的WordCount程序。 Flink提供了一種稱為流處理的新型計算模式,該計算模式可以處理事件流,並支持非常低的延遲和高的吞吐量。 我們看到了如何使用flinkwordcount程序來計算文檔中每個單詞的出現次數。希望這篇文章對您有所幫助。
原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/283656.html