Spark

  • This works the same way as running the SQL through Hive; Spark is simply used as the execution engine. The "required Spark tuning parameters" and "Spark pre-execution setup" sections below are unchanged.

  • Note that Spark SQL here cannot execute a multi-statement script. Run multi-statement jobs with Hive, and hand only the single statement that needs acceleration to Spark SQL (see the sketch below this list).

  • Sensitive strings are masked as XXXXX.
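
A minimal sketch of that split (the script and table names are hypothetical): the multi-statement preparation runs through the Hive CLI, and only the expensive statement is handed to the Spark SQL wrapper built in the script below.

    # run the multi-statement part (temp tables, DDL, etc.) with the Hive CLI
    hive -f prepare_tables.sql
    if [ $? -ne 0 ]; then exit 1; fi

    # pass only the single heavy statement to the Spark wrapper (executeEnv, defined below)
    ${executeEnv} "insert overwrite table temp.some_result_table partition(dt='${vDay}') select ..."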

#!/bin/bash
##********************************************************************#
##
## Date arithmetic is supported with the following syntax:
## ${DATA_DATE offset field formatter}
## DATA_DATE: fixed token, the business date of the current job
## offset: required, the offset amount applied to the unit given by field; a number, positive or negative
## field: required, the unit to offset; one of: day, month, year, minute, hour, week, second
## formatter: optional, the date format applied after the offset, e.g. yyyy-MM-dd HH:mm:ss
## Example: ${DATA_DATE -1 day 'yyyy-MM-dd HH:mm'}
##********************************************************************#
vDay=${DATA_DATE}
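# Note: ${DATA_DATE} above is substituted by the scheduling platform before this script runs.
# For local testing, an offset such as ${DATA_DATE -1 day 'yyyy-MM-dd'} can be approximated
# with GNU date (assuming GNU coreutils is available), e.g.:
#   vDay=$(date -d '-1 day' +%Y-%m-%d)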

sql="
insert overwrite table temp.dm_search_keyword_result_d partition(dt='${vDay}',pt='android')
select 
    keyword,
    result_num             -- number of single-track search results for each search keyword
from 
(
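    -- for each keyword, rank candidate result_num values by distinct-user count (ren), highest first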
    select 
        keyword,
        result_num,
        row_number() over(partition by keyword order by ren desc) as rank
    from 
    (
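        -- count distinct users (ren) for every (keyword, result_num) pair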
        select keyword, result_num, count(distinct uuid) as ren
        from 
        (
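            -- branch 1: searches on the search page ('搜索页'); ivar5 carries the result count directly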
            select 
                kw as keyword,
                ivar5 as result_num,
                uuid
            from ddl.dt_list_ard_d
            where dt='${vDay}'
                    and action='search'
                    and b rlike '^搜索页-'
                    and ivar5 is not null
                    and ivar5 <>'null'
                    and ivar5 <>''
                    and ivar5 <>'-1'
                    and ivar5 <>'-2'
                    and kw is not null and kw<>''
            group by kw, ivar5, uuid

            union all
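            -- branch 2: the combined search page ('搜索综合页'); the result count is the second comma-separated field of ivar5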
            select 
                kw as keyword,
                split(ivar5, ',')[1] as result_num,
                uuid
            from ddl.dt_list_ard_d
            where dt='${vDay}'
                    and action in ('search', 'click', 'trash')
                    and b rlike '^搜索综合页-'
                    and ivar5 is not null
                    and ivar5 <>'null'
                    and ivar5 <>''
                    and ivar5 <>'-1'
                    and ivar5 <>'-2'
                    and kw is not null and kw<>''
            group by kw, split(ivar5, ',')[1], uuid
        )a
        group by keyword, result_num
    )b
) c
where rank=1
"

###### Required Spark tuning parameters ##########
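# executor_cores / executor_memory: cores and heap memory per executor
# initialExecutors / minExecutors / maxExecutors: dynamic-allocation bounds on the executor count
# driver_memory: heap memory for the driver
# partitions: spark.sql.shuffle.partitions, number of shuffle (reduce-side) partitions
# autoBroadcastJoinThreshold: largest table size in bytes that is automatically broadcast in joins (10485760 = 10 MB)
# memoryOverhead: spark.yarn.executor.memoryOverhead, off-heap memory per executor in MB
# broadcastTimeout: spark.sql.broadcastTimeout, broadcast join timeout in seconds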
app_queue=${q}
app_name=test_spark
executor_cores=4
executor_memory=4G
initialExecutors=10
maxExecutors=100
minExecutors=10
driver_memory=2G
partitions=50
autoBroadcastJoinThreshold=10485760
memoryOverhead=2048
broadcastTimeout=2000

###### Spark pre-execution setup (do not modify) ##########
export JAVA_HOME=/usr/local/jdk1.7.0_51
export SPARK_HOME=/data1/app/spark-2.1.0-bin-hadoop2.7
export PATH=${JAVA_HOME}/bin:${SPARK_HOME}/bin:${PATH}
executeEnv="spark-submit   \
--master yarn \
--deploy-mode cluster  \
--executor-cores ${executor_cores} \
--executor-memory ${executor_memory} \
--driver-memory  ${driver_memory} \
--conf spark.sql.shuffle.partitions=${partitions} \
--conf spark.dynamicAllocation.initialExecutors=${initialExecutors} \
--conf spark.dynamicAllocation.minExecutors=${minExecutors} \
--conf spark.dynamicAllocation.maxExecutors=${maxExecutors} \
--conf spark.yarn.executor.memoryOverhead=${memoryOverhead} \
--conf spark.sql.broadcastTimeout=${broadcastTimeout} \
--conf spark.sql.autoBroadcastJoinThreshold=${autoBroadcastJoinThreshold} \
--conf spark.locality.wait=0 \
--conf spark.shuffle.io.maxRetries=5 \
--conf spark.shuffle.file.buffer=128k \
--conf spark.shuffle.io.retryWait=30s \
--conf spark.executor.heartbeatInterval=60s \
--conf spark.network.timeout=1200s \
--conf spark.files.fetchTimeout=600s \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.shuffle.service.enabled=true \
--conf spark.dynamicAllocation.executorIdleTimeout=30s \
--conf spark.dynamicAllocation.schedulerBacklogTimeout=3s \
--conf spark.dynamicAllocation.cachedExecutorIdleTimeout=30s \
--conf spark.yarn.archive=hdfs://kgdc/data1/sparkJars \
--conf spark.yarn.dist.files=hdfs://kgdc/data1/sparkJars/hive-site.xml \
--conf spark.kryoserializer.buffer.max=256m \
--conf spark.kryoserializer.buffer=64m \
--queue ${app_queue} \
--name  ${app_name} \
--class net.XXXXX.sparkenv.ExecuteSql \
/data1/app/sparkJars/original-sparkenv.jar"
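# run the assembled spark-submit command, passing the SQL string as the single
# program argument to the ExecuteSql driver class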
${executeEnv} "${sql}"
if [ $? -ne 0 ]; then exit 1; fi
