Spark Development
Local Debugging
object SparkUBM {
def main(args: Array[String]): Unit = {
System.setProperty("hadoop.home.dir", "E:\\hadoop-2.6.4") //设计hadoop home在windows下配置,https://github.com/srccodes/hadoop-common-2.2.0-bin/tree/master/bin中下载,下载winutils.exe文件复制到自己的E:\hadoop-2.6.4\bin目录里面
val spark = SparkSession
.builder()
.master("local") //否则报错Exception in thread "main" org.apache.spark.SparkException: A master URL must be set in your configuration
.appName("UBM")
.getOrCreate()
import spark.implicits._
val data = spark.read.textFile("E:\\sparkdatatest.txt") // read a local test file as a Dataset[String]
}
}
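To confirm that the local environment really works, add a small action after reading the file; otherwise nothing is actually executed. A minimal sketch, appended to the end of main above (it assumes E:\sparkdatatest.txt is a plain text file; the word count itself is only an illustration):
// word count over the local test file, just to verify the Windows/local setup
val wordCounts = data
  .flatMap(_.split("\\s+")) // split each line on whitespace
  .groupByKey(identity)
  .count()
wordCounts.show(10)
spark.stop() // release local resources once debugging is done
The pom.xml used for this local-debug project is listed below.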
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.music.search</groupId>
<artifactId>UBM</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<scala.major.version>2.11</scala.major.version>
<scala.version>2.11.8</scala.version>
<spark.version>2.1.0</spark.version>
<hadoop.version>2.7.1</hadoop.version>
</properties>
<dependencies>
<dependency> <!-- Hadoop dependency -->
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.major.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.major.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_${scala.major.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.major.version}</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.19</version>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</build>
</project>
Hive Connection
Hive itself involves a fair amount of configuration; in our case the company had already set up the environment for us, which saved time. The snippet below is the important part and comes from the official documentation; the full job is given in the Code section that follows.
import java.io.File
val warehouseLocation = new File("spark-warehouse").getAbsolutePath
val spark = SparkSession
.builder()
.appName("Burst detection for day search count")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate()
// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
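With enableHiveSupport() the session talks to the cluster's Hive metastore, so Hive tables can be queried directly through spark.sql. A quick sanity check (the table name below is only a placeholder, not a table from this project):
// verify the Hive connection; replace some_db.some_table with a real table
spark.sql("show databases").show()
spark.sql("select * from some_db.some_table limit 10").show()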
Code
import scala.math.pow
import java.io.File
import java.text.SimpleDateFormat
import java.util.Calendar
import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType // needed for the cast(IntegerType) calls when formatting the result
import org.apache.spark.sql.SparkSession
case class TempRow(label: Int, dt: String) // defined at the top level (outside the object) so that toDF can find an encoder; otherwise the compiler reports "value toDF is not a member"
object Burst {
def main(args: Array[String]):Unit = {
val warehouseLocation = new File("spark-warehouse").getAbsolutePath
val spark = SparkSession
.builder()
.appName("Burst detection for day search count")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate()
// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
spark.conf.set("spark.sql.shuffle.partitions", 2001)
//set the date and interval
val date_end = args(0)
val period = 29
val moving_average_length = 7
//establish the date table
val date_period = getDaysPeriod(date_end, period)
val date_start = date_period.takeRight(1)(0)
var date_list_buffer = new ListBuffer[TempRow]()
for (dt <- date_period){
date_list_buffer += TempRow(1, dt)
}
val date_list = date_list_buffer.toList
val df_date = date_list.toDF
df_date.createOrReplaceTempView("date_table")
//extract the original data
val sql_original=s"""select 1 as label, query, dt, sum(search_count) as cnt from (
select inputstring as query, dt, count(is_valid) as search_count, 'ard' as plat
from ddl.dt_search_ard_d
where dt >= '$date_start' and dt <= '$date_end'
and inputtype in ('1','2','3','4','6','7','8')
group by inputstring, dt
union all
select keyword as query, dt, count(valid) as search_count, 'pc' as plat
from ddl.dt_search_pc_d
where dt >= '$date_start' and dt <= '$date_end'
and inputtype in ('1','2','3','4','5','6','8')
group by keyword, dt
union all
select inputstring as query, dt, count(is_valid) as search_count, 'ios' as plat
from ddl.dt_search_ios_d
where dt >= '$date_start' and dt <= '$date_end'
and inputtype in ('1','2','3','4','6','7','8')
group by inputstring, dt
)triple_count
group by query, dt"""
val df_original = spark.sql(sql_original)
df_original.createOrReplaceTempView("keyword_30_sum")
//join the date table on the constant label (a right outer join) so that every query gets a row for every date, with 0 for days it was not searched
val sql_thirty ="select query, a.dt, (case when a.dt=b.dt then cnt else 0 end) as count from date_table a RIGHT OUTER JOIN keyword_30_sum b on a.label=b.label"
val df_thirty = spark.sql(sql_thirty)
//collapse the expanded join result: keep one row per (query, dt) with the real count, or 0 for missing days
val period_search = df_thirty.groupBy("query","dt").agg(max("count").as("cnt"))
//use window function
val weights = getWeight(moving_average_length)
val index = List.range(0,moving_average_length)
// val window_ma = Window.partitionBy("query").orderBy("query", "dt").rowsBetween(-6, 0)
val window_ma = Window.partitionBy("query").orderBy(asc("dt"))
//val period_ma = period_search.withColumn("movingAvg", avg(period_search("cnt")).over(window_ma))
val period_ma = period_search
  .withColumn("weightedmovingAvg", weighted_average(index, weights, window_ma, period_search("cnt")))
  .withColumn("simplemovingAvg", avg(period_search("cnt")).over(window_ma.rowsBetween(-6, 0)))
// drop the first (moving_average_length - 1) rows of each query, whose moving averages are computed from an incomplete window
val window_filter = Window.partitionBy("query").orderBy(asc("dt"))
val period_filter = period_ma.withColumn("filter_tag", row_number.over(window_filter)).filter($"filter_tag" >= moving_average_length)
//calculate the amp score
// val period_cal = period_ma.groupBy("query").agg((last("simplemovingAvg")-(mean("simplemovingAvg") + lit(1.5)*stddev_samp("simplemovingAvg"))).as("simpleamp"), (last("weightedmovingAvg")-(mean("weightedmovingAvg") + lit(1.5)*stddev_samp("weightedmovingAvg"))).as("weightedamp"))
val period_cal = period_filter.groupBy("query").agg(
  (first("simplemovingAvg") - (mean("simplemovingAvg") + lit(1.5) * stddev_samp("simplemovingAvg"))).as("simpleamp"),
  (last("weightedmovingAvg") - (mean("weightedmovingAvg") + lit(1.5) * stddev_samp("weightedmovingAvg"))).as("weightedamp"))
//format the data to save
// val df_save = period_cal.filter($"amp"> lit(100)).withColumn("ampTmp", $"amp".cast(IntegerType)).drop("amp").withColumnRenamed("ampTmp", "amp")
val df_save = period_cal.filter($"simpleamp" > lit(50) || $"weightedamp" > lit(50))
  .withColumn("sampTmp", $"simpleamp".cast(IntegerType)).drop("simpleamp").withColumnRenamed("sampTmp", "simpleamp")
  .withColumn("wampTmp", $"weightedamp".cast(IntegerType)).drop("weightedamp").withColumnRenamed("wampTmp", "weightedamp")
df_save.createOrReplaceTempView("savetable")
//save the data
val hive_table = args(1)
val sql_save = s"INSERT OVERWRITE TABLE $hive_table PARTITION(dt='$date_end') select * from savetable"
//the static partition spec PARTITION(dt=...) is required here; without it Hive reports: Dynamic partition strict mode requires at least one static partition column
spark.sql(sql_save)
//the log line "ERROR KeyProviderCache: Could not find uri with key" can be safely ignored
spark.stop() //stop the session explicitly to avoid: ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerExecutorMetricsUpdate
}
// return dt followed by the previous interval days, newest first, as List[String]
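// e.g. getDaysPeriod("2017-07-30", 29) => List("2017-07-30", "2017-07-29", ..., "2017-07-01"), i.e. 30 dates (the date here is only an example)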
def getDaysPeriod(dt: String, interval: Int): List[String] = {
var period = new ListBuffer[String]() //initialize the return List period
period += dt
val cal: Calendar = Calendar.getInstance() //reset the date in Calendar
cal.set(dt.split("-")(0).toInt, dt.split("-")(1).toInt - 1, dt.split("-")(2).toInt)
val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd") //format the output date
for (i <- 0 to interval - 1){
cal.add(Calendar.DATE, - 1)
period += dateFormat.format(cal.getTime())
}
period.toList
}
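// getWeight returns exponentially decaying weights w(i) = 0.5^i / (sum of 0.5^j for j < length), which add up to 1;
// e.g. getWeight(7) is approximately List(0.504, 0.252, 0.126, 0.063, 0.031, 0.016, 0.008)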
def getWeight(length: Int): List[Double]= {
var sum = 0.0
for (i <- 0 to length-1 ){
sum += pow(0.5, i)
}
// val weights = for (i <- 0 to length-1 ) yield pow(0.5, i)/sum // it will return scala.collection.immutable.IndexedSeq
val weights = for (i <- List.range(0, length) ) yield pow(0.5, i)/sum
// var weights_buffer = new ListBuffer[Double]()
// for (i <- 0 to length-1 ){
// weights_buffer += pow(0.5, i)/sum
// }
// val weights = weights_buffer.toList
weights
}
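// weighted_average builds the sum of weights(i) * lag(c, i) over the window w, so the most recent day carries the largest weight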
def weighted_average(index: List[Int], weights: List[Double], w: WindowSpec, c: Column): Column= {
val wma_list = for (i <- index) yield (lag(c, i).over(w))*weights(i) // list comprehension, map also can do some easy thing, return scala.collection.immutable.IndexedSeq
wma_list.reduceLeft(_ + _)
}
}
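To make the scoring step easier to follow, here is a plain-Scala sketch of the same logic on a single hypothetical count series (no Spark involved; BurstSketch, the counts and the spike at the end are all made up for illustration):
import scala.math.pow
object BurstSketch {
  def main(args: Array[String]): Unit = {
    val n = 7 // moving_average_length
    // 30 hypothetical daily counts for one query: flat history with a spike on the last two days
    val counts: List[Double] = List.fill(23)(100.0) ++ List(100.0, 105.0, 98.0, 102.0, 110.0, 300.0, 900.0)
    // exponentially decaying weights, as in getWeight(7)
    val total = (0 until n).map(i => pow(0.5, i)).sum
    val weights = (0 until n).map(i => pow(0.5, i) / total)
    // weighted moving average for every day with a full 7-day history (the filter_tag >= 7 step)
    val wma = (n - 1 until counts.length).map { t =>
      (0 until n).map(i => weights(i) * counts(t - i)).sum
    }
    val mean = wma.sum / wma.size
    val std = math.sqrt(wma.map(x => (x - mean) * (x - mean)).sum / (wma.size - 1)) // sample stddev, like stddev_samp
    val amp = wma.last - (mean + 1.5 * std) // same shape as the "weightedamp" score in the Spark job
    println(f"last WMA = ${wma.last}%.1f, mean = $mean%.1f, stddev = $std%.1f, amp = $amp%.1f")
    // the Spark job keeps a query when its amp exceeds 50, i.e. the latest smoothed count clearly exceeds the recent trend
  }
}
The pom.xml for the burst_detection project is listed below.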
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.music.search</groupId>
<artifactId>burst_detection</artifactId>
<version>1.0</version>
<properties>
<scala.major.version>2.11</scala.major.version>
<scala.version>2.11.8</scala.version>
<spark.version>2.1.0</spark.version>
<hadoop.version>2.7.1</hadoop.version>
</properties>
<dependencies>
<dependency> <!-- Hadoop dependency -->
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.major.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.major.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_${scala.major.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.major.version}</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.19</version>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</build>
</project>