本文目錄一覽:
- 1、flink 使用什麼語言開發的
- 2、怎麼在java的flink中調用python程序?
- 3、Apache Flink現在在大數據處理方面能夠和Apache Spark分庭抗禮么
- 4、轉載:阿里巴巴為什麼選擇Apache Flink?
flink 使用什麼語言開發的
方法:
1:Redhat系統或者Fedora或者CentOs的Linux發行版,那麼在Linux終端輸入命令回車:
su – root
這樣就可以切換到root許可權了
2:Ubuntu系統,在Linux終端輸入命令回車:
sudo su – root
然後這樣也可以切換到root許可權了.
怎麼在java的flink中調用python程序?
一、在java類中直接執行python語句
import org.python.util.PythonInterpreter;
public class FirstJavaScript {
public static void main(String args[]) {
PythonInterpreter interpreter = new PythonInterpreter();
interpreter.exec(“days=(‘mod’,’Tue’,’Wed’,’Thu’,’Fri’,’Sat’,’Sun’); “);
interpreter.exec(“print days[1];”);
}// main
}
調用的結果是Tue,在控制台顯示出來,這是直接進行調用的。
二、在java中調用本機python腳本中的函數
首先建立一個python腳本,名字為:my_utils.py
def adder(a, b):
return a + b
然後建立一個java類,用來測試,
java類代碼 FirstJavaScript:
import org.python.core.PyFunction;
import org.python.core.PyInteger;
import org.python.core.PyObject;
import org.python.util.PythonInterpreter;
public class FirstJavaScript {
public static void main(String args[]) {
PythonInterpreter interpreter = new PythonInterpreter();
interpreter.execfile(“C:\\Python27\\programs\\my_utils.py”);
PyFunction func = (PyFunction) interpreter.get(“adder”,
PyFunction.class);
int a = 2010, b = 2;
PyObject pyobj = func.__call__(new PyInteger(a), new PyInteger(b));
System.out.println(“anwser = ” + pyobj.toString());
}// main
}
得到的結果是:anwser = 2012
三、使用java直接執行python腳本
建立腳本inputpy
#open files
print ‘hello’
number=[3,5,2,0,6]
print number
number.sort()
print number
number.append(0)
print number
print number.count(0)
print number.index(5)
建立java類,調用這個腳本:
import org.python.util.PythonInterpreter;
public class FirstJavaScript {
public static void main(String args[]) {
PythonInterpreter interpreter = new PythonInterpreter();
interpreter.execfile(“C:\\Python27\\programs\\input.py”);
}// main
}
得到的結果是:
hello
[3, 5, 2, 0, 6]
[0, 2, 3, 5, 6]
[0, 2, 3, 5, 6, 0]
2
3
Apache Flink現在在大數據處理方面能夠和Apache Spark分庭抗禮么
我們是否還需要另外一個新的數據處理引擎?當我第一次聽到flink的時候這是我是非常懷疑的。在大數據領域,現在已經不缺少數據處理框架了,但是沒有一個框架能夠完全滿足不同的處理需求。自從Apache spark出現後,貌似已經成為當今把大部分的問題解決得最好的框架了,所以我對另外一款解決類似問題的框架持有很強烈的懷疑態度。
不過因為好奇,我花費了數個星期在嘗試了解flink。一開始仔細看了flink的幾個例子,感覺和spark非常類似,心理就傾向於認為flink又是一個模仿spark的框架。但是隨著了解的深入,這些API體現了一些flink的新奇的思路,這些思路還是和spark有著比較明顯的區別的。我對這些思路有些著迷了,所以花費了更多的時間在這上面。
flink中的很多思路,例如內存管理,dataset API都已經出現在spark中並且已經證明 這些思路是非常靠譜的。所以,深入了解flink也許可以幫助我們分散式數據處理的未來之路是怎樣的
在後面的文章里,我會把自己作為一個spark開發者對flink的第一感受寫出來。因為我已經在spark上幹了2年多了,但是只在flink上接觸了2到3周,所以必然存在一些bias,所以大家也帶著懷疑和批判的角度來看這篇文章吧。
Apache Flink是什麼
flink是一款新的大數據處理引擎,目標是統一不同來源的數據處理。這個目標看起來和spark和類似。沒錯,flink也在嘗試解決spark在解決的問題。這兩套系統都在嘗試建立一個統一的平台可以運行批量,流式,互動式,圖處理,機器學習等應用。所以,flink和spark的目標差別並不大,他們最主要的區別在於實現的細節。
後面我會重點從不同的角度對比這兩者。
Apache Spark vs Apache Flink
1.抽象 Abstraction
spark中,對於批處理我們有RDD,對於流式,我們有DStream,不過內部實際還是RDD.所以所有的數據表示本質上還是RDD抽象。
後面我會重點從不同的角度對比這兩者。在flink中,對於批處理有DataSet,對於流式我們有DataStreams。看起來和spark類似,他們的不同點在於:
一)DataSet在運行時是表現為運行計劃(runtime plans)的
在spark中,RDD在運行時是表現為java objects的。通過引入Tungsten,這塊有了些許的改變。但是在flink中是被表現為logical plan(邏輯計劃)的,聽起來很熟悉?沒錯,就是類似於spark中的dataframes。所以在flink中你使用的類Dataframe api是被作為第一優先順序來優化的。但是相對來說在spark RDD中就沒有了這塊的優化了。
flink中的Dataset,對標spark中的Dataframe,在運行前會經過優化。
在spark 1.6,dataset API已經被引入spark了,也許最終會取代RDD 抽象。
二)Dataset和DataStream是獨立的API
在spark中,所有不同的API,例如DStream,Dataframe都是基於RDD抽象的。但是在flink中,Dataset和DataStream是同一個公用的引擎之上兩個獨立的抽象。所以你不能把這兩者的行為合併在一起操作,當然,flink社區目前在朝這個方向努力(),但是目前還不能輕易斷言最後的結果。
2.內存管理
一直到1.5版本,spark都是試用java的內存管理來做數據緩存,明顯很容易導致OOM或者gc。所以從1.5開始,spark開始轉向精確的控制內存的使用,這就是tungsten項目了
flink從第一天開始就堅持自己控制內存試用。這個也是啟發了spark走這條路的原因之一。flink除了把數據存在自己管理的內存以外,還直接操作二進位數據。在spark中,從1.5開始,所有的dataframe操作都是直接作用在tungsten的二進位數據上。
3.語言實現
spark是用scala來實現的,它提供了Java,Python和R的編程介面。
flink是java實現的,當然同樣提供了Scala API
所以從語言的角度來看,spark要更豐富一些。因為我已經轉移到scala很久了,所以不太清楚這兩者的java api實現情況。
4.API
spark和flink都在模仿scala的collection API.所以從表面看起來,兩者都很類似。下面是分別用RDD和DataSet API實現的word count
// Spark wordcount
object WordCount {
def main(args: Array[String]) {
val env = new SparkContext(“local”,”wordCount”)
val data = List(“hi”,”how are you”,”hi”)
val dataSet = env.parallelize(data)
val words = dataSet.flatMap(value = value.split(“\\s+”))
val mappedWords = words.map(value = (value,1))
val sum = mappedWords.reduceByKey(_+_)
println(sum.collect())
}
}
// Flink wordcount
object WordCount {
def main(args: Array[String]) {
val env = ExecutionEnvironment.getExecutionEnvironment
val data = List(“hi”,”how are you”,”hi”)
val dataSet = env.fromCollection(data)
val words = dataSet.flatMap(value = value.split(“\\s+”))
val mappedWords = words.map(value = (value,1))
val grouped = mappedWords.groupBy(0)
val sum = grouped.sum(1)
println(sum.collect())
}
}
不知道是偶然還是故意的,API都長得很像,這樣很方便開發者從一個引擎切換到另外一個引擎。我感覺以後這種Collection API會成為寫data pipeline的標配。
Steaming
spark把streaming看成是更快的批處理,而flink把批處理看成streaming的special case。這裡面的思路決定了各自的方向,其中兩者的差異點有如下這些:
實時 vs 近實時的角度
flink提供了基於每個事件的流式處理機制,所以可以被認為是一個真正的流式計算。它非常像storm的model。
而spark,不是基於事件的粒度,而是用小批量來模擬流式,也就是多個事件的集合。所以spark被認為是近實時的處理系統。
Spark streaming 是更快的批處理,而Flink Batch是有限數據的流式計算。
雖然大部分應用對準實時是可以接受的,但是也還是有很多應用需要event level的流式計算。這些應用更願意選擇storm而非spark streaming,現在,flink也許是一個更好的選擇。
流式計算和批處理計算的表示
spark對於批處理和流式計算,都是用的相同的抽象:RDD,這樣很方便這兩種計算合併起來表示。而flink這兩者分為了DataSet和DataStream,相比spark,這個設計算是一個糟糕的設計。
對 windowing 的支持
因為spark的小批量機制,spark對於windowing的支持非常有限。只能基於process time,且只能對batches來做window。
而Flink對window的支持非常到位,且Flink對windowing API的支持是相當給力的,允許基於process time,data time,record 來做windowing。
我不太確定spark是否能引入這些API,不過到目前為止,Flink的windowing支持是要比spark好的。
Steaming這部分flink勝
SQL interface
目前spark-sql是spark裡面最活躍的組件之一,Spark提供了類似Hive的sql和Dataframe這種DSL來查詢結構化數據,API很成熟,在流式計算中使用很廣,預計在流式計算中也會發展得很快。
至於flink,到目前為止,Flink Table API只支持類似DataFrame這種DSL,並且還是處於beta狀態,社區有計劃增加SQL 的interface,但是目前還不確定什麼時候才能在框架中用上。
所以這個部分,spark勝出。
Data source Integration
Spark的數據源 API是整個框架中最好的,支持的數據源包括NoSql db,parquet,ORC等,並且支持一些高級的操作,例如predicate push down
Flink目前還依賴map/reduce InputFormat來做數據源聚合。
這一場spark勝
Iterative processing
spark對機器學習的支持較好,因為可以在spark中利用內存cache來加速機器學習演算法。
但是大部分機器學習演算法其實是一個有環的數據流,但是在spark中,實際是用無環圖來表示的,一般的分散式處理引擎都是不鼓勵試用有環圖的。
但是flink這裡又有點不一樣,flink支持在runtime中的有環數據流,這樣表示機器學習演算法更有效而且更有效率。
這一點flink勝出。
Stream as platform vs Batch as Platform
Spark誕生在Map/Reduce的時代,數據都是以文件的形式保存在磁碟中,這樣非常方便做容錯處理。
Flink把純流式數據計算引入大數據時代,無疑給業界帶來了一股清新的空氣。這個idea非常類似akka-streams這種。
成熟度
目前的確有一部分吃螃蟹的用戶已經在生產環境中使用flink了,不過從我的眼光來看,Flink還在發展中,還需要時間來成熟。
結論
目前Spark相比Flink是一個更為成熟的計算框架,但是Flink的很多思路很不錯,Spark社區也意識到了這一點,並且逐漸在採用Flink中的好的設計思路,所以學習一下Flink能讓你了解一下Streaming這方面的更迷人的思路。
轉載:阿里巴巴為什麼選擇Apache Flink?
本文主要整理自阿里巴巴計算平台事業部資深技術專家莫問在雲棲大會的演講。
合抱之木,生於毫末
隨著人工智慧時代的降臨,數據量的爆發,在典型的大數據的業務場景下數據業務最通用的做法是:選用批處理的技術處理全量數據,採用流式計算處理實時增量數據。在絕大多數的業務場景之下,用戶的業務邏輯在批處理和流處理之中往往是相同的。但是,用戶用於批處理和流處理的兩套計算引擎是不同的。
因此,用戶通常需要寫兩套代碼。毫無疑問,這帶來了一些額外的負擔和成本。阿里巴巴的商品數據處理就經常需要面對增量和全量兩套不同的業務流程問題,所以阿里就在想,我們能不能有一套統一的大數據引擎技術,用戶只需要根據自己的業務邏輯開發一套代碼。這樣在各種不同的場景下,不管是全量數據還是增量數據,亦或者實時處理,一套方案即可全部支持, 這就是阿里選擇Flink的背景和初衷 。
目前開源大數據計算引擎有很多選擇,流計算如Storm,Samza,Flink,Kafka Stream等,批處理如Spark,Hive,Pig,Flink等。而同時支持流處理和批處理的計算引擎,只有兩種選擇:一個是Apache Spark,一個是Apache Flink。
從技術,生態等各方面的綜合考慮。首先,Spark的技術理念是基於批來模擬流的計算。而Flink則完全相反,它採用的是基於流計算來模擬批計算。
從技術發展方向看,用批來模擬流有一定的技術局限性,並且這個局限性可能很難突破。而Flink基於流來模擬批,在技術上有更好的擴展性。從長遠來看,阿里決定用Flink做一個統一的、通用的大數據引擎作為未來的選型。
Flink是一個低延遲、高吞吐、統一的大數據計算引擎。在阿里巴巴的生產環境中,Flink的計算平台可以實現毫秒級的延遲情況下,每秒鐘處理上億次的消息或者事件。同時Flink提供了一個Exactly-once的一致性語義。保證了數據的正確性。這樣就使得Flink大數據引擎可以提供金融級的數據處理能力。
Flink在阿里的現狀
基於Apache Flink在阿里巴巴搭建的平台於2016年正式上線,並從阿里巴巴的搜索和推薦這兩大場景開始實現。目前阿里巴巴所有的業務,包括阿里巴巴所有子公司都採用了基於Flink搭建的實時計算平台。同時Flink計算平台運行在開源的Hadoop集群之上。採用Hadoop的YARN做為資源管理調度,以 HDFS作為數據存儲。因此,Flink可以和開源大數據軟體Hadoop無縫對接。
目前,這套基於Flink搭建的實時計算平台不僅服務於阿里巴巴集團內部,而且通過阿里雲的雲產品API向整個開發者生態提供基於Flink的雲產品支持。
Flink在阿里巴巴的大規模應用,表現如何?
規模: 一個系統是否成熟,規模是重要指標,Flink最初上線阿里巴巴只有數百台伺服器,目前規模已達上萬台,此等規模在全球範圍內也是屈指可數;
狀態數據: 基於Flink,內部積累起來的狀態數據已經是PB級別規模;
Events: 如今每天在Flink的計算平台上,處理的數據已經超過萬億條;
PS: 在峰值期間可以承擔每秒超過4.72億次的訪問,最典型的應用場景是阿里巴巴雙11大屏;
Flink的發展之路
接下來從開源技術的角度,來談一談Apache Flink是如何誕生的,它是如何成長的?以及在成長的這個關鍵的時間點阿里是如何進入的?並對它做出了那些貢獻和支持?
Flink誕生於歐洲的一個大數據研究項目StratoSphere。該項目是柏林工業大學的一個研究性項目。早期,Flink是做Batch計算的,但是在2014年,StratoSphere裡面的核心成員孵化出Flink,同年將Flink捐贈Apache,並在後來成為Apache的頂級大數據項目,同時Flink計算的主流方向被定位為Streaming,即用流式計算來做所有大數據的計算,這就是Flink技術誕生的背景。
2014年Flink作為主攻流計算的大數據引擎開始在開源大數據行業內嶄露頭角。區別於Storm,Spark Streaming以及其他流式計算引擎的是:它不僅是一個高吞吐、低延遲的計算引擎,同時還提供很多高級的功能。比如它提供了有狀態的計算,支持狀態管理,支持強一致性的數據語義以及支持Event Time,WaterMark對消息亂序的處理。
Flink核心概念以及基本理念
Flink最區別於其他流計算引擎的,其實就是狀態管理。
什麼是狀態?例如開發一套流計算的系統或者任務做數據處理,可能經常要對數據進行統計,如Sum,Count,Min,Max,這些值是需要存儲的。因為要不斷更新,這些值或者變數就可以理解為一種狀態。如果數據源是在讀取Kafka,RocketMQ,可能要記錄讀取到什麼位置,並記錄Offset,這些Offset變數都是要計算的狀態。
Flink提供了內置的狀態管理,可以把這些狀態存儲在Flink內部,而不需要把它存儲在外部系統。這樣做的好處是第一降低了計算引擎對外部系統的依賴以及部署,使運維更加簡單;第二,對性能帶來了極大的提升:如果通過外部去訪問,如Redis,HBase它一定是通過網路及RPC。如果通過Flink內部去訪問,它只通過自身的進程去訪問這些變數。同時Flink會定期將這些狀態做Checkpoint持久化,把Checkpoint存儲到一個分散式的持久化系統中,比如HDFS。這樣的話,當Flink的任務出現任何故障時,它都會從最近的一次Checkpoint將整個流的狀態進行恢復,然後繼續運行它的流處理。對用戶沒有任何數據上的影響。
Flink是如何做到在Checkpoint恢復過程中沒有任何數據的丟失和數據的冗餘?來保證精準計算的?
這其中原因是Flink利用了一套非常經典的Chandy-Lamport演算法,它的核心思想是把這個流計算看成一個流式的拓撲,定期從這個拓撲的頭部Source點開始插入特殊的Barries,從上游開始不斷的向下游廣播這個Barries。每一個節點收到所有的Barries,會將State做一次Snapshot,當每個節點都做完Snapshot之後,整個拓撲就算完整的做完了一次Checkpoint。接下來不管出現任何故障,都會從最近的Checkpoint進行恢復。
Flink利用這套經典的演算法,保證了強一致性的語義。這也是Flink與其他無狀態流計算引擎的核心區別。
下面介紹Flink是如何解決亂序問題的。比如星球大戰的播放順序,如果按照上映的時間觀看,可能會發現故事在跳躍。
在流計算中,與這個例子是非常類似的。所有消息到來的時間,和它真正發生在源頭,在線系統Log當中的時間是不一致的。在流處理當中,希望是按消息真正發生在源頭的順序進行處理,不希望是真正到達程序里的時間來處理。Flink提供了Event Time和WaterMark的一些先進技術來解決亂序的問題。使得用戶可以有序的處理這個消息。這是Flink一個很重要的特點。
接下來要介紹的是Flink啟動時的核心理念和核心概念,這是Flink發展的第一個階段;第二個階段時間是2015年和2017年,這個階段也是Flink發展以及阿里巴巴介入的時間。故事源於2015年年中,我們在搜索事業部的一次調研。當時阿里有自己的批處理技術和流計算技術,有自研的,也有開源的。但是,為了思考下一代大數據引擎的方向以及未來趨勢,我們做了很多新技術的調研。
結合大量調研結果,我們最後得出的結論是:解決通用大數據計算需求,批流融合的計算引擎,才是大數據技術的發展方向,並且最終我們選擇了Flink。
但2015年的Flink還不夠成熟,不管是規模還是穩定性尚未經歷實踐。最後我們決定在阿里內部建立一個Flink分支,對Flink做大量的修改和完善,讓其適應阿里巴巴這種超大規模的業務場景。在這個過程當中,我們團隊不僅對Flink在性能和穩定性上做出了很多改進和優化,同時在核心架構和功能上也進行了大量創新和改進,並將其貢獻給社區,例如:Flink新的分散式架構,增量Checkpoint機制,基於Credit-based的網路流控機制和Streaming SQL等。
阿里巴巴對Flink社區的貢獻
我們舉兩個設計案例,第一個是阿里巴巴重構了Flink的分散式架構,將Flink的Job調度和資源管理做了一個清晰的分層和解耦。這樣做的首要好處是Flink可以原生的跑在各種不同的開源資源管理器上。經過這套分散式架構的改進,Flink可以原生地跑在Hadoop Yarn和Kubernetes這兩個最常見的資源管理系統之上。同時將Flink的任務調度從集中式調度改為了分散式調度,這樣Flink就可以支持更大規模的集群,以及得到更好的資源隔離。
另一個是實現了增量的Checkpoint機制,因為Flink提供了有狀態的計算和定期的Checkpoint機制,如果內部的數據越來越多,不停地做Checkpoint,Checkpoint會越來越大,最後可能導致做不出來。提供了增量的Checkpoint後,Flink會自動地發現哪些數據是增量變化,哪些數據是被修改了。同時只將這些修改的數據進行持久化。這樣Checkpoint不會隨著時間的運行而越來越難做,整個系統的性能會非常地平穩,這也是我們貢獻給社區的一個很重大的特性。
經過2015年到2017年對Flink Streaming的能力完善,Flink社區也逐漸成熟起來。Flink也成為在Streaming領域最主流的計算引擎。因為Flink最早期想做一個流批統一的大數據引擎,2018年已經啟動這項工作,為了實現這個目標,阿里巴巴提出了新的統一API架構,統一SQL解決方案,同時流計算的各種功能得到完善後,我們認為批計算也需要各種各樣的完善。無論在任務調度層,還是在數據Shuffle層,在容錯性,易用性上,都需要完善很多工作。
篇幅原因,下面主要和大家分享兩點:
● 統一 API Stack
● 統一 SQL方案
先來看下目前Flink API Stack的一個現狀,調研過Flink或者使用過Flink的開發者應該知道。Flink有2套基礎的API,一套是DataStream,一套是DataSet。DataStream API是針對流式處理的用戶提供,DataSet API是針對批處理用戶提供,但是這兩套API的執行路徑是完全不一樣的,甚至需要生成不同的Task去執行。所以這跟得到統一的API是有衝突的,而且這個也是不完善的,不是最終的解法。在Runtime之上首先是要有一個批流統一融合的基礎API層,我們希望可以統一API層。
因此,我們在新架構中將採用一個DAG(有限無環圖)API,作為一個批流統一的API層。對於這個有限無環圖,批計算和流計算不需要涇渭分明的表達出來。只需要讓開發者在不同的節點,不同的邊上定義不同的屬性,來規劃數據是流屬性還是批屬性。整個拓撲是可以融合批流統一的語義表達,整個計算無需區分是流計算還是批計算,只需要表達自己的需求。有了這套API後,Flink的API Stack將得到統一。
除了統一的基礎API層和統一的API Stack外,同樣在上層統一SQL的解決方案。流和批的SQL,可以認為流計算有數據源,批計算也有數據源,我們可以將這兩種源都模擬成數據表。可以認為流數據的數據源是一張不斷更新的數據表,對於批處理的數據源可以認為是一張相對靜止的表,沒有更新的數據表。整個數據處理可以當做SQL的一個Query,最終產生的結果也可以模擬成一個結果表。
對於流計算而言,它的結果表是一張不斷更新的結果表。對於批處理而言,它的結果表是相當於一次更新完成的結果表。從整個SOL語義上表達,流和批是可以統一的。此外,不管是流式SQL,還是批處理SQL,都可以用同一個Query來表達復用。這樣以來流批都可以用同一個Query優化或者解析。甚至很多流和批的運算元都是可以復用的。
Flink的未來方向
首先,阿里巴巴還是要立足於Flink的本質,去做一個全能的統一大數據計算引擎。將它在生態和場景上進行落地。目前Flink已經是一個主流的流計算引擎,很多互聯網公司已經達成了共識:Flink是大數據的未來,是最好的流計算引擎。下一步很重要的工作是讓Flink在批計算上有所突破。在更多的場景下落地,成為一種主流的批計算引擎。然後進一步在流和批之間進行無縫的切換,流和批的界限越來越模糊。用Flink,在一個計算中,既可以有流計算,又可以有批計算。
第二個方向就是Flink的生態上有更多語言的支持,不僅僅是Java,Scala語言,甚至是機器學習下用的Python,Go語言。未來我們希望能用更多豐富的語言來開發Flink計算的任務,來描述計算邏輯,並和更多的生態進行對接。
最後不得不說AI,因為現在很多大數據計算的需求和數據量都是在支持很火爆的AI場景,所以在Flink流批生態完善的基礎上,將繼續往上走,完善上層Flink的Machine Learning演算法庫,同時Flink往上層也會向成熟的機器學習,深度學習去集成。比如可以做Tensorflow On Flink, 讓大數據的ETL數據處理和機器學習的Feature計算和特徵計算,訓練的計算等進行集成,讓開發者能夠同時享受到多種生態給大家帶來的好處。
原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/279859.html