一 .spark 安裝配置
註:按照自己saprk 安裝包版本安裝
1、下載,解壓
$tar zxf spark-1.6.1-bin-2.5.0-cdh5.3.6.tgz
-C /opt/modules/
2、配置
重命名mv spark-env.sh.template spark-env.sh
JAVA_HOME=/opt/modlues/jdk1.7.0_67
SCALA_HOME=/opt/modlues/scala-2.10.4
HADOOP_CONF_DIR=
/opt/modlues/hadoop-2.5.0/etc/hadoop
3、啟動HDFS服務
$sbin/hadoop-daemon.sh start namenode
$sbin/hadoop-daemon.sh start datanode
$sbin/start-master.sh
$start-slaves.sh
4、常見問題
命令強制離開
$bin/hadoop dfsadmin -safemode leave
二、實現WordCount
//hdfs path
方式一
val rdd = sc.textFile(“/input/wc.txt”)
//這是在內存中處理 flatmap函數 這是匿名函數
按行讀取分割split
val wcrdd=rdd.flatMap(line=>line.split(“\t”)).map(word
=>(word,1)).reduceByKey((a,b)=>(a+b))
//保存到HDFS
wcrdd.saveAsTextFile(“wc-spark”)
方式二
val wcrdd=sc.textFile(“/input/wc.txt”).flatMap(line
=>line.split(“\t”)).map(word=>(word,1)).reduceByKey((a,b)
=>(a+b)).saveAsTextFile(“wc-spark”)
方式三
val rdd = sc.textFile(“/input/wc.txt”)
//分割這個行
val linerdd=rdd.flatMap(line=>line.split(“\t”))
//元組對出現一個 就是一個元組對
val kvrdd=linerdd.map(word=>(word,1))
val wcrdd=kvrdd.reduceByKey((a,b)=>(a+b))
wcrdd.saveAsTextFile(“wc-spark”)
方式四
val rdd = sc.textFile(“/input/wc.txt”)
val wcrdd=rdd.flatMap(_.split(“\t”)).map
((_,1)).reduceByKey((_+_))
wcrdd.saveAsTextFile(“wc-spark”)
spark sc:SparkContext(上下文)
Spark context available as sc.
三、spark常用函數
過濾filter
val rdd = sc.textFile(“/input/wc.txt”).filter
(line=>line.contains(“hadoop”))
val wcrdd=rdd.flatMap(_.split(“\t”)).map
((_,1)).reduceByKey((_+_))
循環foreach
wcrdd.foreach(word=>println(word))
//查看函數
wcrdd.first
wcrdd.take(3)
wcrdd.top(3)
wcrdd.collect 查看結果
wcrdd.count
//緩存
wcrdd.cache
wcrdd.count//使緩存生效
四、spark 二次排序
分析:(spark,2)=》(2,spark)》sortByKey(false)=》
(2,spark)=》(spark,2)
代碼:
val rdd = sc.textFile(“/input/wc.txt”)
val wcrdd=rdd.flatMap(_.split(“\t”)).map
((_,1)).reduceByKey((_+_))
val sortrdd=wcrdd.map(word=>(word._2,word._1)).
sortByKey(false).map(word=>(word._2,word._1))
sortrdd.collect
//top:自帶排序
val sortrdd=wcrdd.map(word=>(word._2,word._1)).top(3)
五、spark standalone集群模式的配置
1、sparp-evn
//主節點配置
SPARK_MASTER_IP=li(主機名)
SPARK_MASTER_PORT=7077
SPARK_MASTER_WEBUI_PORT=8080
//從節點配置
SPARK_WORKER_CORES=2
SPARK_WORKER_MEMORY=2g
SPARK_WORKER_PORT=7078
SPARK_WORKER_WEBUI_PORT=8081
SPARK_WORKER_INSTANCES=1
2、slaves
寫入hostname名稱:自己主機名
3、啟動服務
$sbin/start-master.sh
$start-slaves.sh
4、spark-shell常用命令行參數
幫助信息查看:
$bin/spark-shell –help
(1)spark運行在本地模式下:
$bin/spark-shell –master local
或者$bin/spark-shell
//本地
bin/spark-shell \
–class org.apache.spark.examples.SparkPi \
–master local \
–jars lib/spark-examples-1.6.1-hadoop2.5.0-cdh5.3.6.jar \
100
//集群
bin/spark-shell \
–class org.apache.spark.examples.SparkPi \
–master spark://bigdata.beifeng.com:7077 \
–jars lib/spark-examples-1.6.1-hadoop2.5.0-cdh5.3.6.jar \
100
spark-shell和spark-submit
一般執行腳本時使用spark-shell;
一般向集群提交job是使用spark-submit。
spark-shell的本質是spark-submit
spark-shell和spark-submit通知只能使用一個
//本地
bin/spark-submit \
–class org.apache.spark.examples.SparkPi \
–master local \
lib/spark-examples-1.6.1-hadoop2.5.0-cdh5.3.6.jar \
100
//集群
bin/spark-submit \
–class org.apache.spark.examples.SparkPi \
–master spark://bigdata.beifeng.com:7077 \
lib/spark-examples-1.6.1-hadoop2.5.0-cdh5.3.6.jar \
100
//spark開發時模式的使用
一般本地模式適合本地開發測試。集群模式適合生產環境
六、spark日誌聚合功能配置
1、spark-env.sh
SPARK_HISTORY_OPTS=”-Dspark.history.fs.logDirectory
=hdfs://192.168.1.1:8020/user/li/spark-events”
註:spark-events目錄需要創建
2、spark-default.conf
spark.eventLog.enabled
true
spark.eventLog.dir
hdfs://192.168.1.1:8020/user/li/spark-events
註:spark.eventLog.dir 目錄和
-Dspark.history.fs.logDirectory保持一致
3、啟動history服務
$sbin/start-history-server.sh
原創文章,作者:投稿專員,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/274134.html
微信掃一掃
支付寶掃一掃