Spark Data
Spark中的RDD(Spark 1.X中常用)
基本概念
J理解为RDD以分区的形式分布在集群中多个机器上,每个分区代表了数据集的一个子集,Spark框架并行处理多个分区,一个分区内的数据对象则是顺序处理。
RDD是(Resilient Distributed Dataset)弹性分布式数据集,每个RDD分为多个分区partition,这些分区运行在集群中的不同节点上,而每个分区中以行为单位,每一行由元素组成。RDD有两类操作,一种是转化transform成为新的RDD,一种是行动action计算出一个结果。
rdd-分区表示子集partition(不同节点上)-行为单位,每行都是元素
pair RDD is an RDD where each element is a pair tuple (k, v) where k is the key and v is the value。
RDD 分区的一个分配原则是:尽可能使得分区的个数,等于集群cores数目。这里的core是虚拟的core而不是机器的物理CPU核,可以理解为就是Executor的一个工作线程,J即一个executor有多个cores。
RDD有两类操作:转化(transformation生成一个新的RDD)和行动(action计算出一个结果)。
操作
RDD的持久化,J无需计算重新计算整个rdd转换
通过persist()或cache()方法可以标记一个要被持久化的RDD,一旦首次被触发,该RDD将会被保留在计算节点的内存中并重用。 例子:假设首先进行了RDD0→RDD1→RDD2的计算作业,那么计算结束时,RDD1就已经缓存在系统中了。在进行RDD0→RDD1→RDD3的计算作业时,由于RDD1已经缓存在系统中,因此RDD0→RDD1的转换不会重复进行,计算作业只须进行RDD1→RDD3的计算就可以了,因此计算速度可以得到很大提升。
broadcast 变量, 只读的共享变量,每个节点上都有一个拷贝。val v = broadcastVar.value //这么取值
Spark中的dataset(Spark 2.X中常用)
统一使用:SparkSession
使用Spark时经常出现的一个问题是:“好了,我们有了SparkContext、SQLContext和HiveContext。我该使用哪一个,而不是其他几个”Spark 2.0引入了一个新的SparkSession对象,因而减少了混乱,为使用Spark来计算提供了一致的入口点。
使用API尽量使用DataSet ,不行再选用DataFrame,其次选择RDD。
RDD[Person]以Person为类型参数,但Spark框架本身不了解Person类的内部结构。而DataFrame可以进一步提供详细的结构信息,使得SparkSQL可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么。 Jdataframe在2.0中更改只有指定了类型后,变成dataset后,才知道类型。
J我明白了,这里的dataset和dataframe的最重要区别确实就是不知道类型而已。从而各自的api的功能也不一样,涉及到字段类型的,就是dataset,而不涉及到字段类型的就是dataframe。但其实dataframe也可以用dataset的api,只是在构建阶段可能查不出问题,有一定风险性。为了便捷,我就使用dataframe,但是查dataset的api来使用
J重要区别:所以利用sparksql返回dataframe是因为每一行的类型和字段都是相同的。而dataset适应更广阔的舞台,比如说一行是row,另一行不是row。
就是说其实dataframe有字段,我们可以看到
|query| dt|search_count|plat| +-----+----------+------------+----+ | 幸福路上|2017-11-16| 68| ios| | 幸福路上|2017-11-15| 95| ios| | 幸福路上|2017-11-19| 40| ios| | 幸福路上|2017-11-17| 32| ios| | 幸福路上|2017-11-20| 2580| ios| | 幸福路上|2017-11-18| 27| ios|
但是如果是运算的话,则需要使用告诉下面的代码程序这个里面的具体类型,否则会编译有问题。比如指定dataset的api的话,就要先转成dataset才能用。
创建Dataframe,从spark sql
创建dataset:toDS()或as
There is also a new as[U](implicit arg0: Encoder[U]): Dataset[U]
which is used to convert a DataFrame
to a DataSet
of a given type. For example:
J注意as中采用的定义方法,就是类似函数中的输入参数。
Spark Dataframe的常用操作
文件中创建dataframe(先成为RDD,然后定义类,最后配合toDF进行转换)
var与loop搭配,对同名dataframe进行更新
常用操作符
select;
filter;
withColumn;(注意不能直接对dataframe中某一个column进行修改,只能用withcolumn和udf生成新的一列)
withColumnRenamed;
groupBy...agg;
使用说明
如果不涉及运算的,则使用""表示column;如果涉及运算的,则使用$""表示column,因此filter常用$""。
实例
多个运算符可以用逗号去引导换行。
配合udf操作
格式为val myfunc = udf{(a:String) => 函数体}
注意当column内部是arraytype
和structtype
的情况时,则需要对于udf
的参数不能直接用udf{(uc: Seq[(Long, Boolean)])
。
因为如下可知,arraytype
和structtype
对应的是Seq
和Row
。
数据类型
Scala中的值类型
访问或者创建数据类型的API
ByteType
Byte
ByteType
ShortType
Short
ShortType
IntegerType
Int
IntegerType
LongType
Long
LongType
FloatType
Float
FloatType
DoubleType
Double
DoubleType
DecimalType
scala.math.BigDecimal
DecimalType
StringType
String
StringType
BinaryType
Array[Byte]
BinaryType
BooleanType
Boolean
BooleanType
TimestampType
java.sql.Timestamp
TimestampType
DateType
java.sql.Date
DateType
ArrayType
scala.collection.Seq
ArrayType(elementType, [containsNull]) 注意containsNull默认为true
MapType
scala.collection.Map
MapType(keyType, valueType, [valueContainsNull]) 注意valueContainsNull默认为true
StructType
org.apache.spark.sql.Row
StructType(fields) ,注意fields是一个StructField序列,相同名字的两个StructField不被允许
StructField
The value type in Scala of the data type of this field (For example, Int for a StructField with the data type IntegerType)
StructField(name, dataType, nullable)
改用如下函数
常用函数
explode
把列中数据切开,并进行延展。先变成列表List,然后直接使用explode。


window function
窗口函数可以对传统的groupBy分的组,进一步进行操作。
注意window function常会引起Failed to get broadcast
,解决方法具体见debug章节。
withcolumn
将stucttype
类型拆开。
Reference
Last updated
Was this helpful?