Spark配置

刘超 11天前 ⋅ 4784 阅读   编辑

  hadoop环境变量

环境变量 描述
SPARK_DAEMON_MEMORY 分配给历史服务器的内存(默认:1G)
SPARK_DAEMON_JAVA_OPTS 历史服务器JVM选项(默认:none)
SPARK_PUBLIC_DNS 设置历史服务器的公共地址,如果不设定,连接应用程序的历史记录会使用服务器内部地址,可能会导致连接失效(默认:none)
SPARK_HISTORY_OPTS 历史服务器的spark.history.*配置选项(默认:none)

 

分类 参数名 默认值 描述 说明
应用 spark.app.name (none) Spark 应用的名字。会在 SparkUI 和日志中出现。
spark.driver.cores 1 在 cluster 模式下,用几个 core 运行 driver 进程。
spark.driver.maxResultSize 1g Spark action 算子返回的结果集的最大数量。至少要 1M,可以设为 0 表示无限制。如果结果超过这一大小,Spark job 会直接中断退出。但是,设得过高有可能导致 driver 出现 out-of-memory 异常(取决于 spark.driver.memory 设置,以及驱动器 JVM 的内存限制)。设一个合理的值,以避免 driver 出现 out-of-memory 异常。
spark.driver.memory 1g driver进程可以使用的内存总量(如 : 1g, 2g)。注意,在 client 模式下,这个配置不能在 SparkConf 中直接设置,应为在那个时候 driver 进程的 JVM 已经启动了。因此需要在命令行里用 --driver-memory 选项 或者在默认属性配置文件里设置。
spark.executor.cores executor能使用的CPU核数
spark.executor.memory 1g 每个 executor 进程使用的内存总量(如,2g, 8g)。 Amount of memory to use per executor process (e.g. 2g, 8g).
spark.extraListeners (none) 逗号分隔的实现 SparkListener 接口的类名列表;初始化 SparkContext 时,这些类的实例会被创建出来,并且注册到 Spark 的监听器上。如果这些类有一个接受 SparkConf 作为唯一参数的构造函数,那么这个构造函数会被调用;否则,就调用无参构造函数。如果没有合适的构造函数,SparkContext 创建的时候会抛异常。
spark.local.dir /tmp Spark 的”草稿“目录,包括 map 输出的临时文件以及 RDD 存在磁盘上的数据。这个目录最好在本地文件系统中。这个配置可以接受一个以逗号分隔的多个挂载到不同磁盘上的目录列表。注意 : Spark-1.0 及以后版本中,这个属性会被 cluster manager 设置的环境变量覆盖 : SPARK_LOCAL_DIRS(Standalone,Mesos)或者 LOCAL_DIRS(YARN)。
spark.logConf false SparkContext 启动时是否把生效的 SparkConf 属性以 INFO 日志打印到日志里。
spark.master (none) 要连接的 cluster manager。参考 Cluster Manager 类型。
spark.submit.deployMode (none) Spark driver 程序的部署模式,可以是 "client" 或 "cluster",意味着部署 dirver 程序本地("client")或者远程("cluster")在 Spark 集群的其中一个节点上。
spark.log.callerContext (none) Application information that will be written into Yarn RM log/HDFS audit log when running on Yarn/HDFS. Its length depends on the Hadoop configuration hadoop.caller.context.max.size. It should be concise, and typically can have up to 50 characters.
spark.driver.supervise false If true, restarts the driver automatically if it fails with a non-zero exit status. Only has effect in Spark standalone mode or Mesos cluster deploy mode.
spark.yarn.priority
--priority
网上查找可以通过该参数指定优先级,但在2.0.2.2.5.6.0-40
spark.debug.maxToStringFields 25 读取文件的时候,由于字段数目较多,超过了默认的字段数目25,会提示警告,可以通过在spark-defaults.conf文件中添加一行处理
spark.speculation true
spark.yarn.executor.memoryOverhead
spark.yarn.driver.memoryOverhead
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.app.name
spark.storage.memoryMapThreshold
spark.cores.max CPU计算资源的数量,决定在Standalone和Mesos模式下,一个Spark应用程序所能申请的CPU Core的数量
需要注意的是:该参数对Yarn模式不起作用,YARN模式下,资源由Yarn统一调度管理
CPU资源的数量由另外两个直接配置Executor的数量和每个Executor中core数量的参数决定
   

spark.sql.shuffle.partitions

   
  运行  
  spark.driver.extraClassPath (none) 额外的classpath条目需预先添加到驱动程序 classpath中。 注意 : 在客户端模式下,这一套配置不能通过 SparkConf 直接在应用在应用程序中,因为 JVM 驱动已经启用了。相反,请在配置文件中通过设置 --driver-class-path 选项或者选择默认属性。 注意: 在客户端模式下,这一套配置不能通过 SparkConf 直接应用在应用程序中,因为 JVM 驱动已经启用了。相反,请在配置文件中通过设置 --driver-java-options 选项或者选择默认属性。
spark.driver.extraJavaOptions (none) 一些额外的 JVM 属性传递给驱动。例如,GC 设置或其他日志方面设置。注意,设置最大堆大小(-Xmx)是不合法的。最大堆大小设置可以通过在集群模式下设置 spark.driver.memory 选项,并且可以通过--driver-memory 在客户端模式设置。
spark.driver.extraLibraryPath (none) 当启动 JVM 驱动程序时设置一个额外的库路径。 注意: 在客户端模式下,这一套配置不能通过 SparkConf 直接在应用在应用程序中,因为 JVM 驱动已经启用了。相反,请在配置文件中通过设置 --driver-library-path 选项或者选择默认属性。
spark.driver.userClassPathFirst false (实验)在驱动程序加载类库时,用户添加的 Jar 包是否优先于 Spark 自身的 Jar 包。这个特性可以用来缓解冲突引发的依赖性和用户依赖。目前只是实验功能。这是仅用于集群模式。
spark.executor.extraClassPath (none) 额外的类路径要预先考虑到 executor 的 classpath。这主要是为与旧版本的 Spark 向后兼容。用户通常不应该需要设置这个选项。
spark.executor.extraJavaOptions (none) 一些额外的 JVM 属性传递给 executor。例如,GC 设置或其他日志方面设置。注意,设置最大堆大小(-Xmx)是不合法的。Spark 应该使用 SparkConf 对象或 Spark 脚本中使用的 spark-defaults.conf 文件中设置。最大堆大小设置可以在 spark.executor.memory 进行设置。
spark.executor.extraLibraryPath (none) 当启动 JVM 的可执行程序时设置额外的类库路径。
spark.executor.logs.rolling.maxRetainedFiles (none) 最新回滚的日志文件将被系统保留。旧的日志文件将被删除。默认情况下禁用。
spark.executor.logs.rolling.enableCompression false Enable executor log compression. If it is enabled, the rolled executor logs will be compressed. Disabled by default.
spark.executor.logs.rolling.maxSize (none) 设置最大文件的大小,以字节为单位日志将被回滚。默认禁用。见 spark.executor.logs.rolling.maxRetainedFiles 旧日志的自动清洗。
spark.executor.logs.rolling.strategy (none) 设置 executor 日志的回滚策略。它可以被设置为 “时间”(基于时间的回滚)或 “大小”(基于大小的回滚)。对于 “时间”,使用 spark.executor.logs.rolling.time.interval 设置回滚间隔。用 spark.executor.logs.rolling.maxSize 设置最大文件大小回滚。
spark.executor.logs.rolling.time.interval daily 设定的时间间隔,executor 日志将回滚。默认情况下是禁用的。有效值是每天,每小时,每分钟或任何时间间隔在几秒钟内。见 spark.executor.logs.rolling.maxRetainedFiles 旧日志的自动清洗。
spark.executor.userClassPathFirst false (实验)与 spark.driver.userClassPathFirst 相同的功能,但适用于执行程序的实例。
spark.executorEnv.[EnvironmentVariableName] (none) 通过添加指定的环境变量 EnvironmentVariableName 给 executor 进程。用户可以设置多个环境变量。
spark.redaction.regex (?i)secret|password Regex to decide which Spark configuration properties and environment variables in driver and executor environments contain sensitive information. When this regex matches a property key or value, the value is redacted from the environment UI and various logs like YARN and event logs.
spark.python.profile false 启用在 python 中的 profile。结果将由 sc.show_profiles() 显示,或者它将会在驱动程序退出后显示。它还可以通过 sc.dump_profiles(path) dump 到磁盘。如果一些 profile 文件的结果已经显示,那么它们将不会再驱动程序退出后再次显示。默认情况下,pyspark.profiler.BasicProfiler 将被使用,但这可以通过传递一个 profile 类作为一个参数到 SparkContext 中进行覆盖。
spark.python.profile.dump (none) 这个目录是在驱动程序退出后,proflie 文件 dump 到磁盘中的文件目录。结果将为每一个 RDD dump 为分片文件。它们可以通过 ptats.Stats() 加载。如果指定,profile 结果将不会自动显示。
spark.python.worker.memory 512m 在聚合期间,每个python工作进程使用的内存量,与JVM内存条(例如:512m, 2g)格式相同。如果在聚合过程中使用的内存高于此数量,则会将数据溢出到磁盘中。
spark.python.worker.reuse true 重用 python worker。如果为 true,它将使用固定数量的 worker 数量。不需要为每一个任务分配 python 进程。如果是大型的这将是非常有用。
spark.files Comma-separated list of files to be placed in the working directory of each executor.
spark.submit.pyFiles Comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH for Python apps.
spark.jars Comma-separated list of local jars to include on the driver and executor classpaths.
spark.jars.packages Comma-separated list of Maven coordinates of jars to include on the driver and executor classpaths. The coordinates should be groupId:artifactId:version. If spark.jars.ivySettings is given artifacts will be resolved according to the configuration in the file, otherwise artifacts will be searched for in the local maven repo, then maven central and finally any additional remote repositories given by the command-line option --repositories. For more details, see Advanced Dependency Management.
spark.jars.excludes Comma-separated list of groupId:artifactId, to exclude while resolving the dependencies provided in spark.jars.packages to avoid dependency conflicts.
spark.jars.ivy Path to specify the Ivy user directory, used for the local Ivy cache and package files from spark.jars.packages. This will override the Ivy property ivy.default.ivy.user.dir which defaults to ~/.ivy2.
spark.jars.ivySettings Path to an Ivy settings file to customize resolution of jars specified using spark.jars.packages instead of the built-in defaults, such as maven central. Additional repositories given by the command-line option --repositories will also be included. Useful for allowing Spark to resolve artifacts from behind a firewall e.g. via an in-house artifact server like Artifactory. Details on the settings file format can be found at http://ant.apache.org/ivy/history/latest-milestone/settings.html  
  spark.pyspark.driver.python Python binary executable to use for PySpark in driver. (default is spark.pyspark.python)  
  spark.pyspark.python Python binary executable to use for PySpark in both driver and executors.  
   
  ui spark.ui.port 4040 任务控制台使用端口  
    spark.ui.retainedJobs 1000
master会读取每个task的event log日志去生成spark ui,调小可以减少保存在Master内存中的作业信息
    spark.ui.retainedStages 1000 在垃圾回收器收集之前spark UI能保留的最大stage数量
    spark.worker.ui.retainedDrivers 200 web ui中看到worker节点消失或处于dead状态,在该节点运行的任务则会报各种 lost worker 的错误,worker内存中保存了大量的ui信息导致gc时失去和master之间的心跳,调小可以减少保存在Worker内存中的Driver,Executor信息

    spark.worker.ui.retainedExecutors 200
    spark.ui.killEnabled true 允许通过web ui界面停止stage和jobs  
    spark.eventLog.enabled false 是否记录spark事件的日志  
    spark.eventLog.compress false 是否压缩事件产生的日志  
    spark.eventLog.dir file://tmp/spark-events spark事件产生日志的目录,在这个目录里,每个任务会创建一个子目录存放各个任务的日志文件  
完整性

 

   
   

dfs.client.read.shortcircuit.skip.checksum

  hdfs数据完整性  
安全  spark.history.kerberos.enabled false 用于表明历史服务器是否允许使用kerberos登录,当历史服务器需要访问一个安全Hadoop上的HDFS时,这是很有用的;如果设置为true,则用配置spark.history.kerberos.principal 和 spark.history.kerberos.keytab  
    spark.history.kerberos.principal (null) Kerberos安全配置的名字  
    spark.history.kerberos.keytab   kerberos keytab的存放位置  
    spark.history.ui.acls.enable false 指定是否应该检查访问控制列表,以授权用户查看应用详情;如果启用,当应用运行后,检查将忽略应用本身设置的spark.ui.acls.enable参数;应用所有者有权查看他们自己的应用情况,当设置了spark.ui.view.acls参数的其他用户也有权查看这类应用;如果禁用,则不会进行检查  
    spark.authenticate false 各个连接之间是否需要验证  
    spark.authenticate.secret None 各组件之间进行验证的秘钥,如果集群不是在YARN并且spark.authenticate设置为true的时候需要设置该属性  
    spark.core.connection.auth.wait.timeout 30 连接时验证的超时时间(秒)  
    spark.core.connection.ack.wait.timeout 60 连接时应答的超时时间,是为了避免由于GC带来的长时间等待  
    spark.ui.filters None 以逗号分隔的过滤器的名字,这些过滤器会在web UI中使用,它们必须是继承自javax servlet Filter类。过滤器的参数应为:spark..params='param1=value1,param2=value2'形式  
    spark.acls.enable false spark acls是否应该启用,如果为true那么会检查用户是否有访问和修改job的权限  
    spark.ui.view.acls Empty 逗号分隔的那些有权限访问web UI的用户,默认情况下只有启动当前job的用户才有访问权限  
    spark.modify.acls Empty 逗号分隔的那些有权限修改job的用户,默认情况下只有启动当前job的用户才有访问权限  
    spark.admin.acls Empty 逗号分隔的有权限查看和修改所有job的用户  
           
端口 spark.history.ui.port 18080    
    spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider 历史应用接入的类名,目前只有一个Spark提供的实现类,它是通过存储在文件系统中的应用程序日志中寻找出来的
    spark.history.updateInterval 10s 呈现在历史服务器上的应用数据更新周期,每一个更新操作都会记录日志并做持久化操作
    spark.history.retainedApplications 50 HistoryServer 内部有一个cache, 缓存application信息,这个cache的大小是spark.history.retainedApplications参数的值,cache有淘汰机制,如果超过这个限制,旧的应用记录将被移除;  
    spark.eventLog.enabled true 是否记录spark事件
    spark.eventLog.dir hdfs:///spark2-history/ 保存日志相关信息的路径,可以是hdfs://开头的HDFS路径,也可以是file://开头的本地路径,都需要提前创建
    spark.eventLog.compress   spark.eventLog.enabled为true,该参数生效,默认使用的是snappy
    spark.yarn.historyServer.address {{spark_history_server_host}}:{{spark_history_ui_port}} Spark history server的地址(不加http://).这个地址会在Spark应用程序完成后提交给YARN RM,然后可以在RM UI上点击链接跳转到history server UI上
    spark.history.fs.logDirectory   指定Spark History Server需要读取信息的路径,可以是本地文件系统(file:/tmp/spark-events)也可以是hdfs(hdfs:///spark2-history/)  
执行相关    

 

 
  spark.default.parallelism
  • local mode:本地机器的CPU数量
  • mesos file grained mode:8
  • 其他模式:所有执行器节点的cpu数量之和与2的最大值
当没有显式设置该值表示系统使用集群中运行shuffle操作(如groupByKey,reduceByKey)的默认的任务数  
  spark.broadcast.factory org.apache.spark.broadcast.TorrentBroadcastFactory 广播时使用的实现类  
  spark.broadcast.blockSize 4096 TorrentBroadcastFactory的块大小。该值过大会降低广播时的并行度(速度变慢),过小的话BlockManager的性能不能发挥到最佳  
  spark.files.overwrite false 通过SparkContext.addFile()添加的文件是否可以覆盖之前已经存在并且内容不匹配的文件  
  spark.files.fetchTimeout false 获取由driver通过SparkContext.addFile()添加的文件时是否启用通信超时  
  spark.storage.memoryFraction 0.6 java heap用于spark内存缓存的比例,该值不应该大于jvm中老生代对象的大小。当你自己设置了老生代的大小时可以适当加大该值  
  spark.storage.unrollFraction 0.2 spark.storage.memoryFraction中用于展开块的内存比例,当没有足够内存来展开新的块的时候会通过丢弃已经存在的旧的块来腾出空间  
  spark.tachyonStore.baseDir System.getProperty("java.io.tmpdir") Tachyon文件系统存放RDD的目录。tachyon文件系统的URL通过spark.tachyonStore.url进行设置。可通过逗号分隔设置多个目录  
  spark.storage.memoryMapThreshold 8192 以字节为单位的快大小,用于磁盘读取一个块大小时进行内存映射。这可以防止spark在内存映射时使用很小的块,一般情况下,对块进行内存映射的开销接近或低于操作系统的页大小  
  spark.tachyonStore.url tachyon://localhost:19998 tachyon文件系统的url  
  spark.cleaner.ttl (infinite) spark记录任何元数据(stages生成、task生成等)的持续时间。定期清理可以确保将超期的元数据删除,这在运行长时间任务时是非常有用的,如运行7*24的spark streaming任务。RDD持久化在内存中的超期数据也会被清理  
  spark.hadoop.validateOutputSpecs true 当为true时,在使用saveAsHadoopFile或者其他变体时会验证数据输出的合理性(如检查输出目录是否还存在)。  
   
日志清理 spark.history.fs.cleaner.enabled  

是否开启日志清理功能

示例值:true

    spark.history.fs.cleaner.interval  

检测周期;单位是d、h

示例值:1h

    spark.history.fs.cleaner.maxAge  

日志最大保留时间,单位是d、h

示例值:3d

SparkStreaming      

 

 
  spark.streaming.blockInterval 200 spark streaming接收器将接受到的数据合并成数据块并存储在spark里的时间间隔(毫秒)  
  spark.streaming.receiver.maxRate infinite 每个接收器将数据放入block的最大速率(每秒),每个stream每秒最多只能消费这么多的数据。如果该值《=0那么速率将没有限制  
  spark.streaming.unpersist true 如果为true,那么强迫将spark streaming持久化的RDD数据从spark内存中清理,同样的,spark streaming接收的原始输入数据也会自动被清理;如果为false,则允许原始输入数据和持久化的RDD数据可被外部的streaming应用程序访问,因为这些数据不会被自动清理,但会带来更大的内存使用  
kafka    

 

 
    spark.streaming.kafka.consumer.poll.ms
60000 可能报Failed to get records for ... after polling for 512  
  输出        
    spark.sql.parquet.compression.codec uncompressed/snappy    
调优  

超时 

spark.network.timeout
120s

单位seconds(s),milliseconds (ms),microseconds(us),minutes (m or min), hour (h)

spark.executor.heartbeatInterval不能大于spark.network.timeout,否者会有Resubmitted (resubmitted due to lost executor)
heartbeat spark.executor.heartbeatInterval 10000 每个executor向driver发送心跳的间隔时间(毫秒)。可能会遇到Futures timed out after [10 seconds]
失败容忍度

spark.yarn.max.executor.failures 100

shuffle

spark.shuffle.file.buffer
32k

该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘

调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升
spark.reducer.maxSizeInFlight
48m

该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据

调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升
spark.shuffle.io.maxRetries 3

shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败

调优建议:对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。
spark.shuffle.io.retryWait
5s

shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数代表了每次重试拉取数据的等待间隔,默认是5s

调优建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性
spark.shuffle.memoryFraction 0.2

该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%

调优建议:如果内存充足,而且很少使用持久化操作,建议调高这个比例,给shuffle read的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%左右
spark.shuffle.manager
sort

该参数用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高
在spark2.0.2.2.5.6.0-40设置hash,报ERROR ApplicationMaster: User class threw exception: java.lang.ClassNotFoundException: hash

调优建议:由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,因为之前发现了一些相应的bug
spark.shuffle.sort.bypassMergeThreshold 200

当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件

调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高
spark.shuffle.consolidateFiles false

如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能

调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%
spark.shuffle.spill true 设置为true则会限制在reduce阶段的内存使用量,超出部分会写到硬盘中,超出的阀值通过spark.shuffle.memoryFraction指定  
spark.shuffle.spill.compress true 是否压缩shuffle期间溢出的数据。通过spark.io.compression.codec设置压缩方式  
spark.shuffle.memoryFraction 0.2 如果spark.shuffle.spill设置为true,那么shuffle期间内存使用最大为总内存*该值,超出部分会写到硬盘,如果经常会溢出,则可适当增大该值。  
spark.shuffle.compress true 是否压缩输出文件  
spark.shuffle.file.buffer.kb 32 每次shuffle过程驻留在内存的buffer大小(单位:字节),在shuffle中间数据的产生过程中可减少硬盘的IO操作  
spark.reducer.maxMbInFlight 48 设置reduce任务能同时从map任务的输出文件中取多大的数据(单位:M)。在内存较少的情况下需要降低该值  
压缩序列化

  spark.broadcast.compress true 是否压缩需要广播的数据
  spark.rdd.compress false RDD数据在序列化之后是否进一步进行压缩后再存储到内存或磁盘上
  spark.io.compression.codec snappy RDD数据或shuffle输出数据使用的压缩算法,有lz4,lzf和snappy三种方式
  spark.io.compression.snappy.block.size 32768 在snappy压缩时指定的块大小(字节),降低该值也会降低shuffle过程使用的内存
  spark.io.compression.lz4.block.size 32768 和上述类似,只不过只在压缩方式为lz4时有效
  spark.closure.serializer org.apache.spark.serializer.JavaSerializer 序列化类
  spark.serializer.objectStreamReset 100 当序列化方式使用JavaSerializer时,序列化器会缓存对象以免写入冗余的数据,但这会使垃圾回收器停止对这些对象进行垃圾收集。所以当使用reset序列化器后就会使垃圾回收器重新收集那些旧对象。该值设置为-1则表示禁止周期性的reset,默认情况下每100个对象就会被reset一次序列化器
  spark.kryo.referenceTracking true 当使用kryo序列化器时,是否跟踪对同一个对象的引用情况,这对对象引用有循环引用或同一对象有多个副本的情况是很有用的。否则可以设置为false以提高性能
  spark.kryo.registrationRequired false 是否需要使用kryo来注册对象。当为true时,如果序列化一个未使用kryo注册的对象则会抛出异常,当为false,kryo会将未注册的类的名字一起写到序列化对象中,所以这会带来性能开支,所以在用户还没有从注册队列中删除相应的类时应该设置为true
  spark.kryoserializer.buffer.mb 0.064 kryo的序列化缓冲区的初始值。每个worker的每个core都会有一个缓冲区
  spark.kryoserializer.buffer.max.md 64 kryo序列化缓冲区允许的最大值(单位:M),这个值必须大于任何你需要序列化的对象。当遇到buffer limit exceeded异常时可以适当增大该值
网络相关

  spark.driver.host (本地主机名) driver监听的IP或主机名,用于与执行器和standalone模式的master节点进行通信
  spark.driver.port (随机) driver监听的端口号
  spark.fileserver.port (随机) driver的HTTP文件服务器监听的端口
  spark.broadcast.port (随机) driver的广播服务器监听的端口,该参数对于torrent广播模式是没有作用的
  spark.replClassServer.port (随机) driver的HTTP类服务器监听的端口,只用于spark shell
  spark.blockManager.port (随机) 所有块管理者监听的端口
  spark.executor.port (随机) executor监听的端口,用于与driver进行通信
  spark.port.maxRetries 16 绑定到某个端口的最大重试次数
  spark.akka.frameSieze 10 以MB为单位的driver和executor之间通信信息的大小,该值越大,driver可以接受更大的计算结果(如在一个很大的数据集上使用collect()方法)
  spark.akka.threads 4 用于通信的actor线程数,当在很大的集群中driver拥有更多的CPU内核数的driver可以适当增加该属性的值
  spark.akka.timeout 100 spark节点之间通信的超时时间(秒)
  spark.akka.heartbeat.pauses 600 下面三个参数通常一起使用。如果启用错误探测器,有助于对恶意的executor的定位,而对于由于GC暂停或网络滞后引起的情况下,不需要开启错误探测器,另外错误探测器的开启会导致由于心跳信息的频繁交换引起网络泛滥。设大该值可以禁用akka内置的错误探测器,表示akka可接受的心跳停止时间(秒)
  spark.akka.failure-detector.threshold 300.0 设大该值可以禁用akka内置的错误探测器,对应akka的akka.remote.transport-failure-detector.threshold
  spark.akka.heartbeat.interval 1000 设大该值可以禁用akka内置的错误探测器,该值越大会减少网络负载,越小就会向akka的错误探测器发送信息
调度相关

  spark.task.cpus 1 为每个人物分配的cpu数
  spark.task.maxFailures 4 每个单独任务允许的失败次数,必须设置为大于1,重试次数=该值-1
  spark.scheduler.mode FIFO 提交到SparkContext的任务的调度模式。设置为FAIR表示使用公平的方式进行调度而不是以队列的方式
  spark.cores.max (未设置) 当应用程序运行在standalone集群活着粗粒度共享模式mesos集群时,应用程序向集群请求的最大cpu内核总数(不是指每台机器,而是整个集群)。如果不设置,对于standalone集群将使用spark.deploy.defaultCores的值,而mesos将使用集群中可用的内核
  spark.mesos.coarse false 如果设置为true,在mesos集群上就会使用粗粒度共享模式,这种模式使得spark获得一个长时间运行的mesos任务而不是一个spark任务对应一个mesos任务。这对短查询会带来更低的等待时间,但资源会在整个spark任务的执行期间内被占用
  spark.speculation false 当设置为true时,将会推断任务的执行情况,当一个或多个任务在stage里执行较慢时,这些任务会被重新发布
  spark.speculation.interval 100 spark推断任务执行情况的间隔时间(毫秒)
  spark.speculation.quantile 0.75 推断启动前,stage必须要完成总task的百分比
  spark.speculation.multiplier 1.5 比已完成task的运行速度中位数慢多少倍才启用推断
  spark.locality.wait 3000 启动一个本地数据任务的等待时间,当等待时间超过该值时,就会启动下一个本地优先级别的任务。该设置同样可以应用到各优先级别的本地性之间(本地进程,本地节点,本地机架,任意节点),当然可以通过spark.locality.wait.node等参数设置不同优先级别的本地性
  spark.locality.wait.process spark.locality.wait 本地进程的本地等待时间,它会影响尝试访问缓存数据的任务
  spark.locality.wait.node spark.locality.wait 本地节点的本地等待时间,当设置为0,就会忽略本地节点并立即在本地机架上寻找
  spark.locality.wait.rack spark.locality.wait 本地机架的本地等待时间
  spark.scheduler.revive.interval 1000 复活重新获取资源的任务的最长时间间隔(毫秒)
  spark.scheduler.minRegisteredResourcesRatio 0 在调度开始之前已注册的资源需要达到的最小比例,如果不设置该属性的话,那么调度开始之前需要等待的最大时间由spark.scheduler.maxRegisteredResourcesWaitingTime设置
  spark.scheduler.maxRegisteredResourcesWaitingTime 30000 调度开始之前需要等待的最大时间(毫秒)
  spark.localExecution.enabled false 在spark调用first()或take()等任务时是否将任务发送给集群,当设置为true时会使这些任务执行速度加快,但是可能需要将整个分区的数据装载到driver
Windows函数

spark.sql.windowExec.buffer.spill.threshold  

WindowExec执行计划中数据结构默认设置的阔值是4096,太小可能导致了内存溢出

  缓存表 Sparksql仅仅会缓存必要的列,并且自动调整压缩算法来减少内存和GC压力
  spark.sql.inMemoryColumnarStorage.compressed true

假如设置为true,SparkSql会根据统计信息自动的为每个列选择压缩方式进行压缩

  spark.sql.inMemoryColumnarStorage.batchSize
10000

控制列缓存的批量大小。批次大有助于改善内存使用和压缩,但是缓存数据会有OOM的风险

  广播    

 

  spark.sql.broadcastTimeout 300

广播等待超时时间,单位秒

  spark.sql.autoBroadcastJoinThreshold
10485760 (10 MB)

最大广播表的大小。设置为-1可以禁止该功能。当前统计信息仅支持Hive Metastore表

  分区数据的调控  

 

  spark.sql.shuffle.partitions 200

对于有些公司来说,估计在用的时候会有Spark sql处理的数据比较少,然后资源也比较少,这时候这个shuffle分区数200就太大了,应该适当调小,来提升性能。

也有一些公司,估计在处理离线数据,数据量特别大,而且资源足,这时候shuffle分区数200,明显不够了,要适当调大

  文件与分区

 

 

 

 

spark.sql.files.maxPartitionBytes

134217728 (128 MB)

打包传入一个分区的最大字节,在读取文件的时候

 

spark.sql.files.openCostInBytes

4194304 (4 MB)

用相同时间内可以扫描的数据的大小来衡量打开一个文件的开销。当将多个文件写入同一个分区的时候该参数有用。该值设置大一点有好处,有小文件的分区会比大文件分区处理速度更快(优先调度)

 

spark.sql.files.maxPartitionBytes

 

该值的调整要结合你想要的并发度及内存的大小来进行。

  spark.sql.files.openCostInBytes  

说直白一些这个参数就是合并小文件的阈值,小于这个阈值的文件将会合并

升级  

spark.yarn.archive /hdp/apps/${hdp.version}/spark2/spark2-hdp-yarn-archive.tar.gz
/hdp/apps/2.5.6.0-40/spark2/spark2-hdp-yarn-archive.tar.gz

spark on yarn 提交任务后会下载该文件,通过该参数可以修改spark2-hdp-yarn-archive.tar.gz存放位置,也就是__spark_libs__的值;目前看只有重启spark history server,才会将本地/usr/hdp/2.6.2.0-205/spark2/jars/ 下jar包打包成spark2-hdp-yarn-archive.tar.gz上传

spark.yarn.archive优先级高于spark.yarn.jars
spark.yarn.jars
/user/spark/share/lib/*.jar

spark.yarn.user.jar

 
与其他组件

 
kafka maxFilesPerTrigger spark
.readStream
.option("maxFilesPerTrigger", 1)
.schema(parquetSchema)
.parquet(parqurtUri)

把parquet文件处理后发到Kafka,怎么才能分批发送,如果是用Spark Structured Streaming,把maxFilesPerTrigger选项设置为1

 


注意:本文归作者所有,未经作者允许,不得转载

全部评论: 0

    我有话说: