Spark Data

Spark中的RDD(Spark 1.X中常用)

基本概念

J理解为RDD以分区的形式分布在集群中多个机器上,每个分区代表了数据集的一个子集,Spark框架并行处理多个分区,一个分区内的数据对象则是顺序处理。

RDD是(Resilient Distributed Dataset)弹性分布式数据集,每个RDD分为多个分区partition,这些分区运行在集群中的不同节点上,而每个分区中以行为单位,每一行由元素组成。RDD有两类操作,一种是转化transform成为新的RDD,一种是行动action计算出一个结果。

rdd-分区表示子集partition(不同节点上)-行为单位,每行都是元素

pair RDD is an RDD where each element is a pair tuple (k, v) where k is the key and v is the value。

RDD 分区的一个分配原则是:尽可能使得分区的个数,等于集群cores数目。这里的core是虚拟的core而不是机器的物理CPU核,可以理解为就是Executor的一个工作线程,J即一个executor有多个cores。

RDD有两类操作:转化(transformation生成一个新的RDD)和行动(action计算出一个结果)。

操作

RDD的持久化,J无需计算重新计算整个rdd转换

通过persist()或cache()方法可以标记一个要被持久化的RDD,一旦首次被触发,该RDD将会被保留在计算节点的内存中并重用。 例子:假设首先进行了RDD0→RDD1→RDD2的计算作业,那么计算结束时,RDD1就已经缓存在系统中了。在进行RDD0→RDD1→RDD3的计算作业时,由于RDD1已经缓存在系统中,因此RDD0→RDD1的转换不会重复进行,计算作业只须进行RDD1→RDD3的计算就可以了,因此计算速度可以得到很大提升。

distFile.persist()//持久化

broadcast 变量, 只读的共享变量,每个节点上都有一个拷贝。val v = broadcastVar.value //这么取值

val broadcastVar = sc.broadcast("string test")//broadcast variable is readonly
val v = broadcastVar.value //这么取值

Spark中的dataset(Spark 2.X中常用)

统一使用:SparkSession

使用Spark时经常出现的一个问题是:“好了,我们有了SparkContext、SQLContext和HiveContext。我该使用哪一个,而不是其他几个”Spark 2.0引入了一个新的SparkSession对象,因而减少了混乱,为使用Spark来计算提供了一致的入口点。

使用API尽量使用DataSet ,不行再选用DataFrame,其次选择RDD。

RDD[Person]以Person为类型参数,但Spark框架本身不了解Person类的内部结构。而DataFrame可以进一步提供详细的结构信息,使得SparkSQL可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么。 Jdataframe在2.0中更改只有指定了类型后,变成dataset后,才知道类型。

J我明白了,这里的dataset和dataframe的最重要区别确实就是不知道类型而已。从而各自的api的功能也不一样,涉及到字段类型的,就是dataset,而不涉及到字段类型的就是dataframe。但其实dataframe也可以用dataset的api,只是在构建阶段可能查不出问题,有一定风险性。为了便捷,我就使用dataframe,但是查dataset的api来使用

J重要区别:所以利用sparksql返回dataframe是因为每一行的类型和字段都是相同的。而dataset适应更广阔的舞台,比如说一行是row,另一行不是row。

就是说其实dataframe有字段,我们可以看到

|query| dt|search_count|plat| +-----+----------+------------+----+ | 幸福路上|2017-11-16| 68| ios| | 幸福路上|2017-11-15| 95| ios| | 幸福路上|2017-11-19| 40| ios| | 幸福路上|2017-11-17| 32| ios| | 幸福路上|2017-11-20| 2580| ios| | 幸福路上|2017-11-18| 27| ios|

但是如果是运算的话,则需要使用告诉下面的代码程序这个里面的具体类型,否则会编译有问题。比如指定dataset的api的话,就要先转成dataset才能用。

创建Dataframe,从spark sql

//即SparkSession的方法sql允许application在代码中运行SQL语句,并得到Dataset<Row>类型的返回值。
// Register the DataFrame as a SQL temporary view 必须注册为临时表才能供下面的查询使用
val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.createOrReplaceTempView("people") //之后可以被当作people表被spark.sql使用
sqlDF.show()

创建dataset:toDS()或as

There is also a new as[U](implicit arg0: Encoder[U]): Dataset[U]which is used to convert a DataFrame to a DataSet of a given type. For example:

df.as[Person]
df.select($"name", $"age".cast("int")).as[(String, Int)].collect.toMap

J注意as中采用的定义方法,就是类似函数中的输入参数。

val gamma = df_gamma.select(col("rd"), col("gamma")).as[((Int, Int), Double)].collect.toMap

Spark Dataframe的常用操作

文件中创建dataframe(先成为RDD,然后定义类,最后配合toDF进行转换)

case class Session(query: Long, item_click_list: Seq[(Long, Boolean)]) //case class help define row, toDF
val data = spark.read.textFile("E:\\sparkdata.txt")
val df_sessions_raw = data.map(line => line.split(" ")).map(array =>
  Session(
    array(0).toLong,
    Seq((array(1).toLong, array(11).toBoolean),
      (array(2).toLong, array(12).toBoolean),
      (array(3).toLong, array(13).toBoolean),
      (array(4).toLong, array(14).toBoolean),
      (array(5).toLong, array(15).toBoolean),
      (array(6).toLong, array(16).toBoolean),
      (array(7).toLong, array(17).toBoolean),
      (array(8).toLong, array(18).toBoolean),
      (array(9).toLong, array(19).toBoolean),
      (array(10).toLong, array(20).toBoolean)
    )
  )
).toDF

var与loop搭配,对同名dataframe进行更新

var dataFrame:DataFrame = null
for (jsonfilename <- fileArray) {
   val eachDataFrame = hivecontext.read.json(jsonfilename)
   if(dataFrame == null)
      dataFrame = eachDataFrame
   else
      dataFrame = eachDataFrame.unionAll(dataFrame)
}

常用操作符

select;

filter;

withColumn;(注意不能直接对dataframe中某一个column进行修改,只能用withcolumn和udf生成新的一列)

withColumnRenamed;

groupBy...agg;

使用说明

如果不涉及运算的,则使用""表示column;如果涉及运算的,则使用$""表示column,因此filter常用$""。

实例

多个运算符可以用逗号去引导换行。

df_remark.filter($"type" === "4" || $"type" =!= "5")
         .order("result".desc)
df_remark_language.filter(not($"remark".contains("@BI_ROW_SPLIT@")))
df_remark_music.withColumn("notion", regexp_extract($"remark", "《(.*)》", 1))
               .withColumn("category", myfunc(regexp_replace($"remark", "《.*》", "")))
df_sn_sep.groupBy("singer").agg(sum($"hot") as "result")

配合udf操作

格式为val myfunc = udf{(a:String) => 函数体}

val alias = Map("^G.E.M.邓紫棋$|^G.E.M.邓紫棋(?=、)|(?<=、)G.E.M.邓紫棋(?=、)|(?<=、)G.E.M.邓紫棋$" -> "邓紫棋")
val broadcasted_alias = sc.broadcast(alias)
//没有使用 broadcast的话,每次task都要传一下,浪费内网带宽
val myalias = udf{(a:String) => 
  if(a != null & a != ""){
    var result = ""
    var str = a.trim
    for ((k,v) <- broadcasted_alias.value){
      val pattern = Pattern.compile(k)
      val matcher = pattern.matcher(str)
      if (matcher.find()){
        str = str.substring(0, matcher.start()) + v + str.substring(matcher.end(), str.length)
        result = str
      }
    }
    result
  }
  else {""}
}

注意当column内部是arraytypestructtype的情况时,则需要对于udf的参数不能直接用udf{(uc: Seq[(Long, Boolean)])

println (df_sessions_raw.schema)
df_sessions_raw.printSchema
//从这个StructField往下看:(query,LongType,true),考虑用什么类型来代替
StructType(StructField(query,LongType,true), StructField(item_click_list,ArrayType(StructType(StructField(_1,LongType,true), StructField(_2,BooleanType,true)),true),true))

root
 |-- query: long (nullable = true)
 |-- item_click_list: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _1: long (nullable = true)
 |    |    |-- _2: boolean (nullable = true)

因为如下可知,arraytypestructtype对应的是SeqRow

数据类型

Scala中的值类型

访问或者创建数据类型的API

ByteType

Byte

ByteType

ShortType

Short

ShortType

IntegerType

Int

IntegerType

LongType

Long

LongType

FloatType

Float

FloatType

DoubleType

Double

DoubleType

DecimalType

scala.math.BigDecimal

DecimalType

StringType

String

StringType

BinaryType

Array[Byte]

BinaryType

BooleanType

Boolean

BooleanType

TimestampType

java.sql.Timestamp

TimestampType

DateType

java.sql.Date

DateType

ArrayType

scala.collection.Seq

ArrayType(elementType, [containsNull]) 注意containsNull默认为true

MapType

scala.collection.Map

MapType(keyType, valueType, [valueContainsNull]) 注意valueContainsNull默认为true

StructType

org.apache.spark.sql.Row

StructType(fields) ,注意fields是一个StructField序列,相同名字的两个StructField不被允许

StructField

The value type in Scala of the data type of this field (For example, Int for a StructField with the data type IntegerType)

StructField(name, dataType, nullable)

改用如下函数

val click2distance = udf{(uc: Seq[Row]) =>
var pre_click = -1
uc.zipWithIndex.map{case (Row(u:Long,c:Boolean), r) =>
  val d = r - pre_click - 1
  if (c) pre_click = r
  (u, c, r, d)
}
}

常用函数

explode

把列中数据切开,并进行延展。先变成列表List,然后直接使用explode。

df_remark_music.filter($"remark".contains("@BI_ROW_SPLIT@"))
               .withColumn("remark_new", explode(split($"remark", "@BI_ROW_SPLIT@")))

window function

窗口函数可以对传统的groupBy分的组,进一步进行操作。

import org.apache.spark.sql.expressions.{Window, WindowSpec}
val window_cover_song = Window.partitionBy("cover_song")
val window_song = Window.partitionBy("song")
val df= df_remark_ref_final.withColumn("cover_value", sum($"cover_hot").over(window_cover_song))
                           .withColumn("value", max($"hot").over(window_song))

注意window function常会引起Failed to get broadcast,解决方法具体见debug章节。

withcolumn

stucttype类型拆开。

    val df_sessions = df_sessions_distance.select($"query", explode($"click2distance").alias("group"))
      .withColumn("reorder", reorder($"query", $"group"))
      .groupBy($"reorder").agg(count("reorder").alias("cnt"))
      .withColumn("q", $"reorder._1")
      .withColumn("u", $"reorder._2")
      .withColumn("r", $"reorder._3")
      .withColumn("d", $"reorder._4")
      .withColumn("c", $"reorder._5")
      .select("q", "u", "r", "d", "c", "cnt")
+--------------------+---+
|reorder             |cnt|
+--------------------+---+
|[317,2909,7,4,false]|1  |

StructType(StructField(reorder,StructType(StructField(_1,LongType,false), StructField(_2,LongType,false), StructField(_3,IntegerType,false), StructField(_4,IntegerType,false), StructField(_5,BooleanType,false)),true), StructField(cnt,LongType,false))

root
 |-- reorder: struct (nullable = true)
 |    |-- _1: long (nullable = false)
 |    |-- _2: long (nullable = false)
 |    |-- _3: integer (nullable = false)
 |    |-- _4: integer (nullable = false)
 |    |-- _5: boolean (nullable = false)
 |-- cnt: long (nullable = false)

Reference

Last updated

Was this helpful?