spark笔记

未完待续


导读


RDD

RDD(弹性分布式数据集,Resilient Distributed DataSet)是spark的核心抽象。它是一个不可变的带分区的记录集合,RDD的数据可以分布在多个节点上,它也是spark中的编程模型。
RDD的API主要分为两类:

  • transformation(转换)
    • 转换用于从一个现有的RDD,定义一个新的RDD
    • 比如map、flatMap、filter
  • action(动作)
    • 动作用于用于返回一个结果
    • 比如collect、reduce、count、save、lookupKey

关于RDD的创建方式,请点击这里查看。下面的是JAVA word count的例子:

package cn.timd.spark;

import org.apache.spark.SparkConf;  
import org.apache.spark.api.java.JavaPairRDD;  
import org.apache.spark.api.java.JavaRDD;  
import org.apache.spark.api.java.JavaSparkContext;  
import org.apache.spark.api.java.function.FlatMapFunction;  
import org.apache.spark.api.java.function.Function2;  
import org.apache.spark.api.java.function.PairFunction;  
import scala.Tuple2;

import java.util.Arrays;  
import java.util.Iterator;

public class WordCount {  
    public static final String master = "spark://192.168.100.106:8765";

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("spark java word count demo");
        // conf.setMaster(master);
        JavaSparkContext sc = new JavaSparkContext(conf);

        // 从hdfs文件创建RDD
        JavaRDD<String> lines = sc.textFile("hdfs://192.168.100.103:9001/input", 3);

        /*
        flatMap转换:
        first line              first
                                line
        second line  ===>       second
                                line
        third line              third
                                line
         */
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String line) {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });

        words.coalesce(3);

        /*
        mapToPair转换:
        this                 (this, 1)
        is            ===>   (is,   1)
        line                 (line, 1)
         */
        JavaPairRDD<String, Integer> pair = words.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        /*
        reduceByKey转换:
        (word1, 1)
        (word1, 1)                (word1, 3)
        (word1, 1)        ====>
        (word2, 1)                (word2, 2)
        (word2, 1)
         */
        JavaPairRDD<String, Integer> wordCountPair = pair.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer i1, Integer i2) throws Exception {
                return i1 + i2;
            }
        });

        // 保存到hdfs文件
        wordCountPair.saveAsTextFile("hdfs://192.168.100.103:9001/output");
        sc.close();
    }
}

在spark中,RDD的转换都是惰性求值的,RDD的转换会生成新的RDD,新的RDD会依赖原来的RDD的数据,并且每个RDD有多个分区。那么实际上,一个应用程序就构造了 由多个相互依赖的RDD组成的有向无环图(DAG)。并通过在RDD上执行一个动作,将这个有向无环图作为一个Job提交给Spark执行。
在spark driver中,RDD首先会交给DAGScheduler,DAGScheduler根据子逻辑的依赖关系,进行stage(阶段)的划分,然后底层的TaskScheduler和executor进行交互,让它们在各自的线程池中运行task,executor中运行的就是我们编写的业务逻辑代码
RDD的依赖分为窄依赖窄依赖是指父RDD的分区最多只能被一个子RDD的分区所引用,即一个父RDD的分区对应一个子RDD的分区,或者多个父RDD的分区对应一个子RDD的分区)和宽依赖宽依赖是指子RDD的分区依赖于父RDD的多个分区或所有分区,即存在一个父RDD的一个分区对应一个子RDD的多个分区)。DAGScheduler会根据宽窄依赖划分stage,根据分区划分task。
1.jpg
图中一个box代表一个RDD,一个带阴影的矩形框代表一个partition。

spark提供了两种方式,来支持故障的容错:

  • Linage
    • 通过数据的血缘关系,再执行一遍前面的处理
    • RDD自身是一个不可变的数据集,但是它能够记住构建它的操作图,因此当执行任务的worker失败时,可以进行重新计算
  • Checkpoint
    • 将数据持久到持久化存储中
    • 在某些场景下,spark也需要利用记录WAL日志的方式,来支持容错。比如,在spark streaming中,针对数据执行update操作,或者调用streaming提供的window操作时,如果发生故障,就需要恢复执行过程中的中间状态。此时,需要利用spark的checkpoint机制,来恢复数据
    • 更多关于checkpoint的细节,可以查看如下两篇文档(强烈建议在阅读本文的同时,也阅读这两篇文档):

RDD有两方面的特性:

  • partitioning:
    • 在创建RDD的时候,可以指定最小分区数。(比如:JavaRDD<String> lines = sc.textFile("hdfs://192.168.100.103:9001/input", 3);
    • 也可以通过repatition或coalesce给RDD重新划分分区,关于这两个方法,请参考这篇文档
  • persistence
    • 对RDD执行transformation操作,会生成一个新的RDD。为了减少中间RDD对资源的消耗,spark采用延迟计算的方式,只有操作累积到action,才会触发整个操作序列的执行,这样就无需为中间RDD分配内存,而是在同一数据块上进行流水线操作。也就说有些中间RDD是没有相应的内存或磁盘数据与之对应的,为了复用这种RDD,就需要对这些数据进行cache或persist
    • RDD通过cache和persist方法来缓存rdd。persist提供很多存储级别,支持将数据存储到不同的介质上。cache方法只是简单的使用memory这个存储级别调用persist,也就说cache方法是将数据保存到内存的

参考文档

感谢浏览tim chow的作品!

如果您喜欢,可以分享到: 更多

如果您有任何疑问或想要与tim chow进行交流

可点此给tim chow发信

如有问题,也可在下面留言: