Skip to content

Commit 6177ed0

Browse files
jzhuge and claude
committed
[SPARK-56157][CORE] Support limitActiveProcessorCount in standalone mode
Add `activeProcessorCountOpts()` to `DriverRunner` and `ExecutorRunner` that returns `-XX:ActiveProcessorCount=<cores>` when the feature is enabled, and prepend it to the JVM opts passed to `CommandUtils.buildProcessBuilder`. Update the config docs to reflect that the feature now works in both YARN and standalone modes. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
1 parent 842eb7b commit 6177ed0

5 files changed

Lines changed: 76 additions & 7 deletions

File tree

core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ import org.apache.spark.deploy.master.DriverState
3232
import org.apache.spark.deploy.master.DriverState.DriverState
3333
import org.apache.spark.internal.Logging
3434
import org.apache.spark.internal.LogKeys._
35-
import org.apache.spark.internal.config.{DRIVER_RESOURCES_FILE, SPARK_DRIVER_PREFIX}
35+
import org.apache.spark.internal.config.{DRIVER_LIMIT_ACTIVE_PROCESSOR_COUNT_ENABLED,
36+
DRIVER_RESOURCES_FILE, SPARK_DRIVER_PREFIX}
3637
import org.apache.spark.internal.config.UI.UI_REVERSE_PROXY
3738
import org.apache.spark.internal.config.Worker.WORKER_DRIVER_TERMINATE_TIMEOUT
3839
import org.apache.spark.resource.ResourceInformation
@@ -175,6 +176,13 @@ private[deploy] class DriverRunner(
175176
localJarFile.getAbsolutePath
176177
}
177178

179+
private[worker] def activeProcessorCountOpts(): Seq[String] =
180+
if (conf.get(DRIVER_LIMIT_ACTIVE_PROCESSOR_COUNT_ENABLED)) {
181+
Seq(s"-XX:ActiveProcessorCount=${driverDesc.cores}")
182+
} else {
183+
Seq.empty
184+
}
185+
178186
private[worker] def prepareAndRunDriver(): Int = {
179187
val driverDir = createWorkingDirectory()
180188
val localJarFilename = downloadUserJar(driverDir)
@@ -187,8 +195,9 @@ private[deploy] class DriverRunner(
187195
}
188196

189197
// config resource file for driver, which would be used to load resources when driver starts up
190-
val javaOpts = driverDesc.command.javaOpts ++ resourceFileOpt.map(f =>
191-
Seq(s"-D${DRIVER_RESOURCES_FILE.key}=${f.getAbsolutePath}")).getOrElse(Seq.empty)
198+
val javaOpts = activeProcessorCountOpts() ++
199+
driverDesc.command.javaOpts ++ resourceFileOpt.map(f =>
200+
Seq(s"-D${DRIVER_RESOURCES_FILE.key}=${f.getAbsolutePath}")).getOrElse(Seq.empty)
192201
// TODO: If we add ability to submit multiple jars they should also be added here
193202
val builder = CommandUtils.buildProcessBuilder(driverDesc.command.copy(javaOpts = javaOpts),
194203
securityManager, driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)

core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
2828
import org.apache.spark.deploy.StandaloneResourceUtils.prepareResourcesFile
2929
import org.apache.spark.internal.Logging
3030
import org.apache.spark.internal.LogKeys._
31+
import org.apache.spark.internal.config.EXECUTOR_LIMIT_ACTIVE_PROCESSOR_COUNT_ENABLED
3132
import org.apache.spark.internal.config.SPARK_EXECUTOR_PREFIX
3233
import org.apache.spark.internal.config.UI._
3334
import org.apache.spark.resource.ResourceInformation
@@ -132,6 +133,13 @@ private[deploy] class ExecutorRunner(
132133
}
133134
}
134135

136+
private[worker] def activeProcessorCountOpts(): Seq[String] =
137+
if (conf.get(EXECUTOR_LIMIT_ACTIVE_PROCESSOR_COUNT_ENABLED)) {
138+
Seq(s"-XX:ActiveProcessorCount=$cores")
139+
} else {
140+
Seq.empty
141+
}
142+
135143
/** Replace variables such as {{EXECUTOR_ID}} and {{CORES}} in a command argument passed to us */
136144
private[worker] def substituteVariables(argument: String): String = argument match {
137145
case "{{WORKER_URL}}" => workerUrl
@@ -152,7 +160,7 @@ private[deploy] class ExecutorRunner(
152160
// Launch the process
153161
val arguments = appDesc.command.arguments ++ resourceFileOpt.map(f =>
154162
Seq("--resourcesFile", f.getAbsolutePath)).getOrElse(Seq.empty)
155-
val subsOpts = appDesc.command.javaOpts.map {
163+
val subsOpts = activeProcessorCountOpts() ++ appDesc.command.javaOpts.map {
156164
Utils.substituteAppNExecIds(_, appId, execId.toString)
157165
}
158166
val subsCommand = appDesc.command.copy(arguments = arguments, javaOpts = subsOpts)

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2946,15 +2946,15 @@ package object config {
29462946
private[spark] val DRIVER_LIMIT_ACTIVE_PROCESSOR_COUNT_ENABLED =
29472947
ConfigBuilder("spark.driver.limitActiveProcessorCount.enabled")
29482948
.doc("Whether to add -XX:ActiveProcessorCount=<spark.driver.cores> to the driver JVM " +
2949-
"options. Currently, this only takes effect in YARN cluster mode.")
2949+
"options. Currently, this takes effect in YARN cluster mode and standalone cluster mode.")
29502950
.version("4.2.0")
29512951
.booleanConf
29522952
.createWithDefault(false)
29532953

29542954
private[spark] val EXECUTOR_LIMIT_ACTIVE_PROCESSOR_COUNT_ENABLED =
29552955
ConfigBuilder("spark.executor.limitActiveProcessorCount.enabled")
29562956
.doc("Whether to add -XX:ActiveProcessorCount=<spark.executor.cores> to executor JVM " +
2957-
"options. Currently, this only takes effect in YARN mode.")
2957+
"options. Currently, this takes effect in YARN mode and standalone mode.")
29582958
.version("4.2.0")
29592959
.booleanConf
29602960
.createWithDefault(false)

core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,9 @@ import org.mockito.invocation.InvocationOnMock
2727
import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
2828

2929
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
30-
import org.apache.spark.deploy.{Command, DriverDescription}
30+
import org.apache.spark.deploy.{Command, DeployTestUtils, DriverDescription}
3131
import org.apache.spark.deploy.master.DriverState
32+
import org.apache.spark.internal.config.DRIVER_LIMIT_ACTIVE_PROCESSOR_COUNT_ENABLED
3233
import org.apache.spark.rpc.RpcEndpointRef
3334
import org.apache.spark.util.Clock
3435

@@ -44,6 +45,14 @@ class DriverRunnerTest extends SparkFunSuite {
4445
new SecurityManager(conf)))
4546
}
4647

48+
private def createDriverRunnerWithCores(conf: SparkConf, cores: Int): DriverRunner = {
49+
val driverDesc = new DriverDescription(
50+
"hdfs://some-dir/some.jar", 100, cores, false, DeployTestUtils.createDriverCommand())
51+
new DriverRunner(conf, "driver-1", new File("workDir"), new File("sparkHome"),
52+
driverDesc, null, "spark://worker", "http://publicAddress:80",
53+
new SecurityManager(conf))
54+
}
55+
4756
private def createProcessBuilderAndProcess(): (ProcessBuilderLike, Process) = {
4857
val processBuilder = mock(classOf[ProcessBuilderLike])
4958
when(processBuilder.command).thenReturn(Seq("mocked", "command"))
@@ -208,4 +217,21 @@ class DriverRunnerTest extends SparkFunSuite {
208217
assert(runner.finalException.get.isInstanceOf[RuntimeException])
209218
}
210219
}
220+
221+
test("SPARK-56157: APC flag not set by default") {
222+
val runner = createDriverRunnerWithCores(new SparkConf(), cores = 2)
223+
assert(runner.activeProcessorCountOpts() === Seq.empty)
224+
}
225+
226+
test("SPARK-56157: APC flag set when enabled") {
227+
val conf = new SparkConf().set(DRIVER_LIMIT_ACTIVE_PROCESSOR_COUNT_ENABLED, true)
228+
val runner = createDriverRunnerWithCores(conf, cores = 2)
229+
assert(runner.activeProcessorCountOpts() === Seq("-XX:ActiveProcessorCount=2"))
230+
}
231+
232+
test("SPARK-56157: APC flag reflects core count") {
233+
val conf = new SparkConf().set(DRIVER_LIMIT_ACTIVE_PROCESSOR_COUNT_ENABLED, true)
234+
val runner = createDriverRunnerWithCores(conf, cores = 5)
235+
assert(runner.activeProcessorCountOpts() === Seq("-XX:ActiveProcessorCount=5"))
236+
}
211237
}

core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import java.io.File
2121

2222
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
2323
import org.apache.spark.deploy.{ApplicationDescription, Command, DeployTestUtils, ExecutorState}
24+
import org.apache.spark.internal.config.EXECUTOR_LIMIT_ACTIVE_PROCESSOR_COUNT_ENABLED
2425
import org.apache.spark.resource.ResourceProfile
2526

2627
class ExecutorRunnerTest extends SparkFunSuite {
@@ -39,4 +40,29 @@ class ExecutorRunnerTest extends SparkFunSuite {
3940
val builderCommand = builder.command()
4041
assert(builderCommand.get(builderCommand.size() - 1) === appId)
4142
}
43+
44+
test("SPARK-56157: APC flag not set by default") {
45+
val runner = createExecutorRunner(new SparkConf(), cores = 2)
46+
assert(runner.activeProcessorCountOpts() === Seq.empty)
47+
}
48+
49+
test("SPARK-56157: APC flag set when enabled") {
50+
val conf = new SparkConf().set(EXECUTOR_LIMIT_ACTIVE_PROCESSOR_COUNT_ENABLED, true)
51+
val runner = createExecutorRunner(conf, cores = 2)
52+
assert(runner.activeProcessorCountOpts() === Seq("-XX:ActiveProcessorCount=2"))
53+
}
54+
55+
test("SPARK-56157: APC flag reflects core count") {
56+
val conf = new SparkConf().set(EXECUTOR_LIMIT_ACTIVE_PROCESSOR_COUNT_ENABLED, true)
57+
val runner = createExecutorRunner(conf, cores = 7)
58+
assert(runner.activeProcessorCountOpts() === Seq("-XX:ActiveProcessorCount=7"))
59+
}
60+
61+
private def createExecutorRunner(conf: SparkConf, cores: Int): ExecutorRunner = {
62+
new ExecutorRunner(
63+
"appId", 1, DeployTestUtils.createAppDesc(), cores, 1234, null,
64+
"workerId", "http://", "host", 123, "publicAddress",
65+
new File(sys.props.getOrElse("spark.test.home", ".")), new File("workDir"), "spark://worker",
66+
conf, Seq("localDir"), ExecutorState.RUNNING, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
67+
}
4268
}

0 commit comments

Comments (0)