1. <dd id="jcnig"><nav id="jcnig"><delect id="jcnig"></delect></nav></dd>
      <dfn id="jcnig"><tt id="jcnig"><sup id="jcnig"></sup></tt></dfn>

    2. <cite id="jcnig"></cite>
      <cite id="jcnig"><tt id="jcnig"></tt></cite>

      <address id="jcnig"><nav id="jcnig"></nav></address>
    3. <address id="jcnig"><nav id="jcnig"></nav></address>
      400-650-7353

      精品課程

      您所在的位置:首頁 > IT干貨資料 > 大數據 > 【大數據基礎知識】Spark常用算子(二)

      【大數據基礎知識】Spark常用算子(二)

      • 發布: 大數據培訓
      • 來源:大數據干貨資料
      • 2021-07-28 10:07:22
      • 閱讀()
      • 分享
      • 手機端入口

      1. mapValues

      mapValues算子 ,作用于 [K,V] 格式的RDD上,并且只對V(Value)進行操作,Key值保持不變。

      (1)將[K,V] 格式的List轉換為[K,V] 格式的RDD。

      scala> val rdd = sc.makeRDD(List(("Tom",100),("Mike",80)))

      rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[2] at makeRDD at :24

      (2)使用mapValues算子,將value值乘以100,key值保持不變

      scala> val rdd2=rdd.mapValues(_*100)

      rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[1] at mapValues at :26

      (3)使用collect算子回收,查看結果

      scala> rdd2.collect

      res0: Array[(String, Int)] = Array((Tom,10000), (Mike,8000))

      2. mapPartitions

      作用于RDD上的每一個分區,傳遞的函數相當于一個迭代器,有幾個分區,就會迭代幾次。

      object Test1 {

      def main(args: Array[String]): Unit = {

      val conf=new SparkConf()

      .setMaster("local[*]")

      .setAppName(this.getClass.getSimpleName)

      val sc=new SparkContext(conf)

      val rdd=sc.makeRDD(List(1,2,3,4,5,6),3);

      val values: RDD[Int] = rdd.mapPartitions(t => {

      t.map(_ * 10)

      })

      //打印輸出結果

      values.foreach(println)

      }

      }

      使用上面的代碼進行測試。輸出結果如下:

      可以看到,因為設置了3個分區,所以相應啟動了3個任務,在每個分區上進行迭代計算。

      3. filter

      filter算子過濾出所有的滿足條件的元素。

      另外fliter算子不會改變分區的數量,所以經過過濾后,即使某些分區沒有數據了,但是分區依然存在的。

      scala> val rdd1 = sc.makeRDD(List(1,2,3,4,5,6),3)

      rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at makeRDD at :24

      scala> val rdd2 = rdd1.filter(_>3)

      rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at filter at :26

      scala> rdd2.partitions.size

      res3: Int = 3

      4. sortBy

      sortBy算子按照指定條件進行排序。

      我們使用下面的代碼進行測試:

      object Test2 {

      def main(args: Array[String]): Unit = {

      val conf=new SparkConf()

      .setMaster("local[*]")

      .setAppName(this.getClass.getSimpleName)

      val sc=new SparkContext(conf)

      val rdd: RDD[(String, Int)] = sc.makeRDD(List(("Tom", 80), ("Mike", 90), ("Mary", 85),("John",60)))

      //按value值升序排列

      val res1: RDD[(String, Int)] = rdd.sortBy(_._2)

      res1.collect.foreach(println)

      // 按value值降序排列

      val res2: RDD[(String, Int)] = rdd.sortBy(_._2, false)

      res2.collect.foreach(println)

      }

      }

      升序輸出的結果如下:

      降序輸出的結果如下:

      有一點需要說明的是,輸出結果前,要使用collect算子把結果回收到本地。因為數據是分散在集群各節點的,如果不回收,看到的結果可能是不正確的。

      學習疑問申請解答
      您的姓名
      您的電話
      意向課程
       

      中公優就業

      IT小助手

      掃描上方二維碼添加好友,請備注:599,享學習福利。

      >>本文地址:
      注:本站稿件未經許可不得轉載,轉載請保留出處及源文件地址。

      推薦閱讀

      優就業:ujiuye

      關注中公優就業官方微信

      • 關注微信回復關鍵詞“大禮包”,享學習福利
      QQ交流群
      在線疑問解答
      (加群備注“網站”)
      IT培訓交流群 加入群聊 +
      軟件測試 加入群聊 +
      全鏈路UI/UE設計 加入群聊 +
      Python+人工智能 加入群聊 +
      互聯網營銷 加入群聊 +
      Java開發 加入群聊 +
      PHP開發 加入群聊 +
      VR/AR游戲開發 加入群聊 +
      大前端 加入群聊 +
      大數據 加入群聊 +
      Linux云計算 加入群聊 +
      優就業官方微信
      掃碼回復關鍵詞“大禮包”
      享學習福利

      測一測
      你適合學哪門IT技術?

      1 您的年齡

      2 您的學歷

      3 您更想做哪個方向的工作?

      獲取測試結果
       
      課程資料、活動優惠 領取通道
       
       
      日本一本二本三本av网站,一本加勒比HEZYO东京热高清,一本久久A久久精品综合