
[Bug]: HyperPod Elastic Agent keeps throwing asyncio.exceptions.CancelledError at the end of the Training Job #1033

@anindya-saha

Description

Prerequisites

  • I have searched existing issues to make sure this bug has not already been reported.
  • I have verified the issue against the latest main branch.

Environment

hyperpod-elastic-agent==1.1.2

Bug Description

Hi Team, I see the error attached below at the end of every training job.

Expected Behavior: The training job should shut down cleanly at the end of the run.

Actual Behavior: The agent exits with an asyncio.exceptions.CancelledError traceback followed by a torch SignalException (death signal 15); see the relevant log output below.

Steps to Reproduce

A minimal YAML spec is below, followed by the two Python files it runs: model.py and the test_model.py entrypoint.

apiVersion: sagemaker.amazonaws.com/v1
kind: HyperPodPyTorchJob
metadata:
  name: tp-smoke
  namespace: mlp
spec:
  nprocPerNode: "8"

  replicaSpecs:
    - name: pods
      replicas: 1
      spares: 0
      template:
        metadata:
          labels:
            job-name: tp-smoke
            replica-type: pods
            app: tp-experiments
        spec:
          nodeSelector:
            beta.kubernetes.io/instance-type: ml.p4d.24xlarge
          serviceAccountName: mlp-sa
          affinity:
            podAntiAffinity:
              requiredDuringSchedulingIgnoredDuringExecution:
                - labelSelector:
                    matchLabels:
                      job-name: tp-smoke
                  topologyKey: kubernetes.io/hostname
            nodeAffinity:
              requiredDuringSchedulingIgnoredDuringExecution:
                nodeSelectorTerms:
                  - matchExpressions:
                      - key: sagemaker.amazonaws.com/node-health-status
                        operator: In
                        values:
                          - Schedulable
                      - key: sagemaker.amazonaws.com/compute-type
                        operator: In
                        values:
                          - hyperpod
          topologySpreadConstraints:
            - maxSkew: 1
              topologyKey: kubernetes.io/hostname
              whenUnsatisfiable: DoNotSchedule
              labelSelector:
                matchLabels:
                  job-name: tp-smoke
          tolerations:
            - key: "nvidia.com/gpu"
              operator: "Exists"
              effect: "NoSchedule"
          volumes:
            - name: shmem
              emptyDir:
                medium: Memory
            - name: local-nvme
              hostPath:
                path: /mnt/k8s-disks/0
            - name: fsx-static
              persistentVolumeClaim:
                claimName: fsx-static-claim
            - name: host-nvme-storage
              hostPath:
                path: /opt/dlami/nvme
                type: Directory
          containers:
            - name: pytorch
              image: <Image URI>
              imagePullPolicy: IfNotPresent
              resources:
                requests:
                  nvidia.com/gpu: "8"
                  vpc.amazonaws.com/efa: "4"
                limits:
                  nvidia.com/gpu: "8"
                  vpc.amazonaws.com/efa: "4"
              env:
                - name: LOGLEVEL
                  value: INFO
                - name: NCCL_DEBUG
                  value: INFO
                - name: NCCL_ASYNC_ERROR_HANDLING
                  value: "1"
                - name: NCCL_SOCKET_IFNAME
                  value: ^lo
                - name: GLOO_SOCKET_IFNAME
                  value: eth0
                - name: TORCH_NCCL_ASYNC_ERROR_HANDLING
                  value: "1"
                - name: TORCH_NCCL_ENABLE_MONITORING
                  value: "1"
                - name: TORCH_NCCL_TRACE_BUFFER_SIZE
                  value: "20000"
                - name: TORCH_NCCL_DUMP_ON_TIMEOUT
                  value: "1"
                - name: TORCH_NCCL_DEBUG_INFO_TEMP_FILE
                  value: /tmp/nccl_trace_rank_
                - name: PYTORCH_CUDA_ALLOC_CONF
                  value: "expandable_segments:True"
                - name: PYTHONFAULTHANDLER
                  value: "1"
                - name: FI_EFA_USE_DEVICE_RDMA
                  value: "1"
                - name: FI_EFA_FORK_SAFE
                  value: "1"
                - name: FI_LOG_LEVEL
                  value: "1"
                - name: FI_PROVIDER
                  value: efa
                - name: LD_LIBRARY_PATH
                  value: "/opt/amazon/efa/lib:/opt/amazon/openmpi/lib:/opt/amazon/ofi-nccl/lib:/usr/local/lib:$LD_LIBRARY_PATH"
              command:
                - hyperpodrun
                - "--tee=3"
                - "--log_dir=/tmp/hyperpod"
                - "--nproc_per_node=8"
                - "--nnodes=1"
                - "--server-log-level=warning"
                - "/workspace/training-parallelism-strategies-from-scratch/tensor-parallelism/test_model.py"
              volumeMounts:
                - name: shmem
                  mountPath: /dev/shm
                - name: local-nvme
                  mountPath: /tmp
                - name: fsx-static
                  mountPath: /mnt/fsx
                - name: host-nvme-storage
                  mountPath: /opt/dlami/nvme
              securityContext:
                capabilities:
                  add:
                    - IPC_LOCK
                    - SYS_PTRACE
              livenessProbe:
                exec:
                  command:
                    - /bin/bash
                    - -c
                    - "ps aux | grep python || true"
                initialDelaySeconds: 120
                periodSeconds: 60
                timeoutSeconds: 30
                failureThreshold: 10

  runPolicy:
    cleanPodPolicy: All
    jobMaxRetryCount: 0
    ttlSecondsAfterFinished: 0
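
The spec can be applied with kubectl apply -f into the mlp namespace; the failing pod in the log output is tp-smoke-pods-0 (container pytorch).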
"""Standard (non-parallelized) GPT-style transformer model.

A minimal GPT decoder used as the baseline for tensor-parallelism experiments.
All TP scripts import the model definition and shared constants from here.
"""

import math

import torch
import torch.nn as nn
import torch.nn.functional as F


# ---------------------------------------------------------------------------
# Model constants
# ---------------------------------------------------------------------------

D_MODEL = 512       # hidden dimension (embedding size)
N_HEADS = 8         # number of attention heads
D_HEAD = D_MODEL // N_HEADS   # = 64, dimension per head
D_FF = 2048         # feed-forward intermediate dimension (4x D_MODEL)
N_LAYERS = 6        # number of transformer blocks
VOCAB_SIZE = 10_000
MAX_SEQ_LEN = 512

BATCH_SIZE = 8
SEQ_LEN = 256
NUM_WARMUP = 3
NUM_BENCHMARK = 10
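
# Rough parameter count implied by these constants (added as a sanity check;
# the arithmetic below is derived from the module definitions, not measured):
#   tok_emb : 10_000 * 512                                  =  5,120,000
#   pos_emb : 512 * 512                                     =    262,144
#   block   : 4 * 512^2 (attn, bias=False)                  =  1,048,576
#           + 512*2048 + 2048 + 2048*512 + 512 (FFN)        =  2,099,712
#           + 2 LayerNorms (2 * 1024)                       =      2,048
#   6 blocks: 6 * 3,150,336                                 = 18,902,016
#   ln_f    : 1,024
#   lm_head : 512 * 10_000 (bias=False)                     =  5,120,000
#   total                                                   = 29,405,184  (~58.8 MB in BF16)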


# ---------------------------------------------------------------------------
# Utilities
# ---------------------------------------------------------------------------


def count_parameters(model: nn.Module) -> int:
    return sum(p.numel() for p in model.parameters())


def get_gpu_memory_mb(device=0) -> float:
    return torch.cuda.memory_allocated(device) / 1024 / 1024


def get_gpu_peak_memory_mb(device=0) -> float:
    return torch.cuda.max_memory_allocated(device) / 1024 / 1024


# ---------------------------------------------------------------------------
# Model components
# ---------------------------------------------------------------------------


class StandardAttention(nn.Module):
    def __init__(self, d_model: int, n_heads: int):
        super().__init__()
        self.n_heads = n_heads
        self.d_head = d_model // n_heads
        self.scale = 1.0 / math.sqrt(self.d_head)

        self.W_q = nn.Linear(d_model, d_model, bias=False)
        self.W_k = nn.Linear(d_model, d_model, bias=False)
        self.W_v = nn.Linear(d_model, d_model, bias=False)
        self.W_o = nn.Linear(d_model, d_model, bias=False)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        B, T, _ = x.shape
        Q = self.W_q(x).view(B, T, self.n_heads, self.d_head).transpose(1, 2)
        K = self.W_k(x).view(B, T, self.n_heads, self.d_head).transpose(1, 2)
        V = self.W_v(x).view(B, T, self.n_heads, self.d_head).transpose(1, 2)

        attn = (Q @ K.transpose(-2, -1)) * self.scale
        mask = torch.triu(torch.ones(T, T, device=x.device), diagonal=1).bool()
        attn = attn.masked_fill(mask, float("-inf"))
        attn = F.softmax(attn, dim=-1)

        out = (attn @ V).transpose(1, 2).contiguous().view(B, T, -1)
        return self.W_o(out)


class StandardFFN(nn.Module):
    def __init__(self, d_model: int, d_ff: int):
        super().__init__()
        self.W1 = nn.Linear(d_model, d_ff)
        self.W2 = nn.Linear(d_ff, d_model)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.W2(F.gelu(self.W1(x)))


class StandardTransformerBlock(nn.Module):
    def __init__(self, d_model: int, n_heads: int, d_ff: int):
        super().__init__()
        self.ln1 = nn.LayerNorm(d_model)
        self.attn = StandardAttention(d_model, n_heads)
        self.ln2 = nn.LayerNorm(d_model)
        self.ffn = StandardFFN(d_model, d_ff)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = x + self.attn(self.ln1(x))
        x = x + self.ffn(self.ln2(x))
        return x


class StandardGPT(nn.Module):
    def __init__(self):
        super().__init__()
        self.tok_emb = nn.Embedding(VOCAB_SIZE, D_MODEL)
        self.pos_emb = nn.Embedding(MAX_SEQ_LEN, D_MODEL)
        self.blocks = nn.ModuleList([
            StandardTransformerBlock(D_MODEL, N_HEADS, D_FF)
            for _ in range(N_LAYERS)
        ])
        self.ln_f = nn.LayerNorm(D_MODEL)
        self.lm_head = nn.Linear(D_MODEL, VOCAB_SIZE, bias=False)

    def forward(self, input_ids: torch.Tensor) -> torch.Tensor:
        B, T = input_ids.shape
        pos = torch.arange(T, device=input_ids.device).unsqueeze(0)
        x = self.tok_emb(input_ids) + self.pos_emb(pos)
        for block in self.blocks:
            x = block(x)
        x = self.ln_f(x)
        return self.lm_head(x)

test_model.py:

from model import *

if __name__ == "__main__":
    model = StandardGPT().cuda()
    n_params = count_parameters(model)

    print(f'Model parameters: {n_params:,} ({n_params * 2 / 1e6:.1f} MB in BF16)')
    print(f'Architecture: d_model={D_MODEL}, n_heads={N_HEADS}, d_ff={D_FF}, layers={N_LAYERS}')
    print(f'\nBlock 0 parameter breakdown:')
    for name, param in model.named_parameters():
        if 'blocks.0.' in name:
            print(f'  {name:40s} {str(list(param.shape)):20s} {param.numel():>8,}')

    test_input = torch.randint(0, VOCAB_SIZE, (2, 32)).cuda()
    with torch.no_grad():
        out = model(test_input)
    print(f'\nForward pass OK: {list(test_input.shape)} -> {list(out.shape)}')
    del model; torch.cuda.empty_cache()
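
Note: test_model.py makes no torch.distributed calls; hyperpodrun simply launches eight identical copies of it, one per GPU, so the error below reproduces with no collective communication in flight. On a single GPU, python test_model.py should print Model parameters: 29,405,184 (58.8 MB in BF16), followed by the block-0 breakdown and the forward-pass shapes.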

Relevant Log Output

(Lines below are truncated at the right edge by the terminal log pane they were captured from.)

[HyperPodElasticAgent] 2026-03-23 03:31:39,332 [INFO] [rank0-restart0] /usr/local/lib/python3.12/site-packages/hyperpod_elastic_agent/pcontext/pcontext.py:133: Reaped child process 78 with status
[HyperPodElasticAgent] 2026-03-23 03:31:39,474 [INFO] [rank0-restart0] /usr/local/lib/python3.12/site-packages/hyperpod_elastic_agent/pcontext/pcontext.py:133: Reaped child process 85 with status
[HyperPodElasticAgent] 2026-03-23 03:31:39,700 [INFO] [rank0-restart0] /usr/local/lib/python3.12/site-packages/hyperpod_elastic_agent/pcontext/pcontext.py:133: Reaped child process 79 with status
[HyperPodElasticAgent] 2026-03-23 03:31:39,707 [INFO] [rank0-restart0] /usr/local/lib/python3.12/site-packages/hyperpod_elastic_agent/pcontext/pcontext.py:133: Reaped child process 84 with status
[HyperPodElasticAgent] 2026-03-23 03:31:39,714 [INFO] [rank0-restart0] /usr/local/lib/python3.12/site-packages/hyperpod_elastic_agent/pcontext/pcontext.py:133: Reaped child process 82 with status
[HyperPodElasticAgent] 2026-03-23 03:31:39,807 [INFO] [rank0-restart0] /usr/local/lib/python3.12/site-packages/hyperpod_elastic_agent/pcontext/pcontext.py:133: Reaped child process 80 with status
[HyperPodElasticAgent] 2026-03-23 03:31:39,881 [INFO] [rank0-restart0] /usr/local/lib/python3.12/site-packages/hyperpod_elastic_agent/pcontext/pcontext.py:133: Reaped child process 81 with status
[HyperPodElasticAgent] 2026-03-23 03:31:40,625 [INFO] [rank0-restart0] /usr/local/lib/python3.12/site-packages/hyperpod_elastic_agent/pcontext/pcontext.py:133: Reaped child process 83 with status
[HyperPodElasticAgent] 2026-03-23 03:31:40,985 [INFO] [rank0-restart0] /usr/local/lib/python3.12/site-packages/hyperpod_elastic_agent/elastic_agent/plr_hyperpod_elastic_agent.py:135: [default] wo
[HyperPodElasticAgent] 2026-03-23 03:31:40,985 [INFO] [rank0-restart0] /usr/local/lib/python3.12/site-packages/hyperpod_elastic_agent/elastic_agent/hyperpod_elastic_agent.py:338: Transitioned fro
[HyperPodElasticAgent] 2026-03-23 03:31:40,985 [INFO] [rank0-restart0] /usr/local/lib/python3.12/site-packages/hyperpod_elastic_agent/elastic_agent/hyperpod_elastic_agent.py:340: The agent is fin
[HyperPodElasticAgent] 2026-03-23 03:31:42,049 [WARNING] [rank0-restart0] /usr/local/lib/python3.12/site-packages/torch/distributed/elastic/agent/server/api.py:723: Received 15 death signal, shut
[HyperPodElasticAgent] 2026-03-23 03:31:42,050 [INFO] [rank0-restart0] /usr/local/lib/python3.12/site-packages/hyperpod_elastic_agent/elastic_agent/hyperpod_elastic_agent.py:292: Shutdown complet
ERROR:    Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/starlette/routing.py", line 645, in lifespan
    await receive()
  File "/usr/local/lib/python3.12/site-packages/uvicorn/lifespan/on.py", line 137, in receive
    return await self.receive_queue.get()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/asyncio/queues.py", line 158, in get
    await getter
asyncio.exceptions.CancelledError

Traceback (most recent call last):
  File "/usr/local/bin/hyperpodrun", line 6, in <module>
    sys.exit(main())
             ^^^^^^
  File "/usr/local/lib/python3.12/site-packages/hyperpod_elastic_agent/run.py", line 300, in main
    run(args)
  File "/usr/local/lib/python3.12/site-packages/hyperpod_elastic_agent/run.py", line 131, in run
    result = agent.run()
             ^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/torch/distributed/elastic/metrics/api.py", line 138, in wrapper
    result = f(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/torch/distributed/elastic/agent/server/api.py", line 715, in run
    result = self._invoke_run(role)
             ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/hyperpod_elastic_agent/elastic_agent/plr_hyperpod_elastic_agent.py", line 112, in _invoke_run
    time.sleep(monitor_interval)
  File "/usr/local/lib/python3.12/site-packages/torch/distributed/elastic/multiprocessing/api.py", line 84, in _terminate_process_handler
    raise SignalException(f"Process {os.getpid()} got signal: {sigval}", sigval=sigval)
torch.distributed.elastic.multiprocessing.api.SignalException: Process 1 got signal: 15
stream closed EOF for mlp/tp-smoke-pods-0 (pytorch)
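
For context, the first traceback is the generic shape asyncio produces when a task blocked on Queue.get() is cancelled, which is what happens to uvicorn's lifespan receive() when its server is torn down. A minimal standard-library sketch (not HyperPod code):

import asyncio

async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue()
    # uvicorn's lifespan receive() awaits receive_queue.get() just like this
    getter = asyncio.create_task(queue.get())
    await asyncio.sleep(0)  # let the getter task start waiting on the queue
    getter.cancel()         # shutdown cancels the pending task...
    try:
        await getter
    except asyncio.CancelledError:
        # ...and the await surfaces asyncio.exceptions.CancelledError,
        # matching the ERROR traceback in the log above
        print("asyncio.exceptions.CancelledError reproduced")

asyncio.run(main())

The second traceback has the same character: torch elastic installs a handler that re-raises the death signal as SignalException, so a SIGTERM delivered while the agent sits in time.sleep(monitor_interval) produces exactly this trace. A sketch of the mechanism (the SignalException class here is a stand-in, not the torch import; Unix only):

import os
import signal
import threading
import time

class SignalException(Exception):
    """Stand-in for torch.distributed.elastic.multiprocessing.api.SignalException."""

def _terminate_process_handler(signum, frame):
    # mirrors torch elastic: re-raise the death signal as an exception
    raise SignalException(f"Process {os.getpid()} got signal: {signum}")

signal.signal(signal.SIGTERM, _terminate_process_handler)
# deliver SIGTERM shortly after the main thread has gone to sleep
threading.Timer(0.1, os.kill, args=(os.getpid(), signal.SIGTERM)).start()
try:
    time.sleep(5)  # mirrors time.sleep(monitor_interval) in _invoke_run
except SignalException as e:
    print(e)  # Process <pid> got signal: 15

Both traces appear only after the workers have already been reaped, i.e. during the agent's own teardown, which is why the expectation above is a clean shutdown without them.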
