Spark 核心编程

  • Spark 核心编程

    Spark Core是整个项目的基础。它提供了分布式任务分配,调度和基本的I/O功能。Spark使用称为RDD(弹性分布式数据集)的专用基础数据结构,该结构是跨机器分区的逻辑数据集合。RDD可以通过两种方式创建:一种是通过引用外部存储系统中的数据集,第二种是通过对现有RDD进行转换(例如,映射,过滤器,化简,联接)。
    RDD抽象是通过语言集成的API公开的。这简化了编程的复杂性,因为应用程序处理RDD的方式类似于处理本地数据集合。
  • Spark Shell

    Spark提供了一个交互式Shell - 一个强大的工具,可以交互式地分析数据。它支持ScalaPython语言。Spark的主要抽象是称为弹性分布数据集(RDD)的项目的分布式集合。可以从Hadoop输入格式(例如HDFS文件)或通过转换其他RDD创建RDD。
    打开spark shell
    以下命令用于打开Spark Shell。
    
    $ spark-shell
    
    创建简单的RDD
    让我们从文本文件创建一个简单的RDD。使用以下命令创建一个简单的RDD。
    
    scala> val inputfile = sc.textFile(“input.txt”)
    
    上面命令的输出是
    
    inputfile: org.apache.spark.rdd.RDD[String] = input.txt MappedRDD[1] at textFile at <console>:12
    
    Spark RDD API引入了一些2转换和一些的操作来操纵RDD。
  • RDD 转换

    RDD转换返回指向新RDD的指针,并允许您在RDD之间创建依赖关系。依赖关系链中的每个RDD(依赖关系的字符串)都具有计算其数据的功能,并具有指向其父RDD的指针(依赖关系)。
    Spark是懒惰的,因此除非您调用将触发作业创建和执行的某些转换或操作,否则将不会执行任何操作。请看下面的单词计数示例片段。
    因此,RDD转换不是一组数据,而是程序中的一个步骤(可能是唯一的步骤),告诉Spark如何获取数据以及如何处理数据。
    下面给出了 RDD 转换的列表。
    转换 意义
    map(func) 返回一个新的分布式数据集,该数据集是通过将源的每个元素传递给函数func形成的。
    filter(func) 返回一个新的数据集,该数据集是通过选择源中func返回true的那些元素形成的。
    flatMap(func) 与map相似,但是每个输入项都可以映射到0个或多个输出项(因此func应该返回Seq而不是单个项)。
    mapPartitions(func) 与map相似,但是分别在RDD的每个分区(块)上运行,因此func在类型T的RDD上运行时必须为Iterator <T>⇒Iterator <U>类型。
    mapPartitionsWithIndex(func) 与map Partitions类似,但它还为func提供表示分区索引的整数值,因此当在类型T的RDD上运行时,func的类型必须为(Int,Iterator <T>)⇒Iterator <U>。
    sample(withReplacement, fraction, seed) 使用给定的随机数生成器种子,对一部分数据进行抽样,无论是否进行替换。
    union(otherDataset) 返回一个新的数据集,其中包含源数据集中的元素与参数的并集。
    intersection(otherDataset) 返回一个新的RDD,其中包含源数据集中的元素和参数的交集。
    distinct([numTasks]) 返回一个新的数据集,其中包含源数据集的不同元素。
    groupByKey([numTasks]) 在(K,V)对的数据集上调用时,返回(K,Iterable <V>)对的数据集。注–如果要分组以便对每个键执行聚合(例如求和或平均值),则使用reduceByKey或AggregateByKey将产生更好的性能。
    reduceByKey(func, [numTasks]) 在(K,V)对的数据集上调用时,返回(K,V)对的数据集,其中每个键的值使用给定的reduce函数func进行汇总,该函数必须为(V,V)⇒V类型与groupByKey中一样,reduce任务的数量可以通过可选的第二个参数进行配置。
    aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 在(K,V)对的数据集上调用时,返回(K,U)对的数据集,其中每个键的值使用给定的Combine函数和中性的“零”值进行汇总。允许与输入值类型不同的聚合值类型,同时避免不必要的分配。像groupByKey中一样,reduce任务的数量可以通过可选的第二个参数进行配置。
    sortByKey([ascending], [numTasks]) 在由K实现Ordered的(K,V)对的数据集上调用时,返回(K,V)对的数据集,按布尔值升序参数指定按键升序或降序排序。
    join(otherDataset, [numTasks]) 在(K,V)和(K,W)类型的数据集上调用时,返回(K,(V,W))对的数据集,其中每个键都有所有成对的元素。通过leftOuterJoin,rightOuterJoin和fullOuterJoin支持外部联接。
    cogroup(otherDataset, [numTasks]) 在(K,V)和(K,W)类型的数据集上调用时,返回(K,(Iterable <V>,Iterable <W>))元组的数据集。此操作也称为分组方式。
    cartesian(otherDataset) 在类型T和U的数据集上调用时,返回(T,U)对(所有元素对)的数据集。
    pipe(command, [envVars]) 通过shell命令通过管道传输RDD的每个分区Perl或bash脚本。将RDD元素写入进程的stdin,并将输出到其stdout的行作为字符串的RDD返回。
    coalesce(numPartitions) 将RDD中的分区数减少到numPartitions。筛选大型数据集后,对于更有效地运行操作很有用。
    repartition(numPartitions) 随机地重新随机排列RDD中的数据以创建更多或更少的分区,并在整个分区之间保持平衡。这始终会拖曳网络上的所有数据。
    repartitionAndSortWithinPartitions(partitioner) 根据给定的分区程序对RDD进行重新分区,并在每个结果分区中,按其键对记录进行排序。这比调用重新分区然后在每个分区内进行排序更有效,因为它可以将排序向下推入洗牌机制。
  • 动作

    下表列出了返回值的动作列​​表。
    动作 意义
    reduce(func) 使用函数func(使用两个参数并返回一个参数)聚合数据集的元素。 该函数应该是可交换的和关联的,以便可以并行正确地计算它。
    collect() 在驱动程序中将数据集的所有元素作为数组返回。 这通常在返回足够小的数据子集的过滤器或其他操作之后很有用。
    count() 返回数据集中的元素数。
    first() 返回数据集的第一个元素(类似于take(1))。
    take(n) 返回具有数据集前n个元素的数组。
    takeSample (withReplacement,num, [seed]) 返回一个数组,该数组包含数据集的num个元素的随机样本,有或没有替换,可以选择预先指定一个随机数生成器种子。
    takeOrdered(n, [ordering]) 使用自然顺序或自定义比较器返回RDD的前n个元素。
    saveAsTextFile(path) 将数据集的元素作为文本文件(或文本文件集)写入本地文件系统,HDFS或任何其他Hadoop支持的文件系统中的给定目录中。 Spark在每个元素上调用toString将其转换为文件中的一行文本。
    saveAsSequenceFile(path) (Java and Scala) 将数据集的元素作为Hadoop SequenceFile写入本地文件系统,HDFS或任何其他Hadoop支持的文件系统中的给定路径中。这在实现Hadoop的Writable接口的键/值对的RDD上可用。在Scala中,它也可用于隐式转换为Writable的类型(Spark包括对基本类型(如Int,Double,String等)的转换。
    saveAsObjectFile(path) (Java and Scala) 使用Java序列化以简单格式写入数据集的元素,然后可以使用SparkContext.objectFile()进行加载。
    countByKey() 仅在类型(K,V)的RDD上可用。返回(K,Int)对的哈希图以及每个键的计数。
    foreach(func) 在数据集的每个元素上运行函数func。通常这样做是出于副作用,例如更新累加器或与外部存储系统交互。注意-在foreach()之外修改除Accumulators以外的变量可能会导致未定义的行为。有关更多详细信息,请参见了解闭包。
  • 用RDD编程

    让我们借助示例来了解RDD编程中一些RDD转换和动作的实现。
    - 考虑一个单词计数示例-它计算文档中出现的每个单词。将以下文本视为输入,并将其另存为主目录中的input.txt文件。
    input.txt-输入文件。
    
    people are not as beautiful as they look, 
    as they walk or as they talk.
    they are only as beautiful  as they love, 
    as they care as they share.
    
    请按照下面给出的步骤执行给定的示例。
    打开Spark-Shell
    以下命令用于打开Spark-Shell。通常,使用Scala构建spark。因此,Spark程序在Scala环境中运行。
    
    $ spark-shell
    
    如果Spark Shell成功打开,则将找到以下输出。在开始程序的第一步之前,应创建SparkContext对象。
    
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    Spark context Web UI available at http://localhost:4040
    Spark context available as 'sc' (master = local[*], app id = local-1608794330204).
    Spark session available as 'spark'.
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 3.0.0
          /_/
             
    Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_262)
    Type in expressions to have them evaluated.
    Type :help for more information.
    
    scala>
    
    创建一个RDD
    首先,我们必须使用Spark-Scala API读取输入文件并创建一个RDD。
    以下命令用于从给定位置读取文件。在这里,使用输入文件的名称创建新的RDD。在textFile("") 方法中作为参数给出的String是输入文件名的绝对路径。但是,如果仅给出文件名,则表示输入文件位于当前位置。
    
    scala> val inputfile = sc.textFile("input.txt")
    
    执行字数转换
    我们的目的是计算文件中的单词数。创建一个平面地图,将每行分割成多个单词(flatMap(line⇒line.split(" "))。
    接下来,使用映射函数(map(word⇒(word,1)),将每个单词作为键读取,值为'1'(<key,value> = <word,1>)。
    最后,通过添加相似键的值(reduceByKey(_ + _))来减少这些键。
    以下命令用于执行字数逻辑。执行此操作后,您将找不到任何输出,因为这不是操作,而是转换。指向新的RDD或告知Spark如何处理给定的数据)
    
    scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);
    
    当前RDD
    在使用RDD时,如果您想了解当前的RDD,请使用以下命令。它将向您显示有关当前RDD及其调试依赖关系的描述。
    
    scala> counts.toDebugString
    
    缓存转换
    您可以使用其上的persist()cache()方法将RDD标记为要持久。第一次在操作中对其进行计算时,它将被保存在节点上的内存中。使用以下命令将中间转换存储在内存中。
    
    scala> counts.cache()
    
    应用动作
    应用动作(如存储所有转换)将结果生成文本文件。saveAsTextFile("") 方法的String参数是输出文件夹的绝对路径。尝试使用以下命令将输出保存在文本文件中。在以下示例中,“输出”文件夹位于当前位置。
    
    scala> counts.saveAsTextFile("output")
    
    检查输出
    打开另一个终端以转到主目录(在另一个终端中执行spark)。使用以下命令检查输出目录。
    
    [hadoop@localhost ~]$ cd output/ 
    [hadoop@localhost output]$ ls -1 
    
    
    part-00000 
    part-00001 
    _SUCCESS
    
    以下命令用于查看Part-00000文件的输出。
    
    [hadoop@localhost output]$ cat part-00000
    
    输出
    
    (people,1) 
    (are,2) 
    (not,1) 
    (as,8) 
    (beautiful,2) 
    (they, 7) 
    (look,1) 
    
    以下命令用于查看Part-00001文件的输出。
    
    [hadoop@localhost output]$ cat part-00001 
    
    输出
    
    (walk, 1) 
    (or, 1) 
    (talk, 1) 
    (only, 1) 
    (love, 1) 
    (care, 1) 
    (share, 1) 
    
  • 取消持久存储

    取消永久保留之前,如果要查看用于该应用程序的存储空间,请在浏览器中使用以下URL。
    
    http://localhost:4040
    
    您将看到以下屏幕,其中显示了用于应用程序的存储空间,这些存储空间正在Spark Shell上运行。
    如果要取消永久保留特定RDD的存储空间,请使用以下命令。
    
    Scala> counts.unpersist()
    
    您将看到如下输出:
    
    2020/06/27 00:57:33 INFO ShuffledRDD: Removing RDD 9 from persistence list 
    2020/06/27 00:57:33 INFO BlockManager: Removing RDD 9 
    2020/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_1 
    2020/06/27 00:57:33 INFO MemoryStore: Block rdd_9_1 of size 480 dropped from memory (free 280061810) 
    2020/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_0 
    2020/06/27 00:57:33 INFO MemoryStore: Block rdd_9_0 of size 296 dropped from memory (free 280062106) 
    res7: cou.type = ShuffledRDD[9] at reduceByKey at <console>:14
    
    要验证浏览器中的存储空间,请使用以下URL。
    
    http://localhost:4040/
    
    您将看到以下屏幕。它显示了用于应用程序的存储空间,这些存储空间在Spark Shell上运行。