val period_search = spark.sql("select query, a.dt, (case when a.dt=b.dt then cnt else 0 end) as count from date_table a LEFT OUTER JOIN keyword_30_sum b on a.label=b.label")
val period_search2 = spark.sql("select query, dt, max(count) as cnt from (select query, a.dt, (case when a.dt=b.dt then cnt else 0 end) as count from date_table a LEFT OUTER JOIN keyword_30_sum b on a.label=b.label) group by query, dt")
17/03/28 10:56:19 ERROR LiveListenerBus: Dropping SparkListenerEvent because no remaining room in event queue. This likely means one of the SparkListeners is too slow and cannot keep up with the rate at which tasks are being started by the scheduler.
17/03/28 10:56:19 WARN LiveListenerBus: Dropped 1 SparkListenerEvents since Thu Jan 01 08:00:00 CST 1970
##ConsoleProgressBar
What you get is the console progress bar. In "[Stage 7: (14174 + 5) / 62500]", "Stage 7" is the stage you are currently in, and "(14174 + 5) / 62500" is (numCompletedTasks + numActiveTasks) / totalNumOfTasksInThisStage. The bar itself is drawn from numCompletedTasks / totalNumOfTasksInThisStage.
####Set the number of partitions to 2001
Reason: the rule of thumb is around 128 MB per partition. However: if you're running out of memory on the shuffle, try setting spark.sql.shuffle.partitions to 2001. Spark uses a different data structure for shuffle book-keeping when the number of partitions is greater than 2000:
Trick:
Find the spot in the code where the job gets stuck and repartition explicitly right there:
val df_status = (df_group_pair.
  filter($"count" >= 1). // avoid data skew
  // filter(!($"prev_word".contains($"next_word"))).
  as("d1").
  join(df_menu_read.as("d2"), $"d1.prev_word" === $"d2.keyword", "left").
  select($"d1.*", $"d2.name_entity" as "prev_name_entity", $"d2.similar_entity" as "prev_similar_entity", $"d2.unknown_entity" as "prev_unknown_entity", $"d2.menu_contain" as "pre_menu_contain").
  as("d3").
  join(df_menu_read.as("d4"), $"d3.next_word" === $"d4.keyword", "left").
  select($"d3.*", $"d4.result" as "next_result", $"d4.menu_contain" as "menu_contain", $"d4.name_entity", $"d4.similar_entity", $"d4.unknown_entity").
  filter($"next_result".isNotNull && $"pre_menu_contain" =!= true && $"menu_contain" =!= true). // filter tag
  filter($"prev_name_entity".isNotNull && $"prev_similar_entity".isNotNull && $"prev_unknown_entity".isNotNull). // null means the keyword does not exist
  withColumn("status", check_status($"prev_name_entity", $"prev_similar_entity", $"prev_unknown_entity", $"name_entity", $"similar_entity", $"unknown_entity")).
  filter($"status" === true).
  distinct. // because df_menu_read's keyword column contains duplicates
  sort($"count".desc).
  repartition(numPartitions = 2001))
println("max of df_status is :" + df_status.rdd.glom().map(_.length).collect().max.toString)
The number of partitions on the shuffle-read side is controlled by a few Spark parameters. As you would expect, if this value is set very small while the shuffle-read volume is very large, a single task ends up having to process a huge amount of data. That can crash the JVM, so fetching the shuffle data fails and the executor is lost as well; the "Failed to connect to host" error you see simply means executor lost. Even when the JVM does not crash, this can still cause very long GC pauses.
From the answer here, spark.sql.shuffle.partitions configures the number of partitions that are used when shuffling data for joins or aggregations.
spark.default.parallelism is the default number of partitions in RDDs returned by transformations like join, reduceByKey, and parallelize when not set explicitly by the user. Note that spark.default.parallelism only seems to apply to raw RDDs and is ignored when working with DataFrames.
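As a minimal sketch (the app name is made up and 2001 simply echoes the value used above), the two settings can be applied like this; of the two, only spark.sql.shuffle.partitions can also be changed on a running session:
import org.apache.spark.sql.SparkSession

// Hypothetical session just for illustration.
val spark = SparkSession.builder()
  .appName("shuffle-partitions-demo")
  // partitions used when shuffling for DataFrame/SQL joins and aggregations
  .config("spark.sql.shuffle.partitions", "2001")
  // default partitions for raw-RDD transformations such as join / reduceByKey
  .config("spark.default.parallelism", "2001")
  .getOrCreate()

// spark.sql.shuffle.partitions can also be adjusted at runtime:
spark.conf.set("spark.sql.shuffle.partitions", "2001")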
This message means that for some reason the garbage collector is taking an excessive amount of time (by default 98% of all CPU time of the process) and recovers very little memory in each run (by default 2% of the heap).
This effectively means that your program stops doing any progress and is busy running only the garbage collection at all time.
To prevent your application from soaking up CPU time without getting anything done, the JVM throws this error so that you have a chance of diagnosing the problem. Fix: increase the driver memory, e.g. --driver-memory 4G.
##Failed to get broadcast
[Stage 7:(860 + 88) / 7339][Stage 10:(3 + 24) / 2001][Stage 11:>(0 + 0) / 2001]18/03/22 12:04:28 WARN TaskSetManager: Lost task 6.0 in stage 10.0 (TID 24987, kg-dn-109, executor 13): java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_19_piece0 of broadcast_19
####Task not serializable
You first get a Task not serializable error. The Column object returned by a window function is not serializable, which means it only lives in the memory of each executor. When further computation on it is needed (for example inside a map), Spark complains that it has not been serialized. In fact it does not need to be serialized at all; Spark just has to raise the error, because, as shown below, there is a way to tell it that this object should not be serialized.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lag}
import spark.implicits._

val df = Seq(("foo", 1), ("bar", 2)).toDF("x", "y")
val w = Window.partitionBy("x").orderBy("y")
val lag_y = lag(col("y"), 1).over(w)
def f(x: Any) = x.toString

df.select(lag_y).map(f _).first // raises: Task not serializable

@transient val w = Window.partitionBy("x").orderBy("y")
@transient val lag_y = lag(col("y"), 1).over(w)
df.select(lag_y).map(f _).first // succeeds
####WARN TaskSetManager: Lost task;IOException: org.apache.spark.SparkException: Failed to get broadcast
Scala provides a @transient annotation for fields that should not be serialized at all. If you mark a field as @transient, then the framework should not save the field even when the surrounding object is serialized. When the object is loaded, the field will be restored to the default value for the type of the field annotated as @transient. My guess is that once a value is marked @transient and is therefore not serialized, it effectively lives only in each executor's memory. The heavy workload then ate up that memory, the value that was being treated as a broadcast was lost, the join could not proceed, and the error was thrown. I do not think it is really a broadcast, though, because the value differs from executor to executor, which does not match the usual definition of a broadcast. So the fix is to persist the whole DataFrame first so that it stays resident on the executors.
Spark's DAGScheduler and its lower-level cluster manager implementation (Standalone, YARN or Mesos) will notice a failed task and take care of rescheduling it as part of the overall stages executed.
DAGScheduler does three things in Spark (thorough explanations follow):
Computes an execution DAG, i.e. DAG of stages, for a job.
Determines the preferred locations to run each task on.
Handles failures due to shuffle output files being lost.
// val window_cover_song = Window.partitionBy("cover_song")
// val window_song = Window.partitionBy("song")
// val df_ref_diff = df_remark_ref_final.filter($"song" =!= $"cover_song")
// .withColumn("cover_value", sum($"cover_hot").over(window_cover_song))
// .withColumn("value", max($"hot").over(window_song)) //change avg to max, cause it will exists many-many relation between song and hot, some hots are so small, it will lower the sum value, eg:凉凉
// .withColumn("term",concat($"cover_song", lit(" 原唱")))
// .withColumn("penality", $"value"*penality)
// .groupBy("term").agg(max("penality") as "result")
// .select("term", "result")
// .withColumn("alias", lit(""))
// val df_ref_eql = df_remark_ref_final.filter($"song" === $"cover_song")
// .withColumn("cover_value", sum($"cover_hot").over(window_cover_song))
// .withColumn("value", max($"hot").over(window_song)) //change avg to max, cause it will exists many-many relation between song and hot, some hots are so small, it will lower the sum value, eg:凉凉
// .withColumn("term",concat($"cover_song", lit(" 原唱")))
// .withColumn("penality", when($"cover_value" > $"value", $"value"*penality as "result").otherwise($"value"*penality*penality as "result"))
// .groupBy("term").agg(max("penality") as "result")
// .select("term", "result")
// .withColumn("alias", lit(""))
@transient val window_cover_song = Window.partitionBy("cover_song")
@transient val window_song = Window.partitionBy("song")
val df_ref_diff_temp = df_remark_ref_final.filter($"song" =!= $"cover_song")
val df_ref_eql_temp = df_remark_ref_final.filter($"song" === $"cover_song")
@transient val diff_cover_value = sum(df_ref_diff_temp("cover_hot")).over(window_cover_song)
@transient val diff_value = max(df_ref_diff_temp("hot")).over(window_song)
//changed avg to max: song and hot have a many-to-many relation, and some hot values are so small that they would drag the sum down, e.g. 凉凉
@transient val eql_cover_value = sum(df_ref_eql_temp("cover_hot")).over(window_cover_song)
@transient val eql_value = max(df_ref_eql_temp("hot")).over(window_song)
val df_ref_diff = df_ref_diff_temp.withColumn("cover_value", diff_cover_value)
.withColumn("value", diff_value)
.withColumn("term",concat($"cover_song", lit(" 原唱")))
.withColumn("penality", $"value"*penality)
.groupBy("term").agg(max("penality") as "result")
.select("term", "result")
val df_ref_eql = df_ref_eql_temp.withColumn("cover_value", eql_cover_value)
.withColumn("value", eql_value)
.withColumn("term",concat($"cover_song", lit(" 原唱")))
.withColumn("penality", when($"cover_value" > $"value", $"value"*penality as "result").otherwise($"value"*penality*penality*penality as "result"))
.groupBy("term").agg(max("penality") as "result")
.select("term", "result")
val df_ref = df_ref_eql.union(df_ref_diff)
.groupBy("term").agg(max("result") as "result")
.withColumn("alias", lit(""))
.select("term", "alias","result")
// persist and materialize df_ref so the result stays resident on the executors
// (see the Failed to get broadcast discussion above)
df_ref.persist()
df_ref.count()
//df_term.persist()
//df_term.count()
val df_final = df_term.union(df_ref)
.groupBy("term", "alias").agg(max("result") as "result")
RDDs are divided into partitions. These partitions themselves act as an immutable subset of the entire RDD. When Spark executes each stage of the graph, each partition gets sent to a worker which operates on the subset of the data. In turn, each worker can cache the data if the RDD needs to be re-iterated.
Broadcast variables are used to send some immutable state once to each worker. You use them when you want a local copy of a variable.
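As a minimal sketch of that difference (the lookup map and the word RDD below are invented for illustration): a broadcast variable ships one read-only copy of a value to every worker, while each partition of the RDD is processed independently.
// Hypothetical lookup table; in a real job this would come from your own data.
val lookup = Map("foo" -> 1, "bar" -> 2)
val bLookup = spark.sparkContext.broadcast(lookup)

// An RDD split into 4 partitions; each partition is handled by some worker.
val words = spark.sparkContext.parallelize(Seq("foo", "bar", "baz"), numSlices = 4)

// Every task reads its worker-local copy of the broadcast value; no shuffle is needed.
val resolved = words.map(w => (w, bLookup.value.getOrElse(w, 0)))
resolved.collect().foreach(println)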
2021-04-03 01:15:40 CONSOLE# Exception in thread "main" org.apache.spark.SparkException: Application application_1608206170538_5912246 finished with failed status
2021-04-03 01:15:40 CONSOLE# diagnostics: Application application_1608206170538_5912246 failed 2 times due to AM Container for appattempt_1608206170538_5912246_000002 exited with exitCode: -104
2021-04-03 01:15:40 CONSOLE# For more detailed output, check application tracking page:http://kg-nn-2:8088/cluster/app/application_1608206170538_5912246Then, click on links to logs of each attempt.
2021-04-03 01:15:40 CONSOLE# Diagnostics: Container [pid=7236,containerID=container_1608206170538_5912246_02_000001] is running beyond physical memory limits. Current usage: 2.5 GB of 2.5 GB physical memory used; 4.6 GB of 10.3 GB virtual memory used. Killing container.
The consensus online is that failed tasks are resubmitted, so there is no need to dig through the history of past failures as long as the job eventually succeeds. I believe failed tasks are resubmitted because I have seen the same failed task submitted multiple times on the Web UI. However, if the same task fails too many times, the whole job fails:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 120 in stage 91.0 failed 4 times, most recent failure: Lost task 120.3 in stage 91.0
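The "failed 4 times" comes from spark.task.maxFailures, which defaults to 4. A minimal sketch of raising it (the value 8 and the app name are arbitrary), in case a stage should survive a few more transient task failures; note that it has to be set before the context starts:
import org.apache.spark.sql.SparkSession

// 8 is an arbitrary example value; the Spark default is 4.
val spark = SparkSession.builder()
  .appName("task-retry-demo")
  .config("spark.task.maxFailures", "8")
  .getOrCreate()

// Check what the running application actually uses.
println(spark.sparkContext.getConf.get("spark.task.maxFailures", "4"))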
1. Running Spark version 2.4.5 SparkContext -----> getting ready to start
2. INFO Utils: Successfully started service 'sparkDriver' on port 36991. -----> start the Driver
3. Start server connector -----> prepare the connections
4. Started SparkUI -----> start the Spark web UI
5. Added JAR file:/home/hadoop/app/spark/examples/jars/spark-examples_2.12-2.4.5.jar -----> upload the JAR to Spark
6. Connecting to ResourceManager -----> connect to the ResourceManager
7. Setting up container launch context for our AM -----> prepare a container for the ApplicationMaster
8. Setting up the launch environment for our AM container -----> set up the runtime environment of the ApplicationMaster container
9. Preparing resources for our AM container -----> prepare resources for the ApplicationMaster
10. Uploading resource file:/tmp/xx/__spark_conf__14378.zip -> hdfs://xxx/__spark_conf__.zip -----> upload the Spark configuration files
11. Submitting application application_1604816619741_0001 to ResourceManager -----> submit the application to the ResourceManager
12. Application report for application_1604816619741_0001 (state: ACCEPTED) -----> poll the application's state
13. Application report for application_1604816619741_0001 (state: RUNNING)
14. Application application_1604816619741_0001 has started running. -----> resources have been allocated and the Spark job starts running
15. NettyBlockTransferService: Server created on bigdata01:44993 -----> create the Netty connection
16. INFO BlockManagerMaster: Registered BlockManager -----> register the BlockManager
17. Registered executor NettyRpcEndpointRef -----> register the executor NettyRpcEndpointRef
18. INFO SparkContext: Starting job -----> start the job
19. INFO DAGScheduler: Got job 0 (reduce at SparkPi.scala:38) with 2 output partitions -----> partition info
20. INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 0 -----> submit the missing tasks for ResultStage
21. INFO YarnScheduler: Adding task set 0.0 with 2 tasks
22. INFO TaskSetManager: Starting task 0.0 in stage 0.0 -----> start the task
23. INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on bigdata01:44924
24. INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) -----> task finished
25. INFO DAGScheduler: ResultStage 0 (reduce at SparkPi.scala:38) finished in 5.051 s
26. INFO DAGScheduler: Job 0 finished: -----> job finished
27. Pi is roughly 3.1423357116785584 -----> the computed value of Pi
28. INFO SparkUI: Stopped Spark web UI at http://bigdata01:4040 -----> stop the Spark web UI
29. INFO YarnClientSchedulerBackend: Shutting down all executors -----> stop all executors
30. INFO YarnClientSchedulerBackend: Stopped
The corresponding local log:
You can see that "Submitting application application_1620535124225_2671397 to ResourceManager" comes first, and then, once the ApplicationMaster host is obtained, "Application application_1620535124225_2671397 has started running." is printed.
2021-06-28 12:51:33 CONSOLE# 21/06/28 12:51:33 INFO Client: Submitting application application_1620535124225_2671397 to ResourceManager
2021-06-28 12:51:33 CONSOLE# 21/06/28 12:51:33 INFO YarnClientImpl: Submitted application application_1620535124225_2671397
2021-06-28 12:51:33 CONSOLE# 21/06/28 12:51:33 INFO SchedulerExtensionServices: Starting Yarn extension services with app application_1620535124225_2671397 and attemptId None
2021-06-28 12:51:34 CONSOLE# 21/06/28 12:51:34 INFO Client:
2021-06-28 12:51:34 CONSOLE# client token: N/A
2021-06-28 12:51:34 CONSOLE# diagnostics: AM container is launched, waiting for AM container to Register with RM
2021-06-28 12:51:34 CONSOLE# ApplicationMaster host: N/A
2021-06-28 12:51:34 CONSOLE# ApplicationMaster RPC port: -1
2021-06-28 12:51:34 CONSOLE# queue: root.XXXXX
2021-06-28 12:51:34 CONSOLE# start time: 1624855893792
2021-06-28 12:51:34 CONSOLE# final status: UNDEFINED
2021-06-28 12:51:34 CONSOLE# tracking URL: http://10.5.132.8:5004/proxy/application_1620535124225_2671397/
2021-06-28 12:51:34 CONSOLE# user: jimmylian
2021-06-28 12:51:38 CONSOLE# 21/06/28 12:51:38 INFO YarnClientSchedulerBackend: Add WebUI Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS -> 10.5.XXX.XXX,10.5.XXX.XXX, PROXY_URI_BASES -> http://10.5.132.3:5004/proxy/application_1620535124225_2671397,http://10.5.XXX.XXX:5004/proxy/application_1620535124225_2671397, RM_HA_URLS -> 10.5.132.3:5004,10.5.132.8:5004), /proxy/application_1620535124225_2671397
2021-06-28 12:51:38 CONSOLE# 21/06/28 12:51:38 INFO YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster registered as NettyRpcEndpointRef(spark-client://YarnAM)
2021-06-28 12:51:38 CONSOLE# 21/06/28 12:51:38 INFO Client:
2021-06-28 12:51:38 CONSOLE# client token: N/A
2021-06-28 12:51:38 CONSOLE# diagnostics: N/A
2021-06-28 12:51:38 CONSOLE# ApplicationMaster host: 10.5.XXX.XXX
2021-06-28 12:51:38 CONSOLE# ApplicationMaster RPC port: -1
2021-06-28 12:51:38 CONSOLE# queue: root.baseDepSarchQueue
2021-06-28 12:51:38 CONSOLE# start time: 1624855893792
2021-06-28 12:51:38 CONSOLE# final status: UNDEFINED
2021-06-28 12:51:38 CONSOLE# tracking URL: http://10.5.132.8:5004/proxy/application_1620535124225_2671397/
2021-06-28 12:51:38 CONSOLE# user: XXXXXXX
2021-06-28 12:51:38 CONSOLE# 21/06/28 12:51:38 INFO YarnClientSchedulerBackend: Application application_1620535124225_2671397 has started running.