PySpark - RDD

  • 简述

    现在我们已经在我们的系统上安装和配置了 PySpark,我们可以在 Apache Spark 上使用 Python 进行编程。不过在此之前,让我们先了解一下 Spark 中的一个基本概念——RDD。
    RDD代表Resilient Distributed Dataset,这些是在多个节点上运行和操作以在集群上进行并行处理的元素。RDD 是不可变的元素,这意味着一旦创建了 RDD,就无法更改它。RDD 也是容错的,因此在发生任何故障时,它们会自动恢复。您可以对这些 RDD 应用多个操作来完成某个任务。
    要对这些 RDD 应用操作,有两种方法 -
    • 变形
    • 动作
    让我们详细了解这两种方式。
    变形− 这些是应用于 RDD 以创建新 RDD 的操作。Filter、groupBy 和 map 是转换的例子。
    动作− 这些是应用于 RDD 的操作,它指示 Spark 执行计算并将结果发送回驱动程序。
    要在 PySpark 中应用任何操作,我们需要创建一个PySpark RDD第一的。以下代码块包含 PySpark RDD 类的详细信息 -
    
    class pyspark.RDD (
       jrdd, 
       ctx, 
       jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
    )
    
    让我们看看如何使用 PySpark 运行一些基本操作。Python 文件中的以下代码创建 RDD 单词,其中存储了一组提到的单词。
    
    words = sc.parallelize (
       ["scala", 
       "java", 
       "hadoop", 
       "spark", 
       "akka",
       "spark vs hadoop", 
       "pyspark",
       "pyspark and spark"]
    )
    
    我们现在将对单词进行一些操作。
  • count()

    返回 RDD 中的元素数。
    
    ----------------------------------------count.py---------------------------------------
    from pyspark import SparkContext
    sc = SparkContext("local", "count app")
    words = sc.parallelize (
       ["scala", 
       "java", 
       "hadoop", 
       "spark", 
       "akka",
       "spark vs hadoop", 
       "pyspark",
       "pyspark and spark"]
    )
    counts = words.count()
    print "Number of elements in RDD -> %i" % (counts)
    ----------------------------------------count.py---------------------------------------
    
    命令- count() 的命令是 -
    
    $SPARK_HOME/bin/spark-submit count.py
    
    输出- 上述命令的输出是 -
    
    Number of elements in RDD → 8
    
  • collect()

    返回 RDD 中的所有元素。
    
    ----------------------------------------collect.py---------------------------------------
    from pyspark import SparkContext
    sc = SparkContext("local", "Collect app")
    words = sc.parallelize (
       ["scala", 
       "java", 
       "hadoop", 
       "spark", 
       "akka",
       "spark vs hadoop", 
       "pyspark",
       "pyspark and spark"]
    )
    coll = words.collect()
    print "Elements in RDD -> %s" % (coll)
    ----------------------------------------collect.py---------------------------------------
    
    命令- collect() 的命令是 -
    
    $SPARK_HOME/bin/spark-submit collect.py
    
    输出- 上述命令的输出是 -
    
    Elements in RDD -> [
       'scala', 
       'java', 
       'hadoop', 
       'spark', 
       'akka', 
       'spark vs hadoop', 
       'pyspark', 
       'pyspark and spark'
    ]
    
  • foreach(f)

    仅返回那些满足 foreach 内部函数条件的元素。在下面的例子中,我们在 foreach 中调用了一个 print 函数,它打印了 RDD 中的所有元素。
    
    ----------------------------------------foreach.py---------------------------------------
    from pyspark import SparkContext
    sc = SparkContext("local", "ForEach app")
    words = sc.parallelize (
       ["scala", 
       "java", 
       "hadoop", 
       "spark", 
       "akka",
       "spark vs hadoop", 
       "pyspark",
       "pyspark and spark"]
    )
    def f(x): print(x)
    fore = words.foreach(f) 
    ----------------------------------------foreach.py---------------------------------------
    
    命令- foreach(f) 的命令是 -
    
    $SPARK_HOME/bin/spark-submit foreach.py
    
    输出- 上述命令的输出是 -
    
    scala
    java
    hadoop
    spark
    akka
    spark vs hadoop
    pyspark
    pyspark and spark
    
  • filter(f)

    返回一个包含元素的新 RDD,它满足过滤器内部的功能。在下面的示例中,我们过滤掉包含“spark”的字符串。
    
    ----------------------------------------filter.py---------------------------------------
    from pyspark import SparkContext
    sc = SparkContext("local", "Filter app")
    words = sc.parallelize (
       ["scala", 
       "java", 
       "hadoop", 
       "spark", 
       "akka",
       "spark vs hadoop", 
       "pyspark",
       "pyspark and spark"]
    )
    words_filter = words.filter(lambda x: 'spark' in x)
    filtered = words_filter.collect()
    print "Fitered RDD -> %s" % (filtered)
    ----------------------------------------filter.py----------------------------------------
    
    命令- filter(f) 的命令是 -
    
    $SPARK_HOME/bin/spark-submit filter.py
    
    输出- 上述命令的输出是 -
    
    Fitered RDD -> [
       'spark', 
       'spark vs hadoop', 
       'pyspark', 
       'pyspark and spark'
    ]
    
  • map(f,preservesPartitioning = False)

    通过对 RDD 中的每个元素应用一个函数来返回一个新的 RDD。在下面的示例中,我们形成一个键值对并将每个字符串映射为值为 1。
    ----------------------------------------map.py---------------------------------------
    from pyspark import SparkContext
    sc = SparkContext("local", "Map app")
    words = sc.parallelize (
       ["scala", 
       "java", 
       "hadoop", 
       "spark", 
       "akka",
       "spark vs hadoop", 
       "pyspark",
       "pyspark and spark"]
    )
    words_map = words.map(lambda x: (x, 1))
    mapping = words_map.collect()
    print "Key value pair -> %s" % (mapping)
    ----------------------------------------map.py---------------------------------------
    
    命令- map(f, preservesPartitioning=False) 的命令是 -
    
    $SPARK_HOME/bin/spark-submit map.py
    
    输出- 上述命令的输出是 -
    
    Key value pair -> [
       ('scala', 1), 
       ('java', 1), 
       ('hadoop', 1), 
       ('spark', 1), 
       ('akka', 1), 
       ('spark vs hadoop', 1), 
       ('pyspark', 1), 
       ('pyspark and spark', 1)
    ]
    
  • reduce(f)

    执行指定的交换和关联二元运算后,返回RDD中的元素。在下面的示例中,我们从运算符中导入 add 包并将其应用于“num”以执行简单的加法操作。
    ----------------------------------------reduce.py---------------------------------------
    from pyspark import SparkContext
    from operator import add
    sc = SparkContext("local", "Reduce app")
    nums = sc.parallelize([1, 2, 3, 4, 5])
    adding = nums.reduce(add)
    print "Adding all the elements -> %i" % (adding)
    ----------------------------------------reduce.py---------------------------------------
    
    命令- reduce(f) 的命令是 -
    
    $SPARK_HOME/bin/spark-submit reduce.py
    
    输出- 上述命令的输出是 -
    
    Adding all the elements -> 15
    
  • join(other, numPartitions = None)

    它返回带有匹配键的一对元素以及该特定键的所有值的 RDD。在以下示例中,两个不同的 RDD 中有两对元素。加入这两个 RDD 后,我们得到一个 RDD,其中的元素具有匹配的键及其值。
    ----------------------------------------join.py---------------------------------------
    from pyspark import SparkContext
    sc = SparkContext("local", "Join app")
    x = sc.parallelize([("spark", 1), ("hadoop", 4)])
    y = sc.parallelize([("spark", 2), ("hadoop", 5)])
    joined = x.join(y)
    final = joined.collect()
    print "Join RDD -> %s" % (final)
    ----------------------------------------join.py---------------------------------------
    
    命令- join(other, numPartitions = None) 的命令是 -
    
    $SPARK_HOME/bin/spark-submit join.py
    
    输出- 上述命令的输出是 -
    
    Join RDD -> [
       ('spark', (1, 2)),  
       ('hadoop', (4, 5))
    ]
    
  • collect()

    使用默认存储级别 (MEMORY_ONLY) 持久化此 RDD。您还可以检查 RDD 是否已缓存。
    ----------------------------------------cache.py---------------------------------------
    from pyspark import SparkContext 
    sc = SparkContext("local", "Cache app") 
    words = sc.parallelize (
       ["scala", 
       "java", 
       "hadoop", 
       "spark", 
       "akka",
       "spark vs hadoop", 
       "pyspark",
       "pyspark and spark"]
    ) 
    words.cache() 
    caching = words.persist().is_cached 
    print "Words got chached > %s" % (caching)
    ----------------------------------------cache.py---------------------------------------
    
    命令- cache() 的命令是 -
    
    $SPARK_HOME/bin/spark-submit cache.py
    
    输出- 上述程序的输出是 -
    
    Words got cached -> True
    
    这些是在 PySpark RDD 上完成的一些最重要的操作。