Spark
#!/bin/bash
##********************************************************************#
##
## Date arithmetic is supported with the following syntax:
## ${DATA_DATE offset field formatter}
## DATA_DATE: fixed token, the business date/time of the current job
## offset: required, the amount to shift the date by; the unit is given by field, and the value may be any positive or negative integer
## field: required, the unit of the offset; one of: day, month, year, minute, hour, week, second
## formatter: optional, the output format of the shifted date, e.g. yyyy-MM-dd HH:mm:ss
## Example: ${DATA_DATE -1 day 'yyyy-MM-dd HH:mm'}
##********************************************************************#
vDay=${DATA_DATE}
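# A commented-out local equivalent of the scheduler's date arithmetic (a sketch, assuming
# GNU date and that vDay arrives as yyyy-MM-dd; the real expansion is done by the
# scheduler, not by this script):
#   vPrevDay=$(date -d "${vDay} -1 day" +%Y-%m-%d)   # same as ${DATA_DATE -1 day 'yyyy-MM-dd'}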
sql="
insert overwrite table temp.dm_search_keyword_result_d partition(dt='${vDay}',pt='android')
select
keyword,
result_num ---计算搜索关键字对应的单曲搜索结果数
from
(
select
keyword,
result_num,
row_number() over(partition by keyword order by ren desc) as rank
from
(
select keyword, result_num, count(distinct uuid) as ren
from
(
select
kw as keyword,
ivar5 as result_num,
uuid
from ddl.dt_list_ard_d
where dt='${vDay}'
and action='search'
and b rlike '^搜索页-'
and ivar5 is not null
and ivar5 <>'null'
and ivar5 <>''
and ivar5 <>'-1'
and ivar5 <>'-2'
and kw is not null and kw<>''
group by kw, ivar5, uuid
union all
select
kw as keyword,
split(ivar5, ',')[1] as result_num,
uuid
from ddl.dt_list_ard_d
where dt='${vDay}'
and action in ('search', 'click', 'trash')
and b rlike '^搜索综合页-'
and ivar5 is not null
and ivar5 <>'null'
and ivar5 <>''
and ivar5 <>'-1'
and ivar5 <>'-2'
and kw is not null and kw<>''
group by kw, split(ivar5, ',')[1], uuid
)a
group by keyword, result_num
)b
) b
where rank=1
"
###### Spark parameters that usually need tuning (see notes below) ##########
app_queue=${q}
app_name=test_spark
executor_cores=4
executor_memory=4G
initialExecutors=10
maxExecutors=100
minExecutors=10
driver_memory=2G
partitions=50
autoBroadcastJoinThreshold=10485760
memoryOverhead=2048
broadcastTimeout=2000
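# Meaning of the parameters above:
# executor_cores / executor_memory: CPU cores and heap per executor; driver_memory: driver heap.
# initialExecutors / minExecutors / maxExecutors: bounds used by dynamic allocation.
# partitions: spark.sql.shuffle.partitions, the number of partitions for shuffles in joins/aggregations.
# autoBroadcastJoinThreshold: tables smaller than this many bytes (10 MB here) are broadcast-joined.
# memoryOverhead: extra off-heap memory per executor, in MB, requested from YARN.
# broadcastTimeout: seconds to wait for a broadcast join's table to be broadcast before failing.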
###### Spark pre-run setup (no changes needed) ##########
export JAVA_HOME=/usr/local/jdk1.7.0_51
export SPARK_HOME=/data1/app/spark-2.1.0-bin-hadoop2.7
export PATH=${JAVA_HOME}/bin:${SPARK_HOME}/bin:${PATH}
executeEnv="spark-submit \
--master yarn \
--deploy-mode cluster \
--executor-cores ${executor_cores} \
--executor-memory ${executor_memory} \
--driver-memory ${driver_memory} \
--conf spark.sql.shuffle.partitions=${partitions} \
--conf spark.dynamicAllocation.initialExecutors=${initialExecutors} \
--conf spark.dynamicAllocation.minExecutors=${minExecutors} \
--conf spark.dynamicAllocation.maxExecutors=${maxExecutors} \
--conf spark.yarn.executor.memoryOverhead=${memoryOverhead} \
--conf spark.sql.broadcastTimeout=${broadcastTimeout} \
--conf spark.sql.autoBroadcastJoinThreshold=${autoBroadcastJoinThreshold} \
--conf spark.locality.wait=0 \
--conf spark.shuffle.io.maxRetries=5 \
--conf spark.shuffle.file.buffer=128k \
--conf spark.shuffle.io.retryWait=30s \
--conf spark.executor.heartbeatInterval=60s \
--conf spark.network.timeout=1200s \
--conf spark.files.fetchTimeout=600s \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.shuffle.service.enabled=true \
--conf spark.dynamicAllocation.executorIdleTimeout=30s \
--conf spark.dynamicAllocation.schedulerBacklogTimeout=3s \
--conf spark.dynamicAllocation.cachedExecutorIdleTimeout=30s \
--conf spark.yarn.archive=hdfs://kgdc/data1/sparkJars \
--conf spark.yarn.dist.files=hdfs://kgdc/data1/sparkJars/hive-site.xml \
--conf spark.kryoserializer.buffer.max=256m \
--conf spark.kryoserializer.buffer=64m \
--queue ${app_queue} \
--name ${app_name} \
--class net.XXXXX.sparkenv.ExecuteSql \
/data1/app/sparkJars/original-sparkenv.jar"
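# net.XXXXX.sparkenv.ExecuteSql is assumed to read the SQL text from its first program
# argument and run it through the Spark SQL engine; that is why the invocation below passes
# "${sql}" as the only application argument. Note that dynamic allocation (enabled above)
# requires the external shuffle service on the NodeManagers, hence spark.shuffle.service.enabled=true.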
${executeEnv} "${sql}"
if [ $? -ne 0 ]; then exit 1; fi