Skip to content

Commit 788a31a

Browse files
committed
[FLINK-39404][runtime] HardwareDescription reports incorrect CPU cores in containerized environments with fractional CPU limits
1 parent 01070e6 commit 788a31a

3 files changed

Lines changed: 177 additions & 7 deletions

File tree

flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ public static int getPoolSize(Configuration config) {
115115
final int poolSize =
116116
config.get(
117117
ClusterOptions.CLUSTER_IO_EXECUTOR_POOL_SIZE,
118-
4 * Hardware.getNumberCPUCores());
118+
(int) Math.ceil(4 * Hardware.getNumberCPUCoresAsDouble()));
119119
Preconditions.checkArgument(
120120
poolSize > 0,
121121
"Illegal pool size (%s) of io-executor, please re-configure '%s'.",

flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescription.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public final class HardwareDescription implements Serializable {
4141

4242
/** The number of CPU cores available to the JVM on the compute node. */
4343
@JsonProperty(FIELD_NAME_CPU_CORES)
44-
private final int numberOfCPUCores;
44+
private final double numberOfCPUCores;
4545

4646
/** The size of physical memory in bytes available on the compute node. */
4747
@JsonProperty(FIELD_NAME_SIZE_PHYSICAL_MEMORY)
@@ -67,7 +67,7 @@ public final class HardwareDescription implements Serializable {
6767
*/
6868
@JsonCreator
6969
public HardwareDescription(
70-
@JsonProperty(FIELD_NAME_CPU_CORES) int numberOfCPUCores,
70+
@JsonProperty(FIELD_NAME_CPU_CORES) double numberOfCPUCores,
7171
@JsonProperty(FIELD_NAME_SIZE_PHYSICAL_MEMORY) long sizeOfPhysicalMemory,
7272
@JsonProperty(FIELD_NAME_SIZE_JVM_HEAP) long sizeOfJvmHeap,
7373
@JsonProperty(FIELD_NAME_SIZE_MANAGED_MEMORY) long sizeOfManagedMemory) {
@@ -82,7 +82,7 @@ public HardwareDescription(
8282
*
8383
* @return the number of CPU cores available to the JVM on the compute node
8484
*/
85-
public int getNumberOfCPUCores() {
85+
public double getNumberOfCPUCores() {
8686
return this.numberOfCPUCores;
8787
}
8888

@@ -126,7 +126,7 @@ public boolean equals(Object o) {
126126
return false;
127127
}
128128
HardwareDescription that = (HardwareDescription) o;
129-
return numberOfCPUCores == that.numberOfCPUCores
129+
return Double.compare(numberOfCPUCores, that.numberOfCPUCores) == 0
130130
&& sizeOfPhysicalMemory == that.sizeOfPhysicalMemory
131131
&& sizeOfJvmHeap == that.sizeOfJvmHeap
132132
&& sizeOfManagedMemory == that.sizeOfManagedMemory;
@@ -141,7 +141,7 @@ public int hashCode() {
141141
@Override
142142
public String toString() {
143143
return String.format(
144-
"cores=%d, physMem=%d, heap=%d, managed=%d",
144+
"cores=%s, physMem=%d, heap=%d, managed=%d",
145145
numberOfCPUCores, sizeOfPhysicalMemory, sizeOfJvmHeap, sizeOfManagedMemory);
146146
}
147147

@@ -150,7 +150,7 @@ public String toString() {
150150
// --------------------------------------------------------------------------------------------
151151

152152
public static HardwareDescription extractFromSystem(long managedMemory) {
153-
final int numberOfCPUCores = Hardware.getNumberCPUCores();
153+
final double numberOfCPUCores = Hardware.getNumberCPUCoresAsDouble();
154154
final long sizeOfJvmHeap = Runtime.getRuntime().maxMemory();
155155
final long sizeOfPhysicalMemory = Hardware.getSizeOfPhysicalMemory();
156156

flink-runtime/src/main/java/org/apache/flink/runtime/util/Hardware.java

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@
3232
import java.lang.reflect.InvocationTargetException;
3333
import java.lang.reflect.Method;
3434
import java.nio.charset.StandardCharsets;
35+
import java.nio.file.Files;
36+
import java.nio.file.Path;
37+
import java.nio.file.Paths;
3538
import java.util.regex.Matcher;
3639
import java.util.regex.Pattern;
3740

@@ -45,6 +48,11 @@ public class Hardware {
4548
private static final Pattern LINUX_MEMORY_REGEX =
4649
Pattern.compile("^MemTotal:\\s*(\\d+)\\s+kB$");
4750

51+
private static final String CGROUP_V2_CPU_MAX_PATH = "/sys/fs/cgroup/cpu.max";
52+
53+
private static final String CGROUP_V1_CPU_QUOTA_PATH = "/sys/fs/cgroup/cpu/cpu.cfs_quota_us";
54+
private static final String CGROUP_V1_CPU_PERIOD_PATH = "/sys/fs/cgroup/cpu/cpu.cfs_period_us";
55+
4856
// ------------------------------------------------------------------------
4957

5058
/**
@@ -56,6 +64,168 @@ public static int getNumberCPUCores() {
5664
return Runtime.getRuntime().availableProcessors();
5765
}
5866

67+
/**
68+
* Gets the number of CPU cores available to the JVM as a fractional value.
69+
*
70+
* <p>On Linux, this method first attempts to detect a container CPU limit via cgroup files (v2,
71+
* then v1). If a limit is found, it is returned as-is (e.g. 0.5, 1.5). If no container limit is
72+
* detected, it falls back to {@link Runtime#availableProcessors()}.
73+
*
74+
* <p>Use this method when the fractional value matters, for example when displaying the CPU
75+
* count in the Web UI or when performing arithmetic before rounding (e.g. {@code 4 * cores}).
76+
* For call sites that need an integer (e.g. thread pool sizes), use {@link
77+
* #getNumberCPUCores()} instead.
78+
*
79+
* @return The number of CPU cores as a double.
80+
*/
81+
public static double getNumberCPUCoresAsDouble() {
82+
double containerLimit = getContainerCpuLimit();
83+
if (containerLimit > 0) {
84+
LOG.debug("Using container CPU limit for core count: limit={}", containerLimit);
85+
return containerLimit;
86+
}
87+
return Runtime.getRuntime().availableProcessors();
88+
}
89+
90+
/**
91+
* Returns the CPU limit of the container as a fractional double by reading Linux cgroup CPU
92+
* quota and period values.
93+
*
94+
* <p>This method attempts to read the CPU limit from cgroup v2 first ({@code
95+
* /sys/fs/cgroup/cpu.max}), then falls back to cgroup v1 ({@code
96+
* /sys/fs/cgroup/cpu/cpu.cfs_quota_us} and {@code cpu.cfs_period_us}).
97+
*
98+
* <p>Examples of return values:
99+
*
100+
* <ul>
101+
* <li>{@code 0.5} - container limited to half a CPU core
102+
* <li>{@code 2.0} - container limited to 2 CPU cores
103+
* <li>{@code -1} - not running in a container, no CPU limit set, or unable to read cgroup
104+
* files (e.g. non-Linux OS)
105+
* </ul>
106+
*
107+
* @return the container CPU limit as a fractional double, or {@code -1} if no limit is detected
108+
*/
109+
public static double getContainerCpuLimit() {
110+
// Try cgroup v2 first
111+
double limit = getCpuLimitFromCgroupV2();
112+
if (limit > 0) {
113+
return limit;
114+
}
115+
116+
// Fall back to cgroup v1
117+
limit = getCpuLimitFromCgroupV1();
118+
if (limit > 0) {
119+
return limit;
120+
}
121+
122+
LOG.debug(
123+
"Could not detect container CPU limit from cgroup files. "
124+
+ "This is expected when not running inside a container or when no CPU limit is set.");
125+
return -1;
126+
}
127+
128+
/**
129+
* Reads CPU limit from cgroup v2.
130+
*
131+
* <p>The file {@code /sys/fs/cgroup/cpu.max} contains two space-separated values: {@code quota
132+
* period}. For example, {@code "50000 100000"} means a limit of 0.5 CPU cores. The string
133+
* {@code "max"} as the quota means no limit is set.
134+
*
135+
* @return the CPU limit as a double, or {@code -1} if unavailable or unlimited
136+
*/
137+
private static double getCpuLimitFromCgroupV2() {
138+
try {
139+
Path path = Paths.get(CGROUP_V2_CPU_MAX_PATH);
140+
if (!Files.exists(path)) {
141+
return -1;
142+
}
143+
144+
String content = Files.readString(path).trim();
145+
String[] parts = content.split("\\s+");
146+
if (parts.length != 2) {
147+
LOG.debug(
148+
"Unexpected format in {}: '{}'. Expected 'quota period'.",
149+
CGROUP_V2_CPU_MAX_PATH,
150+
content);
151+
return -1;
152+
}
153+
154+
// "max" means no CPU limit is set
155+
if ("max".equals(parts[0])) {
156+
LOG.debug("No CPU limit set (cgroup v2 quota is 'max').");
157+
return -1;
158+
}
159+
160+
long quota = Long.parseLong(parts[0]);
161+
long period = Long.parseLong(parts[1]);
162+
if (quota > 0 && period > 0) {
163+
double cpuLimit = (double) quota / period;
164+
LOG.debug(
165+
"Detected cgroup v2 CPU limit: quota={}, period={}, limit={}",
166+
quota,
167+
period,
168+
cpuLimit);
169+
return cpuLimit;
170+
}
171+
} catch (NumberFormatException e) {
172+
LOG.debug("Failed to parse cgroup v2 CPU limit values.", e);
173+
} catch (IOException e) {
174+
LOG.debug("Could not read cgroup v2 CPU limit file: {}", CGROUP_V2_CPU_MAX_PATH, e);
175+
} catch (Throwable t) {
176+
LOG.debug("Unexpected error reading cgroup v2 CPU limit.", t);
177+
}
178+
return -1;
179+
}
180+
181+
/**
182+
* Reads CPU limit from cgroup v1.
183+
*
184+
* <p>The quota is read from {@code /sys/fs/cgroup/cpu/cpu.cfs_quota_us} and the period from
185+
* {@code /sys/fs/cgroup/cpu/cpu.cfs_period_us}. Both values are in microseconds. A quota of
186+
* {@code -1} means no limit is set. The CPU limit is computed as {@code quota / period}.
187+
*
188+
* @return the CPU limit as a double, or {@code -1} if unavailable or unlimited
189+
*/
190+
private static double getCpuLimitFromCgroupV1() {
191+
try {
192+
Path quotaPath = Paths.get(CGROUP_V1_CPU_QUOTA_PATH);
193+
Path periodPath = Paths.get(CGROUP_V1_CPU_PERIOD_PATH);
194+
if (!Files.exists(quotaPath) || !Files.exists(periodPath)) {
195+
return -1;
196+
}
197+
198+
long quota = Long.parseLong(Files.readString(quotaPath).trim());
199+
long period = Long.parseLong(Files.readString(periodPath).trim());
200+
201+
// quota == -1 means no CPU limit is set in cgroup v1
202+
if (quota <= 0) {
203+
LOG.debug("No CPU limit set (cgroup v1 quota={}).", quota);
204+
return -1;
205+
}
206+
207+
if (period <= 0) {
208+
LOG.debug("Invalid cgroup v1 CPU period: {}", period);
209+
return -1;
210+
}
211+
212+
double cpuLimit = (double) quota / period;
213+
LOG.debug(
214+
"Detected cgroup v1 CPU limit: quota={}, period={}, limit={}",
215+
quota,
216+
period,
217+
cpuLimit);
218+
return cpuLimit;
219+
} catch (NumberFormatException e) {
220+
LOG.debug("Failed to parse cgroup v1 CPU limit values.", e);
221+
} catch (IOException e) {
222+
LOG.debug("Could not read cgroup v1 CPU limit files.", e);
223+
} catch (Throwable t) {
224+
LOG.debug("Unexpected error reading cgroup v1 CPU limit.", t);
225+
}
226+
return -1;
227+
}
228+
59229
/**
60230
* Returns the size of the physical memory in bytes.
61231
*

0 commit comments

Comments
 (0)