Spark核心数据结构弹性分布式数据集

北京市治疗白癜风最好的医院 https://jbk.39.net/yiyuanzaixian/bjzkbdfyy/
大家好,我是勾叔。今天和大家分享Spark核心数据结构:弹性分布式数据集RDD。内容分为两块:RDD的核心概念以及实践环节如何创建RDD。RDD的核心概念RDD是Spark最核心的数据结构,RDD(ResilientDistributedDataset)全称为弹性分布式数据集,是Spark对数据的核心抽象,也是最关键的抽象,它实质上是一组分布式的JVM不可变对象集合,不可变决定了它是只读的,所以RDD在经过变换产生新的RDD时,(如下图中A-B),原有RDD不会改变。弹性主要表现在两个方面:

在面对出错情况(例如任意一台节点宕机)时,Spark能通过RDD之间的依赖关系恢复任意出错的RDD(如B和D可以算出最后的RDD),RDD就像一块海绵一样,无论怎么挤压,都像海绵一样完整;

在经过转换算子处理时,RDD中的分区数以及分区所在的位置随时都有可能改变。

每个RDD都有如下几个成员:

分区的集合;

用来基于分区进行计算的函数(算子);

依赖(与其他RDD)的集合;

对于键-值型的RDD的散列分区器(可选);

对于用来计算出每个分区的地址集合(可选,如HDFS上的块存储的地址)。

如下图所示,RDD_0根据HDFS上的块地址生成,块地址集合是RDD_0的成员变量,RDD_1由RDD_0与转换(transform)函数(算子)转换而成,该算子其实是RDD_0内部成员。从这个角度上来说,RDD_1依赖于RDD_0,这种依赖关系集合也作为RDD_1的成员变量而保存。在Spark源码中,RDD是一个抽象类,根据具体的情况有不同的实现,比如RDD_0可以是MapPartitionRDD,而RDD_1由于产生了Shuffle,则是ShuffledRDD。下面我们来看一下RDD的源码:

//表示RDD之间的依赖关系的成员变量

transientprivatevardeps:Seq[Dependency[_]]//分区器成员变量

transientvalpartitioner:Option[Partitioner]=None//该RDD所引用的分区集合成员变量

transientprivatevarpartitions_:Array[Partition]=null//得到该RDD与其他RDD之间的依赖关系protecteddefgetDependencies:Seq[Dependency[_]]=deps//得到该RDD所引用的分区protecteddefgetPartitions:Array[Partition]//得到每个分区地址protecteddefgetPreferredLocations(split:Partition):Seq[String]=Nil//distinct算子defdistinct(numPartitions:Int)(implicitord:Ordering[T]=null):RDD[T]=withScope{map(x=(x,null)).reduceByKey((x,y)=x,numPartitions).map(_._1)}其中,你需要特别注意这一行代码:transientprivatevarpartitions_:Array[Partition]=null它说明了一个重要的问题,RDD是分区的集合,本质上还是一个集合,所以在理解时,你可以用分区之类的概念去理解,但是在使用时,就可以忘记这些,把其当做是一个普通的集合。实践环节:创建RDD

Spark编程是一件不难的工作,而事实也确实如此。现在我们可以通过已有的SparkSession直接创建RDD。创建RDD的方式有以下几类:通过并行集合创建RDD;从HDFS中加载数据创建RDD;从linux本地文件系统加载数据创建RDD。

了解了RDD的创建方式,接下来,我们逐个进行演示介绍:

通过并行集合创建RDD这种RDD纯粹是为了学习,将内存中的集合变量转换为RDD,没太大实际意义。

//valspark:SparkSession=.......valrdd=spark.sparkcontext.parallelize(Seq(1,2,3))从HDFS中加载数据创建RDD这种生成RDD的方式是非常常用的:

//valspark:SparkSession=.......valrdd=spark.sparkcontext.textFile("hdfs://namenode:/user/me/wiki.txt")从HDFS中加载数据创建RDDSpark从MySQL中读取数据返回的RDD类型是JdbcRDD,顾名思义,是基于JDBC读取数据的,这点与Sqoop是相似的,但不同的是JdbcRDD必须手动指定数据的上下界,也就是以MySQL表某一列的最值作为切分分区的依据。

//valspark:SparkSession=.......vallowerBound=1valupperBound=valnumPartition=10valrdd=newJdbcRDD(spark.sparkcontext,()={Class.forName("


转载请注明:http://www.92nongye.com/xxmb/xxmb/204623348.html