Initial release.

Bartosz Wróblewski
2024-06-25 07:15:00 -07:00
commit 2129478bbd
32 changed files with 5662 additions and 0 deletions

README.md Normal file

@@ -0,0 +1,9 @@
# Training Infrastructure Scripts
This repository contains various scripts developed at Imbue for managing a large cluster of H100s, detecting and fixing hardware issues, and generally ensuring smooth model training. You can read more about our process [here](https://imbue.com/research/70b-infrastructure/).
The code is organized as follows:
- `gpu_stress_test` tests that the GPUs on each machine are able to allocate large tensors and perform standard operations.
- `health_checks` contains various checks we use to determine which hosts are healthy.
- `host_validation` contains tests to check that the GPUs on a given machine are able to communicate with each other (via NVLink) and with GPUs on other machines (via InfiniBand).
- `ufm_events` contains a script which parses the UFM event log, checks for relevant events, and determines which network ports should be disabled.
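
A minimal sketch of how these might be invoked (paths are illustrative, the stress-test script name is a guess, and the hostnames are the placeholders from `health_checks/config.json`):

```bash
# Run the health checks against a comma-separated list of hosts
# (or list them under node_info.nodes in health_checks/config.json).
cd health_checks && python run_health_checks.py hostname-23r66x3,hostname-32ls6x3

# Run the GPU stress test on a single machine for 300 seconds
# (script path is illustrative; it prints "<hostname>: <status>").
python gpu_stress_test/gpu_stress_test.py 300
```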


@@ -0,0 +1,69 @@
import math
import socket
import sys
import time
import torch
GPU_MEMORY_IN_GB = 40
MAX_RUNTIME = 5 * 60 # Run for 5 minutes
def run_load() -> str:
if not torch.cuda.is_available():
return "CUDA is not available"
# Get the array size for a square array that fills 1/4 of memory with 2 byte values
arr_size = (((GPU_MEMORY_IN_GB / 4) * 10**9) / 2) ** (1 / 2)
arr_size = int(math.ceil(arr_size))
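# With GPU_MEMORY_IN_GB = 40 this comes to ceil(sqrt(5e9)) = 70,711, i.e. each bfloat16 matrix is about 10 GB,
# and the three buffers below (Ts, results, from_others) total roughly 30 GB per GPU.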
num_gpus = torch.cuda.device_count()
if num_gpus != 8:
return f"Found wrong number of GPUS: {num_gpus}"
Ts = [
torch.ones(arr_size, arr_size, dtype=torch.bfloat16, device=f"cuda:{gpu_num}") for gpu_num in range(num_gpus)
]
results = [
torch.zeros(arr_size, arr_size, dtype=torch.bfloat16, device=f"cuda:{gpu_num}") for gpu_num in range(num_gpus)
]
from_others = [
torch.zeros(arr_size, arr_size, dtype=torch.bfloat16, device=f"cuda:{gpu_num}") for gpu_num in range(num_gpus)
]
torch.manual_seed(12345)
start_time = time.time()
curr_loop_num = 0
while time.time() - start_time < MAX_RUNTIME:
# Matrix multiply into result
[torch.matmul(T, T, out=result) for T, result in zip(Ts, results)]
# Copy each result to a peer GPU; the peer offset rotates every loop so that all GPU-to-GPU paths get exercised over time
for i in range(num_gpus):
other_gpu = (curr_loop_num % (num_gpus - 1) + i + 1) % num_gpus
other = from_others[other_gpu]
original = results[i]
other[:] = original
# Check values are correct
checks = [(other == result).sum() == result.numel() for other, result in zip(from_others, results)]
if not all([check.item() for check in checks]):
return "Issue with GPUS, values don't match"
curr_loop_num += 1
if curr_loop_num < num_gpus:
return f"Few loops seen, only {curr_loop_num}"
return f"All okay for {curr_loop_num} loops"
if __name__ == "__main__":
if len(sys.argv) > 1:
MAX_RUNTIME = int(sys.argv[1])
hostname = socket.gethostname()
try:
print(f"{hostname}: {run_load()}")
except torch.cuda.OutOfMemoryError as e:
print(f"{hostname}: out of memory {e}")
except Exception as e:
print(f"{hostname}: {e}")

health_checks/config.json Normal file

@@ -0,0 +1,81 @@
{
"node_info": {
"nodes": ["hostname-23r66x3", "hostname-32ls6x3", "hostname-3lvy7y3", "hostname-f7n4wx3"],
"port": "22",
"user": "host"
},
"leaf_health_checks": [],
"ubuntu": {
"distributor id": "ubuntu",
"description": "ubuntu 22.04.3 lts",
"release": "22.04",
"codename": "jammy"
},
"ib_hcas": {
"mlx5_0": "ibp26s0",
"mlx5_3": "ibp60s0",
"mlx5_4": "ibp77s0",
"mlx5_5": "ibp94s0",
"mlx5_6": "ibp156s0",
"mlx5_9": "ibp188s0",
"mlx5_10": "ibp204s0",
"mlx5_11": "ibp220s0"
},
"infiniband_error": {
"driver_versions": {
"Host Driver Version": "MLNX_OFED_LINUX-23.10-1.1.9.0 (OFED-23.10-1.1.9): 6.5.0-1007-nvidia",
"Firmware on CA": "v28.39.1002",
"total_passes": 9
},
"hcas": {
"Port State": "UP 4X NDR (InfiniBand)",
"Node GUID": "VALID_UUID",
"total_passes": 19
},
"error_counters": {
"total_passes": 8
}
},
"vbios": {
"CUDA Version": "12.2",
"Driver Version": "535.129.03",
"GSP Firmware Version": "535.129.03",
"Image Version": "G520.0200.00.05",
"Inforom Version": "",
"VBIOS Version": "96.00.99.00.01"
},
"docker": {
"expected_error_info": {
"Driver": "zfs"
},
"expected_warning_info" : {
"CgroupVersion": "2",
"KernelVersion": "6.5.0-1007-nvidia",
"OSVersion": "22.04",
"ServerVersion": "25.0.3",
"ClientInfo": {
"Version": "25.0.3",
"GoVersion": "go1.21.6"
}
}
},
"flint": {
"expected_fields": {
"Image type": ["FS4"],
"FW Version": ["28.39.1002"],
"Product Version": ["28.39.1002"],
"FW Factory Version": ["00.00.0000"],
"Rom Info": ["type=UEFI version=14.32.12 cpu=AMD64,AARCH64", "type=PXE version=3.7.201 cpu=AMD64"]
},
"expected_FW_possibilities": {
"Information block is": ["FW image A is present", "FW image B is present"],
"FW A Version": ["47.171.0001", "00.00.0000"],
"FW B Version": ["00.00.0000", "47.171.0001"]
}
},
"infiniband_status": {
"network_type": "NDR",
"active_devices": "8",
"device_names": ["mlx5_0","mlx5_3","mlx5_4","mlx5_5","mlx5_6","mlx5_9","mlx5_10","mlx5_11"]
}
}
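
For reference, the deploy scripts read this file with jq; for example, the transceiver firmware updater enumerates the HCAs like so (shown only as an illustration of how the config is consumed):

jq --raw-output '.ib_hcas | keys[]' health_checks/config.json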


@@ -0,0 +1,649 @@
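# Note: many entries below use placeholder tokens such as HEX, ADDR, UUID, TIME, DATE, and NUM_KB; the checker
# that consumes these lists presumably normalizes raw dmesg lines into those tokens before matching.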
WHITELISTED_REGEX_STR = [
r"\d+",
r"Key type .* registered",
r"NODE_DATA\(\d+\) allocated \[mem HEX-HEX\]",
r"SRAT: PXM \d+ \-> APIC HEX \-> Node \d+",
r"ata\d+: SATA link down \(SStatus \d+ SControl \d+\)",
r"ata\d+: SATA max UDMA/\d+ abar m\d+@HEX port HEX irq",
r"ata\d+: DUMMY",
r"gran_size: \d+.*chunk_size: \d+.*num_reg: \d+.*lose cover RAM: \d+.*",
r"hub [\d\-\.]+:[\d\-\.]+: USB hub found",
r"hub [\d\-\.]+:[\d\-\.]+: \d+ ports detected",
r"mlx5_core ADDR .*: renamed from .*",
r"mlx5_core ADDR: [\d\.]+ Gb/s available PCIe bandwidth .* x16 link\)",
r"mpt3sas_cm[0-7]: .*",
r"pci ADDR: .*",
r"systemd\[1\]: (Starting|Started|Reached|Queued|Listening on| Mounted|Created|Finished|Mounting|Mounted) .*",
r"usb \d-\d+(\.\d:)?.*",
r"systemd\[1\]: modprobe@.*\.service: Deactivated successfully\.",
r"usb usb\d: .*",
r"x86/fpu: xstate_offset\[\d+\]:\s*\d+, xstate_sizes\[\d+\]:\s*\d+",
r"spi-nor spi0.1: .* \(4096 Kbytes\)",
r"acpi/hmat: Locality: Flags:00 Type:(Read|Write) (Bandwidth|Latency) Initiator Domains:8 Target Domains:8 Base:100",
r"mpt3sas[0-7]-msix0: PCI-MSI-X enabled: irq",
r"node [0-7]: \[mem HEX-HEX\]",
r"pci_bus .*: (root bus )?resource (\d+ )?\[(mem|io)\s*HEX-HEX( window| 64bit pref)?\]",
r"pci_bus .*: No. \d+ try to assign unassigned res",
r"pci_bus .*: on NUMA node \d+",
r"pci_bus .*: root bus resource \[bus [0-9a-f]+(-[0-9a-f]+)?\]",
r"pci_bus .*: max bus depth: \d+ pci_try_num: \d+",
r"pcieport .*: PME: Signaling with irq",
r"pcieport ADDR: pciehp: Slot #\d+ AttnBtn- PwrCtrl- MRL- AttnInd- PwrInd- HotPlug\+ Surprise- Interlock- NoCompl\+ IbPresDis- LLActRep\+",
r"i2c i2c-\d: 16/32 memory slots populated \(from DMI\)",
r"i2c i2c-\d: Systems with more than 4 memory slots not supported yet, not instantiating SPD",
r"perf: interrupt took too long \(.*\), lowering kernel\.perf_event_max_sample_rate to \d+",
r"(eth0|veth): renamed from (veth|eth0)",
r"veth: entered (promiscuous|allmulticast) mode",
r"veth \(unregistering\): left (promiscuous|allmulticast) mode",
r"(br|docker0): port \d+\(veth\) entered (disabled|blocking|forwarding) state",
r"systemd\[\d+\]: memfd_create\(\) called without MFD_EXEC or MFD_NOEXEC_SEAL set",
r"hugetlbfs: clusterkit \(\d+\): Using mlock ulimits for SHM_HUGETLB is obsolete",
r"INFO: NMI handler \(.*\) took too long to run:",
r"nvidia-nvswitch[0-7]: SXid \(PCI:ADDR\): \d+, (Non-fatal|Severity 0|Data).*",
r"mlx5_core ADDR enp.*: Link up",
r"mlx5_core ADDR enp.*: Link down",
r"mlx5_core ADDR enp.*: renamed from eth\d",
r"MFT device name created: id: \d+, slot id: \d+, device name: .*",
r"hrtimer: interrupt took \d+ ns",
r"workqueue: .* hogged CPU for .* consider switching to WQ_UNBOUND",
r"ptrace attach of .* was attempted by .*",
r"File /var/log/journal/.*.journal corrupted or uncleanly shut down, renaming and replacing.",
r"usb-storage [\d\.:-]+ USB Mass Storage device detected",
r"scsi HOST_NUM: usb-storage [\d\.:-]+",
r".*Capabilities=\(.*", # This line splits across lines weirdly sometimes,
r".*,Task Set Full,NCQ\).*", # This is also a part of the above
r"systemd\[1\]: systemd 249.11-0ubuntu3.* running in system mode \(\+PAM \+AUDIT \+SELINUX \+APPARMOR \+IMA \+SMACK \+SECCOMP \+GCRYPT \+GNUTLS \+OPENSSL \+ACL \+BLKID \+CURL \+ELFUTILS \+FIDO2 \+IDN2 -IDN \+IPTC \+KMOD \+LIBCRYPTSETUP \+LIBFDISK \+PCRE2 -PWQUALITY -P11KIT -QRENCODE \+BZIP2 \+LZ4 \+XZ \+ZLIB \+ZSTD -XKBCOMMON \+UTMP \+SYSVINIT default-hierarchy=unified\)",
r"input: DELL DRAC 5 Virtual Keyboard and Mouse as /devices/pci0000:00/ADDR/usb1/1-10/\d*-\d*\.\d*/\d*-\d*\.\d*:\d*\.\d*/\w*:\w*:\w*\.\w*/input/input.",
r"hid-generic \w*:\w*:\w*\.\w*: input,hidraw.: USB HID v1\.01 (Mouse|Keyboard) \[DELL DRAC 5 Virtual Keyboard and Mouse\] on usb-ADDR-\w*\.\w*/input.",
r"systemd-journald.*: File /var/log/journal/.*/system.journal corrupted or uncleanly shut down, renaming and replacing.",
r"systemd-journald.*: File /var/log/journal/.*/user-1000.journal corrupted or uncleanly shut down, renaming and replacing.",
r"systemd-journald.*: Failed to read journal file /var/log/journal/.*/user-1000.journal for rotation, trying to move it out of the way: Device or resource busy",
r"sdID\[sda\] .* 512-byte logical blocks:",
r"python\[.*\]: segfault at",
r"python3\[.*\]: segfault at",
r"pt_main_thread\[.*\]: segfault at",
r"in python3.11\[.*\]",
r"likely on CPU [0-9]",
r"Code: [0-9a-f ]*",
r"to colour dummy device",
r"nvidia-modeset: Loading NVIDIA Kernel Mode Setting Driver for UNIX platforms",
r"NVRM: loading NVIDIA UNIX x86_64 Kernel Module",
r"Pid [0-9]*\(.*\) over core_pipe_limit",
r"kauditd_printk_skb: \d+ callbacks suppressed",
]
WHITELISTED_MESSAGES = [
"... bit width: 48",
"... event mask: 0001000f000000ff",
"... fixed-purpose events: 4",
"... generic registers: 8",
"... max period: 00007fffffffffff",
"... value mask: 0000ffffffffffff",
"... version: 5",
".... node #",
"..TIMER: vector=HEX apic1=0 pin1=2 apic2=-1 pin2=-1",
"/init",
"/usr/lib/systemd/system-generators/systemd-fstab-generator failed with exit status 1.",
"00:04: ttyS1 at I/O HEX (irq = 3, base_baud = 115200) is a 16550A",
"00:05: ttyS0 at I/O HEX (irq = 4, base_baud = 115200) is a 16550A",
"63 BIT PCI BUS DMA ADDRESSING SUPPORTED, total mem",
"ACPI Error: Aborting method \\_SB.PMI0._GHL due to previous error (AE_NOT_EXIST) (20230331/psparse-529)",
"ACPI Error: Aborting method \\_SB.PMI0._PMC due to previous error (AE_NOT_EXIST) (20230331/psparse-529)",
"ACPI Error: No handler for Region [SYSI]",
"ACPI Error: Region IPMI (ID=7) has no handler (20230331/exfldio-261)",
"ACPI FADT declares the system doesn't support PCIe ASPM, so disable it",
"ACPI:",
"AES CTR mode by8 optimization enabled",
"AMD AuthenticAMD",
"APIC: Switch to symmetric I/O mode setup",
"AVX2 version of gcm_enc/dec engaged.",
"Adding NUM_KB swap on /swap.img.",
"AppArmor:",
"Asymmetric key parser 'x509' registered",
"BIOS-e820:",
"BIOS-provided physical RAM map:",
"BOOT_IMAGE=/boot/vmlinuz-6.5.0-1007-nvidia",
"Backport based on mlnx_ofed/mlnx-ofa_kernel-4.0.git a675be0",
"Block layer SCSI generic (bsg) driver version 0.4 loaded (major 243)",
"Booting paravirtualized kernel on bare hardware",
"Bridge firewalling registered",
"Btrfs loaded, zoned=yes, fsverity=yes",
"Built 8 zonelists, mobility grouping on. Total pages: 264117399",
"CPU_NUM: Thermal monitoring enabled (TM1)",
"Calibrating delay loop (skipped), value calculated using timer frequency.. 4000.00 BogoMIPS (lpj=8000000)",
"Centaur CentaurHauls",
"Command line: BOOT_IMAGE=/boot/vmlinuz-6.5.0-1007-nvidia root=UUID=UUID ro",
"Compat-mlnx-ofed backport release: a675be0",
"Console:",
'Creating 1 MTD partitions on "ADDR":',
"DMA [mem HEX-HEX]",
"DMA32 [mem HEX-HEX]",
"DMA:",
"DMI: Dell Inc. PowerEdge SYSTEM_TAG, BIOS 1.3.6 09/20/2023",
"DMI: Dell Inc. PowerEdge SYSTEM_TAG, BIOS 1.7.6 01/11/2024",
"DMI: Dell Inc. PowerEdge SYSTEM_TAG, BIOS 1",
"Dentry cache hash table entries: 33554432 (order: 16, 268435456 bytes, vmalloc hugepage)",
"Device empty",
"Disabling lock debugging due to kernel taint",
"Dynamic Preempt: voluntary",
"EDAC MC",
"EDAC MC: Ver: 3.0.0",
"EDAC i10nm: No hbm memory",
"EDAC i10nm: v0.0.6",
"EISA bus registered",
"ENERGY_PERF_BIAS: Set to 'normal', was 'performance'",
"ERST: Error Record Serialization Table (ERST) support is initialized.",
"EXT4-fs (NVMEn1p2): mounted filesystem UUID ro with ordered data mode. Quota mode: none.",
"EXT4-fs (NVMEn1p2): re-mounted UUID r/w. Quota mode: none.",
"Early memory node ranges",
"Estimated ratio of average max frequency by base frequency (times 1024): 1638",
"FS-Cache: Loaded",
"Failed to create unit file /run/systemd/generator/mnt-nfs_clusterkit.mount",
"Fallback order for Node",
"Freeing SMP alternatives memory: 44K",
"Freeing initrd memory: NUM_KB",
"Freeing unused decrypted memory: 2036K",
"Freeing unused kernel image (initmem) memory: 4792K",
"Freeing unused kernel image (rodata/data gap) memory: 1156K",
"GHES: APEI firmware first mode is enabled by APEI bit and WHEA _OSC.",
"Giving out device to module i10nm_edac controller Intel_10nm Socket#0",
"HEST: Table parsing has been initialized.",
'HEX-HEX : "BIOS"',
"HOME=/",
"Hostname set to",
"HugeTLB: 16380 KiB vmemmap can be freed for a 1.00 GiB page",
"HugeTLB: 28 KiB vmemmap can be freed for a 2.00 MiB page",
"HugeTLB: registered 1.00 GiB page size, pre-allocated 0 pages",
"HugeTLB: registered 2.00 MiB page size, pre-allocated 0 pages",
"Hygon HygonGenuine",
"IOAPIC[0]: apic_id 8, version 32, address HEX, GSI 0-119",
"IP idents hash table entries: 262144 (order: 9, 2097152 bytes, vmalloc)",
"IPI shorthand broadcast: enabled",
"IPMI message handler: version 39.2",
"In-situ OAM (IOAM) with IPv6",
"Initialise system trusted keyrings",
"Initializing XFRM netlink socket",
"Initmem setup",
"Inode-cache hash table entries: 16777216 (order: 15, 134217728 bytes, vmalloc hugepage)",
"Intel GenuineIntel",
"KERNEL supported cpus:",
"Kernel command line: BOOT_IMAGE=/boot/vmlinuz-6.5.0-1007-nvidia root=UUID=UUID ro",
"LSM: initializing lsm=lockdown,capability,landlock,yama,apparmor,integrity",
"Last level dTLB entries: 4KB 0, 2MB 0, 4MB 0, 1GB 0",
"Last level iTLB entries: 4KB 0, 2MB 0, 4MB 0",
"Linux agpgart interface v0.103",
"Linux version 6.5.0-1007-nvidia (buildd@lcy02-amd64-008) (x86_64-linux-gnu-gcc-11 (Ubuntu 11.4.0-1ubuntu1~22.04) 11.4.0, GNU ld (GNU Binutils for Ubuntu) 2.38) #7-Ubuntu SMP PREEMPT_DYNAMIC Wed Dec 6TIMEUTC 2023 (Ubuntu 6.5.0-1007.7-nvidia 6.5.3)",
"Loaded X.509 cert",
"Loading compiled-in X.509 certificates",
"Loading compiled-in module X.509 certificates",
"MPTCP token hash table entries: 65536 (order: 8, 1572864 bytes, vmalloc)",
"MTRR map: 6 entries (3 fixed + 3 variable; max 23), built from 10 variable MTRRs",
"Magic number:",
"Memory: NUM_KB/NUM_KB available (NUM_KB kernel code, 4257K rwdata, NUM_KB rodata, 4792K init, NUM_KB bss, NUM_KB reserved, 0K cma-reserved)",
"Monitor-Mwait will be used to enter C-1 state",
"Mount-cache hash table entries: 524288 (order: 10, 4194304 bytes, vmalloc)",
"Mountpoint-cache hash table entries: 524288 (order: 10, 4194304 bytes, vmalloc)",
"Movable zone start for each node",
"NET: Registered PF",
"NFS: Registering the id_resolver key type",
"NMI watchdog: Enabled. Permanently consumes one hw-PMU counter.",
"NR_IRQS: 524544, nr_irqs: 2888, preallocated irqs: 16",
"NUMA: Initialized distance table, cnt=8",
"NUMA: Node 0 [mem HEX-HEX] + [mem HEX-HEX] -> [mem HEX-HEX]",
"NVMEn1: p1 p2",
"NVMEn1: p1 p9",
"NVRM: loading NVIDIA UNIX x86_64 Kernel Module 535.129.03 Thu Oct 19TIMEUTC 2023",
"NX (Execute Disable) protection: active",
"NetLabel: domain hash size = 128",
"NetLabel: protocols = UNLABELED CIPSOv4 CALIPSO",
"NetLabel: unlabeled traffic allowed by default",
"NetLabel: Initializing",
"No Arguments are initialized for method [_GHL]",
"No Local Variables are initialized for Method [_GHL]",
"Normal [mem HEX-HEX]",
"On node 0, zone DMA32: 10240 pages in unavailable ranges",
"On node 0, zone DMA32: 46593 pages in unavailable ranges",
"On node 0, zone DMA: 1 pages in unavailable ranges",
"On node 0, zone DMA: 96 pages in unavailable ranges",
"On node 0, zone Normal: 2048 pages in unavailable ranges",
"PCI host bridge to bus",
"PCI-DMA: Using software bounce buffering for IO (SWIOTLB)",
"PCI: CLS 0 bytes, default 64",
"PCI: Dell System detected, enabling pci=bfsort.",
"PCI: Ignoring E820 reservations for host bridge windows",
"PCI: MMCONFIG at [mem HEX-HEX] reserved as EfiMemoryMappedIO",
"PCI: MMCONFIG for domain 0000 [bus 00-ff] at [mem HEX-HEX] (base HEX)",
"PCI: Using ACPI for IRQ routing",
"PCI: Using configuration type 1 for base access",
'PCI: Using host bridge windows from ACPI; if necessary, use "pci=nocrs" and report a bug',
"PCI: not using MMCONFIG",
"PCI: pci_cache_line_size set to 64 bytes",
"PM: RTC time:TIME date: DATE",
"PM: hibernation: Registered nosave memory: [mem",
"PPP generic driver version 2.4.2",
"PTP clock support registered",
"Performance Events: XSAVE Architectural LBR, PEBS fmt4+-baseline, AnyThread deprecated, Sapphire Rapids events, 32-deep LBR, full-width counters, Intel PMU driver.",
"Policy zone: Normal",
"Process accounting resumed",
"Protocol=(Initiator), Capabilities=(Diag Trace Buffer,Task Set Full,NCQ)",
"RAMDISK: [mem HEX-HEX]",
"RAPL PMU: API unit is 2^-32 Joules, 3 fixed counters, 655360 ms ovfl timer",
"RAPL PMU: hw unit of domain dram 2^-14 Joules",
"RAPL PMU: hw unit of domain package 2^-14 Joules",
"RAPL PMU: hw unit of domain psys 2^-0 Joules",
"RAS: Correctable Errors collector initialized.",
"RCU Tasks Rude: Setting shift to 7 and lim to 1 rcu_task_cb_adjust=1.",
"RCU Tasks Trace: Setting shift to 7 and lim to 1 rcu_task_cb_adjust=1.",
"RCU Tasks: Setting shift to 7 and lim to 1 rcu_task_cb_adjust=1.",
"RPC: Registered named UNIX socket transport module.",
"RPC: Registered tcp NFSv4.1 backchannel transport module.",
"RPC: Registered tcp transport module.",
"RPC: Registered tcp-with-tls transport module.",
"RPC: Registered udp transport module.",
"Received client request to flush runtime journal.",
"Rude variant of Tasks RCU enabled.",
"Run /init as init process",
"SCSI subsystem initialized",
"SLUB: HWalign=64, Order=0-3, MinObjects=0, CPUs=104, Nodes=8",
"SMBIOS 3.3.0 present.",
"Segment Routing with IPv6",
"Serial: 8250/16550 driver, 32 ports, IRQ sharing enabled",
"Shutdown timeout set to",
"Spectre V1 : Mitigation: usercopy/swapgs barriers and __user pointer sanitization",
"Spectre V2 : Mitigation: Enhanced / Automatic IBRS",
"Spectre V2 : Spectre v2 / PBRSB-eIBRS: Retire a single CALL on VMEXIT",
"Spectre V2 : Spectre v2 / SpectreRSB mitigation: Filling RSB on context switch",
"Spectre V2 : mitigation: Enabling conditional Indirect Branch Prediction Barrier",
"Speculative Store Bypass: Mitigation: Speculative Store Bypass disabled via prctl",
"Switched APIC routing to physical flat.",
"TCP bind hash table entries: 65536 (order: 9, 2097152 bytes, vmalloc)",
"TCP established hash table entries: 524288 (order: 10, 4194304 bytes, vmalloc)",
"TCP: Hash tables configured (established 524288 bind 65536)",
"TERM=linux",
"TSC deadline timer available",
"Table-perturb hash table entries: 65536 (order: 6, 262144 bytes, vmalloc)",
"Tracing variant of Tasks RCU enabled.",
"Trampoline variant of Tasks RCU enabled.",
"Trying to unpack rootfs image as initramfs...",
"UDP hash table entries: 65536 (order: 9, 2097152 bytes, vmalloc)",
"UDP-Lite hash table entries: 65536 (order: 9, 2097152 bytes, vmalloc)",
'Unknown kernel command line parameters "BOOT_IMAGE=/boot/vmlinuz-6.5.0-1007-nvidia", will be passed to user space.',
"Using GB pages for direct mapping",
"VFIO - User Level meta-driver version: 0.3",
"VFS: Disk quotas dquot_6.6.0",
"VFS: Dquot-cache hash table entries: 512 (order 0, 4096 bytes)",
"Write protecting the kernel read-only data: NUM_KB",
"Yama: becoming mindful.",
"ZFS: Loaded module v2.2.0-0ubuntu1~23.10, ZFS pool version 5000, ZFS filesystem version 5",
"Zone ranges:",
"[Firmware Info]: PCI: MMCONFIG at [mem HEX-HEX] not reserved in ACPI motherboard resources",
"[drm] Initialized mgag200 1.0.0 20110418 for ADDR on minor 0",
"[drm] Initialized nvidia-drm 0.0.0 20160202 for ADDR on minor",
"[drm] [nvidia-drm] [GPU ID HEX] Loading driver",
"[mem HEX-HEX] available for PCI devices",
"acpi PNP0A03",
"acpi PNP0A08",
"acpi/hmat: Initiator-Target",
"acpi/hmat: Memory Flags:0001 Processor Domain:",
"acpiphp: ACPI Hot Plug PCI Controller Driver version: 0.5",
"ahci ADDR: AHCI 0001.0301 32 slots 4 ports 6 Gbps HEX impl SATA mode",
"ahci ADDR: flags: 64bit ncq sntf pm clo only pio slum part ems deso sadm sds",
"ahci ADDR: version 3.0",
"alua: device handler registered",
"async_tx: api initialized (async)",
"audit: initializing netlink subsys (disabled)",
'audit: type=1400 TIME: apparmor="STATUS" ',
"audit: type=2000 TIME: state=initialized audit_enabled=0 res=1",
'audit: type=1400 TIME: apparmor="DENIED"',
"blacklist: Loading compiled-in revocation X.509 certificates",
"blacklist: Revoked X.509 cert 'Debian Secure Boot Signer: 00a7468def'",
"block NVMEn1: the capability attribute has been deprecated.",
"bridge: filtering via arp/ip/ip6tables is no longer available by default. Update your scripts to load br_netfilter if you need this.",
"clk: Disabling unused clocks",
"clocksource:",
"compat.git: mlnx_ofed/mlnx-ofa_kernel-4.0.git",
"cpuidle: using governor ladder",
"cpuidle: using governor menu",
"cryptd: max_cpu_qlen set to 1000",
"dcdbas dcdbas: Dell Systems Management Base Driver (version 5.6.0-3.4)",
"device-mapper: core: CONFIG_IMA_DISABLE_HTABLE is disabled. Duplicate IMA measurements will not be recorded in the IMA log.",
"device-mapper: ioctl: 4.48.0-ioctl (DATE) initialised: dm-devel@redhat.com",
"device-mapper: uevent: version 1.0.3",
"devtmpfs: initialized",
"drop_monitor: Initializing network drop monitor service",
"e820: remove [mem HEX-HEX] reserved",
"e820: remove [mem HEX-HEX] usable",
"e820: reserve RAM buffer [mem HEX-HEX]",
"e820: update [mem HEX-HEX] usable ==> reserved",
"e820: update [mem HEX-HEX] usable ==> usable",
"efi: ACPI=HEX ACPI 2.0=HEX MEMATTR=HEX SMBIOS=HEX SMBIOS 3.0=HEX MOKvar=HEX RNG=HEX",
"efi: EFI v2.7 by Dell Inc.",
"efi: Not removing MEM_ADDR: MMIO range=[HEX-HEX] (4KB) from e820 map",
"efi: Remove MEM_ADDR: MMIO range=[HEX-HEX] (256MB) from e820 map",
"efifb: No BGRT, not showing boot graphics",
"efifb: Truecolor: size=8:8:8:8, shift=24:16:8:0",
"efifb: framebuffer at HEX, using 3072k, total 3072k",
"efifb: mode is 1024x768x32, linelength=4096, pages=1",
"efifb: probing for efifb",
"efifb: scrolling: redraw",
"efivars: Registered efivars operations",
"emc: device handler registered",
"end_DEVICE add: handle(HEX), sas_addr(HEX)",
"evict_inodes",
"evm: HMAC attrs: HEX",
"evm: Initialising EVM extended attributes:",
"evm: security.",
"extended physical RAM map:",
"fb0: EFI VGA frame buffer device",
"fbcon: Taking over console",
"fbcon: mgag200drmfb (fb0) is primary device",
"ftrace: allocated 216 pages with 4 groups",
"ftrace: allocating 55296 entries in 216 pages",
"fuse: init (API version 7.38)",
"futex hash table entries: 32768 (order: 9, 2097152 bytes, vmalloc)",
"hash matches",
"hpet0: 8 comparators, 64-bit 25.000000 MHz counter",
"hpet0: at MMIO HEX, IRQs 2, 8, 0, 0, 0, 0, 0, 0",
"i2c i2c-1: 16/32 memory slots populated (from DMI)",
"i2c i2c-1: Systems with more than 4 memory slots not supported yet, not instantiating SPD",
"i2c_dev: i2c /dev entries driver",
"i801_smbus ADDR: SMBus using PCI interrupt",
"i801_smbus ADDR: SPD Write Disable is set",
"i8042: PNP: No PS/2 controller found.",
"idxd ADDR: Intel(R) Accelerator Device (v100)",
"idxd ADDR: Unable to turn on user SVA feature.",
"ima: Allocated hash algorithm: sha1",
"ima: No TPM chip found, activating TPM-bypass!",
"ima: No architecture policies found",
"input: Power Button as /devices/LNXSYSTM:00/LNXPWRBN:00/input/input1",
"input: Power Button as /devices/LNXSYSTM:00/LNXSYBUS:00/PNP0C0C:00/input/input0",
"integrity: Loaded X.509 cert",
"integrity: Loading X.509 certificate",
"integrity: Machine keyring initialized",
"integrity: Platform Keyring initialized",
"integrity: Revoking X.509 certificate: UEFI:dbx",
"intel_pstate: Intel P-state driver initializing",
"intel_rapl_common: Found RAPL domain",
"io scheduler mq-deadline registered",
"iommu: DMA domain TLB invalidation policy: lazy mode",
"iommu: Default domain type: Translated",
"ipmi device interface",
"ipmi_platform: ipmi_si: SMBIOS: io HEX regsize 1 spacing 4 irq",
"ipmi_si IPI0001:00: IPMI kcs interface initialized",
"ipmi_si IPI0001:00: IPMI message handler: Found new BMC (man_id: HEX, prod_id: HEX, dev_id: HEX)",
"ipmi_si IPI0001:00: The BMC does not support setting the recv irq bit, compensating, but the BMC needs to be fixed.",
"ipmi_si IPI0001:00: Using irq",
"ipmi_si IPI0001:00: ipmi_platform: [io HEX] regsize 1 spacing 4 irq",
"ipmi_si IPI0001:00: ipmi_platform: probing via ACPI",
"ipmi_si dmi-ipmi-si.0: Removing SMBIOS-specified kcs state machine in favor of ACPI",
"ipmi_si dmi-ipmi-si.0: ipmi_platform: probing via SMBIOS",
"ipmi_si: Adding ACPI-specified kcs state machine",
"ipmi_si: Adding SMBIOS-specified kcs state machine",
"ipmi_si: IPMI System Interface driver",
"ipmi_si: Trying ACPI-specified kcs state machine at i/o address HEX, slave address HEX, irq",
"ipmi_ssif: IPMI SSIF Interface driver",
"is added to hba_port list",
"knem 1.1.4.90mlnx3: initialized",
"kprobes: kprobe jump-optimization is enabled. All kprobes are optimized if possible.",
"landlock: Up and running.",
"last_pfn = HEX max_arch_pfn = HEX",
"ledtrig-cpu: registered to indicate activity on CPUs",
"libata version 3.00 loaded.",
"loop: module loaded",
"mctp: management component transport protocol core",
"mei_me ADDR: Device doesn't have valid ME Interface",
"mem auto-init: stack:off, heap alloc:on, heap free:off",
"mempolicy: Enabling automatic NUMA balancing. Configure with numa_balancing= or the kernel.numa_balancing sysctl",
"mf:",
"mgag200 ADDR: [drm] fb0: mgag200drmfb frame buffer device",
"mgag200 ADDR: vgaarb: deactivate vga console",
"microcode: Microcode Update Driver: v2.2.",
"microcode: updated early: HEX -> HEX, date = DATE",
"mlx5_core : mlx5_pcie_event:299:(PID): Detected insufficient power on the PCIe slot (27W).",
"mlx5_core ADDR: 252.048 Gb/s available PCIe bandwidth (16.0 GT/s PCIe x16 link)",
"mlx5_core ADDR: E-Switch: Total vports 2, per vport: max uc(128) max mc(2048)",
"mlx5_core ADDR: MLX5E: StrdRq(1) RqSz(8) StrdSz(2048) RxCqeCmprss(0 enhanced)",
"mlx5_core ADDR: Port module event: module 0, Cable plugged",
"mlx5_core ADDR: Port module event: module 1, Cable unplugged",
"mlx5_core ADDR: Rate limit: 127 rates are supported, range: 0Mbps to 97656Mbps",
"mlx5_core ADDR: firmware version: 22.36.1010",
"mlx5_core ADDR: firmware version: 28.39.1002",
"mlx5_core ADDR: mlx5_pcie_event:299:(PID): Detected insufficient power on the PCIe slot (27W).",
"mlx5_core ADDR: mlx5_pcie_event:299:(PID): Detected insufficient power on the PCIe slot ADDR(27W).",
"mlx_compat: loading out-of-tree module taints kernel.",
"mlx_compat: module verification failed: signature and/or required key missing - tainting kernel",
"mousedev: PS/2 mouse device common for all mice",
"mpt3sas version 43.100.00.00 loaded",
'mtd: partition "BIOS" extends beyond the end of device "ADDR" -- size truncated to HEX',
"mtrr_cleanup: can not find optimal value",
"no of cores: 104, max_msix_vectors: -1",
"nvidia-modeset: Loading NVIDIA Kernel Mode Setting Driver for UNIX platforms 535.129.03 Thu Oct 19TIMEUTC 2023",
"nvidia-nvlink: Nvlink Core is being initialized, major device number 237",
"nvidia-nvlink: nvlink driver close",
"nvidia-nvlink: nvlink driver open",
"nvidia-nvswitch0: open (major=236)",
"nvidia-nvswitch0: using MSI",
"nvidia-nvswitch1: open (major=236)",
"nvidia-nvswitch1: using MSI",
"nvidia-nvswitch2: open (major=236)",
"nvidia-nvswitch2: using MSI",
"nvidia-nvswitch3: open (major=236)",
"nvidia-nvswitch3: using MSI",
"nvidia-nvswitch: Probing device ADDR, Vendor Id = HEX, Device Id = HEX, Class = HEX",
"nvidia-uvm: Loaded the UVM driver, major device number 235.",
"nvidia: module license 'NVIDIA' taints kernel.",
"nvidia: module license taints kernel.",
"nvidia_uvm: module uses symbols nvUvmInterfaceDisableAccessCntr from proprietary module nvidia, inheriting taint.",
"nvme NVME: 104/0/0 default/read/poll queues",
"nvme NVME: 3/0/0 default/read/poll queues",
"nvme NVME: pci function ADDR",
"pci_bus 0000:04: extended config space not accessible",
"pci_bus 0000:80: Some PCI device resources are unassigned, try booting with pci=realloc",
"pcpu-alloc:",
"percpu: Embedded 63 pages/cpu s221184 r8192 d28672 u262144",
"pid_max: default: 106496 minimum: 832",
"pinctrl core: initialized pinctrl subsystem",
"platform eisa.0: Cannot allocate resource for EISA slot",
"platform eisa.0: EISA: Cannot allocate resource for mainboard",
"platform eisa.0: EISA: Detected 0 cards",
"platform eisa.0: Probing EISA bus 0",
"please specify mtrr_gran_size/mtrr_chunk_size",
"pnp 00:01: disabling [mem HEX-HEX disabled] because it overlaps ADDR BAR 8 [mem HEX-HEX 64bit pref]",
"pnp 00:01: disabling [mem HEX-HEX] because it overlaps ADDR BAR 8 [mem HEX-HEX 64bit pref]",
"pnp 00:02: disabling [mem HEX-HEX disabled] because it overlaps ADDR BAR 8 [mem HEX-HEX 64bit pref]",
"pnp 00:02: disabling [mem HEX-HEX] because it overlaps ADDR BAR 8 [mem HEX-HEX 64bit pref]",
"pnp: PnP ACPI init",
"pnp: PnP ACPI: found 6 devices",
"pps_core: LinuxPPS API ver. 1 registered",
"pps_core: Software ver. 5.3.6 - Copyright 2005-2007 Rodolfo Giometti <giometti@linux.it>",
"printk: console [tty0] enabled",
"printk: early log buf free:",
"printk: log_buf_len",
"process: using mwait in idle threads",
"pstore: Registered erst as persistent store backend",
"pstore: Using crash dump compression: deflate",
"pstore: backend 'erst' already in use: ignoring 'efi_pstore'",
"raid6: .... xor() MB_S, rmw enabled",
"raid6: avx2x1 gen() MB_S",
"raid6: avx2x2 gen() MB_S",
"raid6: avx2x4 gen() MB_S",
"raid6: avx512x1 gen() MB_S",
"raid6: avx512x2 gen() MB_S",
"raid6: avx512x4 gen() MB_S",
"raid6: using algorithm avx512x2 gen() MB_S",
"raid6: using avx512x2 recovery algorithm",
"random: crng init done",
"rcu: \tMax phase no-delay instances is 1000.",
"rcu: \tRCU restricting CPUs from NR_CPUS=8192 to nr_cpu_ids=104.",
"rcu: Max phase no-delay instances is 1000.",
"rcu: RCU restricting CPUs from NR_CPUS=8192 to nr_cpu_ids=104.",
"rcu: Adjusting geometry for rcu_fanout_leaf=16, nr_cpu_ids=104",
"rcu: Hierarchical SRCU implementation.",
"rcu: Preemptible hierarchical RCU implementation.",
"rcu: RCU calculated value of scheduler-enlistment delay is 25 jiffies.",
"rcu: srcu_init: Setting srcu_struct sizes based on contention.",
"rdac: device handler registered",
"registered taskstats version 1",
"release child resource [mem HEX-HEX 64bit pref]",
"release child resource [mem HEX-HEX 64bit]",
"resctrl: L2 allocation detected",
"resctrl: L3 allocation detected",
"resctrl: L3 monitoring detected",
"resctrl: MB allocation detected",
"reserve setup_data: [mem HEX-HEX] ACPI NVS",
"reserve setup_data: [mem HEX-HEX] ACPI data",
"reserve setup_data: [mem HEX-HEX] reserved",
"reserve setup_data: [mem HEX-HEX] usable",
"rtc_cmos 00:00: RTC can wake from S4",
"rtc_cmos 00:00: alarms up to one month, y3k, 114 bytes nvram",
"rtc_cmos 00:00: registered as rtc0",
"sched_clock: Marking stable",
"scsi HOST_NUM: Fusion MPT SAS Host",
"scsi HOST_NUM: ahci",
"scsiIDAttached scsi generic sg type 13",
"scsiIDEnclosure Dell Fryer U.2 (SW1) 00 PQ: 0 ANSI: 5",
"scsiIDEnclosure Dell Fryer U.2 (SW1) 00 PQ: 0 ANSI: 5",
"scsiIDEnclosure Dell Fryer U.2 (SW2) 00 PQ: 0 ANSI: 5",
"scsiIDEnclosure Dell Fryer U.2 (SW3) 00 PQ: 0 ANSI: 5",
"scsiIDEnclosure Dell Fryer U.2 (SW4) 00 PQ: 0 ANSI: 5",
"scsiIDPower-on or device reset occurred",
"scsiIDSES: handle(HEX), sas_addr(HEX), phy(0), device_name(HEX)",
"scsiIDenclosure logical id (HEX), slot(0)",
"scsiIDqdepth(3), tagged(1), scsi_level(6), cmd_que(1)",
"scsiIDset ignore_delay_remove for handle(HEX)",
"sdIDAttached scsi generic sg type",
"secureboot: Secure boot disabled",
"sesIDAttached Enclosure device",
"setting system clock to",
"setup_percpu: NR_CPUS:8192 nr_cpumask_bits:104 nr_cpu_ids:104 nr_node_ids:8",
"shpchp: Standard Hot Plug PCI Controller Driver version: 0.4",
"signal: max sigframe size: 11952",
"smp: Bringing up secondary CPUs ...",
"smp: Brought up 8 nodes, 104 CPUs",
"smpboot: Allowing 104 CPUs, 0 hotplug CPUs",
"smpboot: CPU 52 Converting physical 0 to logical die 1",
"smpboot: CPU_NUM: Intel(R) Xeon(R) Platinum 8470 (family: HEX, model: HEX, stepping: HEX)",
"smpboot: Max logical packages: 2",
"smpboot: Total of 104 processors activated (416000.00 BogoMIPS)",
"smpboot: x86: Booting SMP configuration:",
"software IO TLB: area num 128.",
"software IO TLB: mapped [mem HEX-HEX] (64MB)",
"spi-nor spi0.0: mx25u25635f (32768 Kbytes)",
"spi-nor: probe of spi0.1 failed with error -524",
"squashfs: version 4.0 (2009/01/31) Phillip Lougher",
"switchtec switchtec0: Management device registered.",
"switchtec: loaded.",
"system 00:01: [io HEX-HEX] has been reserved",
"system 00:02: [io HEX-HEX] has been reserved",
"systemd[1]: Activated swap /swap.img.",
"systemd[1]: Activating swap /swap.img...",
"systemd[1]: Condition check resulted in File System Check on Root Device being skipped.",
"systemd[1]: Condition check resulted in First Boot Complete being skipped.",
"systemd[1]: Condition check resulted in Kernel Module supporting RPCSEC_GSS being skipped.",
"systemd[1]: Condition check resulted in LXD - agent being skipped.",
"systemd[1]: Condition check resulted in OpenVSwitch configuration for cleanup being skipped.",
"systemd[1]: Condition check resulted in Platform Persistent Storage Archival being skipped.",
"systemd[1]: Detected architecture x86-64.",
"systemd[1]: Inserted module 'autofs4'",
"systemd[1]: Set up automount Arbitrary Executable File Formats File System Automount Point.",
"systemd[1]: modprobe@configfs.service: Deactivated successfully.",
"systemd[1]: modprobe@drm.service: Deactivated successfully.",
"systemd[1]: modprobe@efi_pstore.service: Deactivated successfully.",
"systemd[1]: modprobe@fuse.service: Deactivated successfully.",
"systemd[1]: systemd 249.11-0ubuntu3.11 running in system mode (+PAM +AUDIT +SELINUX +APPARMOR +IMA +SMACK +SECCOMP +GCRYPT +GNUTLS +OPENSSL +ACL +BLKID +CURL +ELFUTILS +FIDO2 +IDN2 -IDN +IPTC +KMOD +LIBCRYPTSETUP +LIBFDISK +PCRE2 -PWQUALITY -P11KIT -QRENCODE +BZIP2 +LZ4 +XZ +ZLIB +ZSTD -XKBCOMMON +UTMP +SYSVINIT default-hierarchy=unified)",
"systemd[1]: systemd 249.11-0ubuntu3.7 running in system mode (+PAM +AUDIT +SELINUX +APPARMOR +IMA +SMACK +SECCOMP +GCRYPT +GNUTLS +OPENSSL +ACL +BLKID +CURL +ELFUTILS +FIDO2 +IDN2 -IDN +IPTC +KMOD +LIBCRYPTSETUP +LIBFDISK +PCRE2 -PWQUALITY -P11KIT -QRENCODE +BZIP2 +LZ4 +XZ +ZLIB +ZSTD -XKBCOMMON +UTMP +SYSVINIT default-hierarchy=unified)",
"tcp_listen_portaddr_hash hash table entries: 65536 (order: 8, 1048576 bytes, vmalloc)",
"tg3 ADDR eno8303: renamed from eth0",
"tg3 ADDR eno8403: renamed from eth1",
"tg3 ADDR eth0: RXcsums[1] LinkChgREG[0] MIirq[0] ASF[1] TSOcap[1]",
"tg3 ADDR eth0: Tigon3 [partno(BCM95720) rev 5720000] (PCI Express) MAC",
"tg3 ADDR eth0: attached PHY is 5720C (10/100/1000Base-T Ethernet) (WireSpeed[1], EEE[1])",
"tg3 ADDR eth0: dma_rwctrl[00000001] dma_mask[64-bit]",
"tg3 ADDR eth1: RXcsums[1] LinkChgREG[0] MIirq[0] ASF[1] TSOcap[1]",
"tg3 ADDR eth1: Tigon3 [partno(BCM95720) rev 5720000] (PCI Express) MAC",
"tg3 ADDR eth1: attached PHY is 5720C (10/100/1000Base-T Ethernet) (WireSpeed[1], EEE[1])",
"tg3 ADDR eth1: dma_rwctrl[00000001] dma_mask[64-bit]",
"thermal_sys: Registered thermal governor",
"total RAM covered: 2095104M",
"tsc: Detected 2000.000 MHz processor",
"tun: Universal TUN/TAP device driver, 1.6",
"usb: port power management may be unreliable",
"usbcore:",
"vgaarb: loaded",
"with arguments:",
"with environment:",
"wmi_bus wmi_bus-PNP0C14:01: WQBC data block query control method not found",
"workingset: timestamp_bits=36 max_order=28 bucket_order=0",
"workqueue: work_for_cpu_fn",
"x2apic: IRQ remapping doesn't support X2APIC mode",
"x86/PAT: Configuration [0-7]: WB WC UC- UC WB WP UC- WT",
"x86/cpu: SGX disabled by BIOS.",
"x86/cpu: User Mode Instruction Prevention (UMIP) activated",
"x86/cpu: VMX (outside TXT) disabled by BIOS",
"x86/fpu: Enabled xstate features HEX, context size is 10752 bytes, using 'compacted' format.",
"x86/fpu: Supporting XSAVE feature HEX:",
"x86/mm: Checked W+X mappings: passed, no W+X pages found.",
"x86/mm: Memory block size: 2048MB",
"x86/split lock detection: #AC: crashing the kernel on kernel split_locks and warning on user-space split_locks",
"x86/tme: not enabled by BIOS",
"xhci_hcd ADDR: Host supports USB 3.1 Enhanced SuperSpeed",
"xhci_hcd ADDR: hcc params HEX hci version HEX quirks HEX",
"xhci_hcd ADDR: new USB bus registered, assigned bus number 1",
"xhci_hcd ADDR: new USB bus registered, assigned bus number 2",
"xhci_hcd ADDR: xHCI Host Controller",
"xor: automatically using best checksumming function avx",
"zbud: loaded",
"zhaoxin Shanghai",
"evict_inodes",
"nvidia-peermem nv_get_p2p_free_callback:125 ERROR detected invalid context, skipping further processing",
"mlx5_core ADDR: Port module event: module 0, Cable unplugged",
"Cannot map memory with base addr HEX and size of HEX pages",
"Received SIGTERM from PID 1 (systemd)",
"process `grep' is using deprecated sysctl",
"hid: raw HID events driver (C) Jiri Kosina",
"usbhid: USB HID core driver",
"scsiIDDirect-Access Linux RACADM 0001 PQ: 0 ANSI: 0",
"scsiIDDirect-Access Linux SECUPD 0001 PQ: 0 ANSI: 0",
"sdIDPower-on or device reset occurred",
"sdID[sda] Write Protect is off",
"sdID[sda] Mode Sense: 23 00 00 00",
"sdID[sda] No Caching mode page found",
"sdID[sda] Mode Sense: 45 00 00 00",
"sdID[sda] Write cache: disabled, read cache: enabled, doesn't support DPO or FUA",
"sdID[sda] Assuming drive cache: write through",
"sda: sda1",
"sdID[sda] Attached SCSI removable disk",
"nvidia-uvm: Loaded the UVM driver, major device number",
"scsiIDDirect-Access PNY USB 2.0 FD PMAP PQ: 0 ANSI: 6",
"nfs: server 10.0.96.201",
"Skipping core dump",
]
# A mapping from initial message to a termination message
# All messages between the initial and termination message should be whitelisted
WHITELISTED_MESSAGE_RANGES = {"invoked oom-killer: gfp_mask=": "oom_reaper: reaped process"}
# The set of all SXid error ids that are known to be harmless.
# See D.4 of https://docs.nvidia.com/datacenter/tesla/pdf/fabric-manager-user-guide.pdf
WHITELISTED_NVSWITCH_SXID_ERRORS = [
"11012",
"11021",
"11022",
"11023",
"12021",
"12023",
"15008",
"15011",
"19049",
"19055",
"19057",
"19059",
"19062",
"19065",
"19068",
"19071",
"24001",
"24002",
"24003",
"22013",
]


@@ -0,0 +1,20 @@
#!/bin/bash
REMOTE_IP=$1
REMOTE_USER=$2
REMOTE_PORT=$3
execute_remote() {
ssh ${REMOTE_USER}@${REMOTE_IP} -p ${REMOTE_PORT} "$@"
}
# Reboot the machine
echo "Rebooting ${REMOTE_IP}..."
execute_remote "sudo reboot -h now"
# Wait for machine to reboot
while true; do
if execute_remote "exit"; then
break
fi
sleep 10 # Wait for 10 seconds before trying again
done


@@ -0,0 +1,17 @@
#!/bin/bash
set -ux
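# If Docker reports a storage driver other than overlay there is nothing to migrate, so exit; the rest of this script switches the storage driver to zfs.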
docker info && { docker info | grep Storage | grep -q overlay || exit 0 ; }
docker ps | grep -v STATUS | awk '{ print $1 }' | xargs sudo docker kill
systemctl stop docker.socket
cp /etc/docker/daemon.json /tmp/
cat /tmp/daemon.json | jq ' ."storage-driver"="zfs"' > /etc/docker/daemon.json
if mount | grep -q '^overlay'; then
mount | grep '^overlay' | awk '{ print $3 }' | xargs umount
else
echo "No overlay filesystems are currently mounted."
fi
timeout 200 rm -r /var/lib/docker/overlay2/
systemctl restart docker.socket
docker info | grep Storage | grep -q zfs || exit 1


@@ -0,0 +1,31 @@
#!/bin/bash
REMOTE_IP=$1
REMOTE_USER=$2
REMOTE_PORT=$3
execute_remote() {
ssh ${REMOTE_USER}@${REMOTE_IP} -p ${REMOTE_PORT} "$@"
}
execute_remote << 'EOF'
if [ ! -d /tmp ]; then
exit 1
fi
EOF
if [ $? -ne 0 ]; then
echo "Directory /tmp does not exist on ${REMOTE_IP}"
exit 1
fi
script_dir=$(dirname "$(realpath "$0")")
scp -P ${REMOTE_PORT} ${script_dir}/fix_docker.sh ${REMOTE_USER}@${REMOTE_IP}:/tmp
echo "FINISHED COPYING FILES from ${script_dir} to ${REMOTE_IP}:/tmp"
# Pre-reboot commands
execute_remote << 'EOF'
sudo bash /tmp/fix_docker.sh
EOF


@@ -0,0 +1,32 @@
#!/bin/bash
REMOTE_IP=$1
REMOTE_USER=$2
REMOTE_PORT=$3
execute_remote() {
ssh ${REMOTE_USER}@${REMOTE_IP} -p ${REMOTE_PORT} "$@"
}
execute_remote << 'EOF'
if [ ! -d /tmp ]; then
exit 1
fi
EOF
if [ $? -ne 0 ]; then
echo "Directory /tmp does not exist on ${REMOTE_IP}"
exit 1
fi
script_dir=$(dirname "$(realpath "$0")")
scp -P ${REMOTE_PORT} ${script_dir}/fix_docker.sh ${REMOTE_USER}@${REMOTE_IP}:/tmp
echo "FINISHED COPYING FILES from ${script_dir} to ${REMOTE_IP}:/tmp"
# Pre-reboot commands
execute_remote << 'EOF'
sudo bash /tmp/fix_docker.sh
sudo service docker restart
EOF


@@ -0,0 +1,29 @@
#!/bin/bash
REMOTE_IP=$1
REMOTE_USER=$2
REMOTE_PORT=$3
execute_remote() {
ssh ${REMOTE_USER}@${REMOTE_IP} -p ${REMOTE_PORT} "$@"
}
execute_remote << 'EOF'
if [ ! -d /tmp ]; then
exit 1
fi
EOF
if [ $? -ne 0 ]; then
echo "Directory /tmp does not exist on ${REMOTE_IP}"
exit 1
fi
script_dir=$(dirname "$(realpath "$0")")
scp -P ${REMOTE_PORT} ${script_dir}/txvr-fw-update-on-host.sh ${REMOTE_USER}@${REMOTE_IP}:/tmp
echo "FINISHED COPYING FILES from ${script_dir} to ${REMOTE_IP}:/tmp"
# Pre-reboot commands
execute_remote "sudo bash /tmp/txvr-fw-update-on-host.sh"


@@ -0,0 +1,14 @@
#!/bin/bash
REMOTE_IP=$1
REMOTE_USER=$2
REMOTE_PORT=$3
execute_remote() {
ssh ${REMOTE_USER}@${REMOTE_IP} -p ${REMOTE_PORT} "$@"
}
# Remote commands
execute_remote << 'EOF'
sudo service nvidia-fabricmanager restart
EOF


@@ -0,0 +1,25 @@
#!/bin/bash
REMOTE_IP=$1
REMOTE_USER=$2
REMOTE_PORT=$3
execute_remote() {
ssh ${REMOTE_USER}@${REMOTE_IP} -p ${REMOTE_PORT} "$@"
}
# Pre-reboot commands
execute_remote << 'EOF'
nvidia-smi -e 1
EOF
# Reboot the machine
echo "Rebooting ${REMOTE_IP}..."
execute_remote "sudo reboot -h now"
# Wait for machine to reboot
while true; do
if execute_remote "exit"; then
break
fi
sleep 10 # Wait for 10 seconds before trying again
done


@@ -0,0 +1,20 @@
#!/bin/bash
REMOTE_IP=$1
REMOTE_USER=$2
REMOTE_PORT=$3
execute_remote() {
ssh ${REMOTE_USER}@${REMOTE_IP} -p ${REMOTE_PORT} "$@"
}
# Reboot the machine
echo "Rebooting ${REMOTE_IP}..."
execute_remote "sudo reboot -h now"
# Wait for machine to reboot
while true; do
if execute_remote "exit"; then
break
fi
sleep 10 # Wait for 10 seconds before trying again
done


@@ -0,0 +1,15 @@
#!/bin/bash
REMOTE_IP=$1
REMOTE_USER=$2
REMOTE_PORT=$3
execute_remote() {
ssh ${REMOTE_USER}@${REMOTE_IP} -p ${REMOTE_PORT} "$@"
}
# Pre-reboot commands
execute_remote << 'EOF'
sudo sed -i '/swap/d' /etc/fstab
sudo swapoff -a
EOF


@@ -0,0 +1,43 @@
#!/bin/bash
REMOTE_IP=$1
REMOTE_USER=$2
REMOTE_PORT=$3
execute_remote() {
ssh ${REMOTE_USER}@${REMOTE_IP} -p ${REMOTE_PORT} "$@"
}
execute_remote << 'EOF'
echo "Pre-reboot code"
EOF
script_dir=$(dirname "$(realpath "$0")")
scp -P ${REMOTE_PORT} ${script_dir}/uninstall_nvidia.sh ${REMOTE_USER}@${REMOTE_IP}:/tmp
scp -P ${REMOTE_PORT} ${script_dir}/reinstall_nvidia.sh ${REMOTE_USER}@${REMOTE_IP}:/tmp
echo "FINISHED COPYING FILES from ${script_dir} to ${REMOTE_IP}:/tmp"
# Pre-reboot commands
execute_remote << 'EOF'
bash /tmp/uninstall_nvidia.sh
bash /tmp/reinstall_nvidia.sh
EOF
# Reboot the machine
echo "Rebooting ${REMOTE_IP}..."
execute_remote "sudo reboot -h now"
# Wait for machine to reboot
while true; do
if execute_remote "exit"; then
break
fi
sleep 10 # Wait for 10 seconds before trying again
done
# Post-reboot commands
echo "${REMOTE_IP} reboot completed"
execute_remote << 'EOF'
echo "Post-reboot code"
EOF


@@ -0,0 +1,30 @@
#!/bin/bash
REMOTE_IP=$1
REMOTE_USER=$2
REMOTE_PORT=$3
execute_remote() {
ssh ${REMOTE_USER}@${REMOTE_IP} -p ${REMOTE_PORT} "$@"
}
execute_remote << 'EOF'
if [ ! -d /tmp ]; then
exit 1
fi
EOF
if [ $? -ne 0 ]; then
echo "Directory /tmp does not exist on ${REMOTE_IP}"
exit 1
fi
script_dir=$(dirname "$(realpath "$0")")
scp -P ${REMOTE_PORT} ${script_dir}/fix_zpool.sh ${REMOTE_USER}@${REMOTE_IP}:/tmp
echo "FINISHED COPYING FILES from ${script_dir} to ${REMOTE_IP}:/tmp"
# Pre-reboot commands
execute_remote << 'EOF'
bash /tmp/fix_zpool.sh
EOF


@@ -0,0 +1,47 @@
#!/bin/bash
export DEBIAN_FRONTEND=noninteractive
export DEBCONF_NONINTERACTIVE_SEEN=true
echo 'deb https://nvidia.github.io/libnvidia-container/stable/deb/$(ARCH) /' | sudo tee -a /etc/apt/sources.list.d/nvidia_github_io_libnvidia_container_stable_deb_ARCH.list
echo 'deb [signed-by=/usr/share/keyrings/cuda-archive-keyring.gpg] http://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64/ /' | sudo tee -a /etc/apt/sources.list.d/cuda-ubuntu2204-x86_64.list
wget https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64/cuda-keyring_1.0-1_all.deb && yes | sudo dpkg -i cuda-keyring_1.0-1_all.deb
curl -s -L https://nvidia.github.io/nvidia-docker/gpgkey | sudo apt-key add -
#sudo apt-get update -y
sudo apt-get install -y linux-headers-$(uname -r)
#sudo apt-get install -y linux-nvidia=5.15.0.1042.42 linux-tools-nvidia=5.15.0.1042.42 --no-install-recommends
sudo apt-get install -y linux-nvidia=5.15.0.1047.47 linux-tools-nvidia=5.15.0.1047.47 --no-install-recommends --allow-downgrades
#sudo apt-get install -s cuda-drivers-fabricmanager-535=535.183.01-1 nvidia-fabricmanager-535=535.183.01-1 cuda-drivers-535=535.183.01-1 --no-install-recommends
#sudo apt-get install -y cuda-drivers-fabricmanager-535=535.183.01-1 nvidia-fabricmanager-535=535.183.01-1 cuda-drivers-535=535.183.01-1 --no-install-recommends
sudo apt-get install -y --no-install-recommends --allow-downgrades \
cuda-drivers-535=535.183.01-1 \
cuda-drivers-fabricmanager-535=535.183.01-1 \
libnvidia-cfg1-535:amd64=535.183.01-0ubuntu1 \
libnvidia-common-535=535.183.01-0ubuntu1 \
libnvidia-compute-535:amd64=535.183.01-0ubuntu1 \
libnvidia-decode-535:amd64=535.183.01-0ubuntu1 \
libnvidia-encode-535:amd64=535.183.01-0ubuntu1 \
libnvidia-extra-535:amd64=535.183.01-0ubuntu1 \
libnvidia-fbc1-535:amd64=535.183.01-0ubuntu1 \
libnvidia-gl-535:amd64=535.183.01-0ubuntu1 \
nvidia-compute-utils-535=535.183.01-0ubuntu1 \
nvidia-dkms-535=535.183.01-0ubuntu1 \
nvidia-driver-535=535.183.01-0ubuntu1 \
nvidia-fabricmanager-535=535.183.01-1 \
nvidia-kernel-common-535=535.183.01-0ubuntu1 \
nvidia-kernel-source-535=535.183.01-0ubuntu1 \
nvidia-utils-535=535.183.01-0ubuntu1 \
xserver-xorg-video-nvidia-535=535.183.01-0ubuntu1
sudo sed -i 's/--no-persistence-mode/--persistence-mode/' /usr/lib/systemd/system/nvidia-persistenced.service
sudo systemctl enable nvidia-fabricmanager.service
sudo apt-get install -y nvidia-container-toolkit=1.14.3-1 nvidia-container-toolkit-base=1.14.3-1
sudo modprobe nvidia
sudo service docker restart
sudo modprobe nvidia-peermem || true
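
After the reinstall (and the reboot performed by the wrapper script), a quick sanity check along these lines may be useful; this is a sketch, not part of the original script:

nvidia-smi
systemctl is-active nvidia-fabricmanager
lsmod | grep nvidia_peermem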


@@ -0,0 +1,29 @@
#!/bin/bash
FW='fw_47_171_00001_dev_signed.bin'
if [ "$1" = '--dry-run' ] ; then
DRY_RUN=echo
else
DRY_RUN=
fi
cd /tmp
if [[ ! -f $FW ]]
then
wget "https://networkingdownloads.nvidia.com/custhelp/Non_Monetized_Products/LinkX/MMA4Z00/$FW"
fi
script_dir=$(dirname "$(realpath "$0")")
for HCA in $(jq --raw-output '.ib_hcas | keys[]' "${script_dir}"/../config.json) ;
do
flint -d $HCA --linkx --downstream_device_ids 1 query full
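# Burn the transceiver firmware only if the queried version does not already report 47.171.0001.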
FV=$(flint -d $HCA --linkx --downstream_device_ids 1 query | grep 47.171.0001)
if [[ $FV == "" ]]
then
$DRY_RUN flint -d $HCA --linkx --linkx_auto_update --image $FW --download_transfer --activate burn || echo 'burn failed ' $HCA
fi
done


@@ -0,0 +1,14 @@
sudo lsof /dev/nvidia* | awk '{ print $2 }' | grep -v PID | uniq | xargs sudo kill -9
timeout 5 sudo systemctl stop nvidia-persistenced &
sleep 5
sudo lsof /dev/nvidia* | awk '{ print $2 }' | grep -v PID | uniq | xargs sudo kill -9
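# The rmmod below is repeated, presumably to retry unloading while lingering processes release the devices.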
sudo rmmod nvidia_drm nvidia_modeset nvidia nvidia_uvm
sudo rmmod nvidia_drm nvidia_modeset nvidia nvidia_uvm
sudo rmmod nvidia_drm nvidia_modeset nvidia nvidia_uvm
sudo rmmod nvidia_drm nvidia_modeset nvidia nvidia_uvm
sudo rmmod nvidia_drm nvidia_modeset nvidia nvidia_uvm
sudo modprobe -r nvidia
sudo rm -r /usr/share/nvidia/nvswitch
sudo apt remove -y --purge '^nvidia-.*'
sudo apt purge -y --auto-remove '^nvidia-.*'

File diff suppressed because it is too large


@@ -0,0 +1,117 @@
"""
Usage: python run_health_checks.py <nodes>, where <nodes> is a comma-separated list of hostnames.
Alternatively, provide the list of nodes as a list of strings in config.json.
"""
import sys
import time
from multiprocessing import Process
from multiprocessing import Queue
import os
import json
from health_checks import ComputeHostHealth, HealthCheck, get_health_check_from_str
from health_checks import HealthCheckCommandError
from health_checks import ALL_HEALTH_CHECKS
from health_checks import outcome_to_health_check_result
from utils.commands import SSHConnectionData
from utils.commands import SUBPROCESS_STOPPED_BY_REQUEST_EXIT_CODE
from utils.commands import run_remote_command
EXPECTED_CONFIG_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'config.json')
with open(EXPECTED_CONFIG_FILE, 'r') as f:
EXPECTED_CONFIG = json.load(f)
def run_health_check(node: str, health_check: HealthCheck, queue: Queue, timeout_sec: int = 100) -> None:
connection = SSHConnectionData(ip=node, port=EXPECTED_CONFIG["node_info"]["port"], user=EXPECTED_CONFIG["node_info"]["user"])
command = health_check.create_command()
try:
result = run_remote_command(
machine_ssh_command=connection.get_ssh_command(),
remote_command=command,
is_checked=True,
timeout_sec=timeout_sec,
)
except Exception as e:
# If the exception does not have a returncode, we use a known sentinel.
NO_RETURN_CODE = -1
returncode = getattr(e, "returncode", NO_RETURN_CODE)
if returncode == SUBPROCESS_STOPPED_BY_REQUEST_EXIT_CODE:
print("Health check timed out.")
else:
print(f"Health check failed with return code: {returncode}")
queue.put(
(connection.ip, HealthCheckCommandError(message=str(e), returncode=returncode, cause=str(returncode)))
)
return
outcome = health_check.validate_result(result.output, result.returncode)
queue.put((connection.ip, outcome))
if __name__ == "__main__":
if len(sys.argv) > 1:
nodes = sys.argv[1]
node_list = nodes.split(",")
elif "node_info" in EXPECTED_CONFIG and "nodes" in EXPECTED_CONFIG["node_info"]:
node_list = EXPECTED_CONFIG["node_info"]["nodes"]
if len(node_list) == 0:
raise ValueError("No nodes provided, please provide either through command line or config.")
else:
raise ValueError("No nodes provided, please provide either through command line or config.")
print("Running health checks on nodes: ", node_list)
health_check = ALL_HEALTH_CHECKS
if "leaf_health_checks" in EXPECTED_CONFIG:
leaf_health_checks_str = EXPECTED_CONFIG["leaf_health_checks"]
if len(leaf_health_checks_str) == 0:
print("No health checks provided, running all health checks")
else:
health_check = get_health_check_from_str(",".join(EXPECTED_CONFIG["leaf_health_checks"]))
print("Running health checks: ", health_check)
if health_check is None:
print("Couldn't find health checks", EXPECTED_CONFIG["leaf_health_checks"])
health_check = ALL_HEALTH_CHECKS
processes = []
queue = Queue()
health_check_timeout = 100
for node in node_list:
p = Process(
target=run_health_check,
args=(node, health_check, queue, health_check_timeout),
)
processes.append(p)
p.start()
start_time = time.time()
results = []
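# Collect one result per worker; queue.get() blocks until the next worker reports.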
while len(results) < len(processes) or not queue.empty():
results.append(queue.get())
for p in processes:
p.join()
health_check_results = {
ComputeHostHealth.OK: [],
ComputeHostHealth.UNHEALTHY: [],
ComputeHostHealth.CRITICAL: [],
ComputeHostHealth.UNKNOWN: [],
}
for result in results:
node, outcome = result
health_check_results[outcome_to_health_check_result(outcome)].append((node, outcome))
for health, node_outcomes in health_check_results.items():
if len(node_outcomes) > 0:
print(f"Health {health}")
if health == ComputeHostHealth.OK:
continue
for node, outcome in node_outcomes:
message = outcome.message.replace("\n", "\n\t")
remediation = outcome.suggested_remediation.replace("\n", "\n\t")
print(f"{node}: \n{message}\n{remediation}")
print("\n\n----------------------------SUMMARY:----------------------------\n")
for health, node_outcomes in health_check_results.items():
nodes = [node_outcome[0] for node_outcome in node_outcomes]
print(f"Nodes with {health}: {nodes}")


@@ -0,0 +1,317 @@
import os
import shlex
import subprocess
import tempfile
import time
from abc import ABC
from abc import abstractmethod
from collections.abc import Mapping
from contextlib import contextmanager
from copy import deepcopy
from functools import cached_property
from itertools import groupby
from threading import Event
from threading import Thread
from typing_extensions import Any
from typing_extensions import Callable
from typing_extensions import Dict
from typing_extensions import Iterable
from typing_extensions import Iterator
from typing_extensions import List
from typing_extensions import NoReturn
from typing_extensions import Optional
from typing_extensions import Protocol
from typing_extensions import Self
from typing_extensions import Tuple
from typing_extensions import TypeVar
from typing_extensions import overload
from uuid import uuid4
import attr
class _SupportsLessThan(Protocol):
def __lt__(self, __other: Any) -> bool:
...
T = TypeVar("T")
TK = TypeVar("TK", bound=_SupportsLessThan)
TV = TypeVar("TV")
def group_by_helper(data: Iterable[TV], get_key: Callable[[TV], TK]) -> Dict[TK, List[TV]]:
data = sorted(data, key=get_key)
return {k: list(g) for k, g in groupby(data, get_key)}
class FrozenMapping(Mapping[T, TV], ABC):
@abstractmethod
def __hash__(self) -> int:
...
# NOTE: `_key` is not `sorted` because (a) not all python objects are sortable and (b) python dictionaries are insertion-ordered.
class _FrozenDict(Dict[T, TV], FrozenMapping[T, TV]):
def _key(self) -> Tuple[Tuple[T, TV], ...]:
return tuple(self.items())
@cached_property
def _hash(self) -> int:
return hash(self._key())
def __hash__(self) -> int: # type: ignore
return self._hash
def _mutation_error(self, method: str) -> RuntimeError:
return RuntimeError(f"Cannot call mutation method {method} on _FrozenDict {self}")
def __setitem__(self, __name: T, __value: TV) -> NoReturn:
raise self._mutation_error("__setitem__")
def __delitem__(self, __name: T) -> NoReturn:
raise self._mutation_error("__delitem__")
def update(self, __m: Mapping[T, TV]) -> NoReturn: # type: ignore
raise self._mutation_error("update")
def setdefault(self, __name: T, __value: TV) -> NoReturn:
raise self._mutation_error("setdefault")
def pop(self, __name: T, __default: TV) -> NoReturn: # type: ignore
raise self._mutation_error("pop")
def popitem(self) -> NoReturn:
raise self._mutation_error("popitem")
def clear(self) -> NoReturn:
raise self._mutation_error("clear")
def __repr__(self) -> str:
return f"_FrozenDict({super().__repr__()})"
def __copy__(self) -> Self:
return type(self)(self)
def __deepcopy__(self, memo: Dict[int, Any]) -> Self:
memo[id(self)] = self
copied_items = ((deepcopy(key, memo), deepcopy(value, memo)) for key, value in self.items())
return type(self)(copied_items)
def __reduce__(self) -> Tuple[Any, ...]:
return (_FrozenDict, (dict(self),))
@overload
def freeze_mapping(**kwargs: TV) -> FrozenMapping[str, TV]:
...
@overload
def freeze_mapping(mapping: Mapping[T, TV], **kwargs: TV) -> FrozenMapping[T, TV]:
...
@overload
def freeze_mapping(__iterable: Iterable[Tuple[T, TV]]) -> FrozenMapping[T, TV]:
...
@overload
def freeze_mapping(__iterable: Iterable[Tuple[T, TV]], **kwargs: TV) -> FrozenMapping[T, TV]:
...
def freeze_mapping(*args: object, **kwargs: object) -> _FrozenDict:
return _FrozenDict(*args, **kwargs)
def remove_none(data: Iterable[Optional[T]]) -> List[T]:
return [x for x in data if x is not None]
SUBPROCESS_STOPPED_BY_REQUEST_EXIT_CODE = -9999
def _set_event_after_time(event: Event, seconds: float) -> None:
event.wait(seconds)
event.set()
@contextmanager
def get_expiration_event(seconds: float) -> Iterator[Event]:
event = Event()
# Since we set `daemon=True`, we don't need to join the thread.
Thread(target=_set_event_after_time, args=(event, seconds), name=f"timeout_thread_{uuid4()}", daemon=True).start()
try:
yield event
finally:
event.set()
@attr.s(auto_exc=True, auto_attribs=True)
class CommandError(Exception):
"""
An error that occurred while running a command.
"""
command: str
returncode: int
output: str
ssh_command: Optional[str] = None
@attr.s(auto_exc=True, auto_attribs=True)
class CompletedProcess:
"""
Mostly a reimplementation of subprocess.CompletedProcess but allows us to deal with some specific concerns.
A class to make process results easier to work with for us. We have a couple concerns that are different from typical:
We run commands over SSH a lot and care about making sure that those errors clearly show both the command being run and the host being run on.
We put the output from stdout and stderr together (should we!?).
There's no support for binary commands.
"""
returncode: int
output: str
command: str
def check(self) -> Self:
if self.returncode != 0:
raise CommandError(
command=self.command,
returncode=self.returncode,
output=self.output,
)
return self
@attr.s(auto_exc=True, auto_attribs=True)
class RemoteCompletedProcess(CompletedProcess):
"""
A remote completed process. Beyond CompletedProcess, includes ssh information.
"""
ssh_command: str
def check(self) -> Self:
if self.returncode != 0:
raise CommandError(
command=self.command,
ssh_command=self.ssh_command,
returncode=self.returncode,
output=self.output,
)
return self
def run_local_command(
command: str,
is_checked: bool = True,
timeout_sec: Optional[int] = None,
shutdown_timeout_sec: int = 30,
) -> CompletedProcess:
process = subprocess.Popen(
command,
shell=True,
executable="/bin/bash",
bufsize=1,
encoding="UTF-8",
errors="replace",
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
env={**os.environ, "TERM": "dumb"},
)
exit_code = None
start_time = time.time()
while exit_code is None:
if timeout_sec is not None and time.time() - start_time > timeout_sec:
print("Terminating process due to timeout.")
process.terminate()
try:
process.wait(timeout=shutdown_timeout_sec)
except subprocess.TimeoutExpired as e:
# this sends SIGKILL which immediately kills the process
process.kill()
try:
process.wait(timeout=10)
except subprocess.TimeoutExpired as e:
print(f"process {process.pid} didn't terminate after kill")
# use a special exit code so it doesn't look like this was a real failure of the process
exit_code = SUBPROCESS_STOPPED_BY_REQUEST_EXIT_CODE
break
exit_code = process.poll()
time.sleep(0.1)
if exit_code == 0:
output = process.stdout.read()
else:
output = "ERROR"
result = CompletedProcess(
returncode=exit_code,
output=output,
command=command,
)
if is_checked:
result.check()
return result
pipe_to_local_file_cmd_suffix = ""
def run_remote_command(
machine_ssh_command: str,
remote_command: str,
is_checked: bool = True,
timeout_sec: Optional[int] = None,
shutdown_timeout_sec: int = 30,
) -> RemoteCompletedProcess:
"""
:raises SSHConnectionError: if `is_checked and returncode == 255` (the ssh reserved error code)
:raises RemoteCommandError: if `is_checked and returncode not in (0, 255)`
"""
escaped_remote_command = shlex.quote(remote_command)
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
temp_file_name = temp_file.name
command = f"{machine_ssh_command} {escaped_remote_command} 2>&1 | tee {temp_file_name} > /dev/null"
result = run_local_command(
command=command,
is_checked=False,
timeout_sec=timeout_sec,
shutdown_timeout_sec=shutdown_timeout_sec,
)
remote_result = RemoteCompletedProcess(
returncode=result.returncode,
output=open(temp_file_name, "r").read(),
ssh_command=machine_ssh_command,
command=remote_command,
)
os.remove(temp_file_name)
if is_checked:
remote_result.check()
return remote_result
DISABLE_HOST_KEY_CHECKING = " -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -o LogLevel=ERROR "
@attr.s(auto_attribs=True, frozen=True)
class SSHConnectionData:
ip: str
port: int
user: str
def get_ssh_command(
self,
connection_timeout_seconds: Optional[int] = 10,
) -> str:
connection_timeout = ""
if connection_timeout_seconds is not None:
connection_timeout = f"-o ConnectTimeout={connection_timeout_seconds}"
base_command = f"ssh {DISABLE_HOST_KEY_CHECKING} -p {self.port}"
return f"{base_command} {connection_timeout} {self.user}@{self.ip}"

View File

@@ -0,0 +1,416 @@
import argparse
import json
import os
import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from pathlib import Path
from typing import Dict
from typing import Final
from typing import Generator
from typing import Iterable
from typing import List
from typing import Mapping
from typing import NewType
from typing import Optional
from typing import Sequence
from typing import Tuple
from typing import TypeVar
from typing import Union
import numpy as np
from gpu_connection_test import IbResultsSchema
from loguru import logger
from p2p_ib_test import run_p2p_ib_tests
from p2p_ib_test import shutdown_test
from utils.events import get_expiration_event
from utils.run_command import CommandRunner
from utils.run_command import ContainerSSHConnectionData
from utils.run_command import RemoteCommandRunner
from utils.run_command import run_local_command
from utils.serialization import deserialize_from_json
_PORT_LOCK = threading.Lock()
_PORTS_PER_HOST: Final[Dict[str, int]] = {}
IP = str
def get_port_to_use(master_addr: str) -> int:
with _PORT_LOCK:
port_to_use = _PORTS_PER_HOST.get(master_addr, 9000)
_PORTS_PER_HOST[master_addr] = port_to_use + 1
return port_to_use
T = TypeVar("T")
def format_env(env: Dict[str, str]) -> str:
return " ".join(f"{key}={value}" for key, value in env.items())
EXPIRATION_SECONDS: Final[float] = 100.0
ENV_VARS = {
"NCCL_NET": "IB",
"NCCL_DEBUG_SUBSYS": "ALL",
"NCCL_ASYNC_ERROR_HANDLING": "1",
# "NCCL_NET_GDR_LEVEL": "LOC", # LOC = DISABLE GDR (if it's enabled on the kernel)
}
CONFIG_FILE = Path(os.path.realpath(__file__)).parent.parent / "health_checks" / "config.json"
with open(CONFIG_FILE, 'r') as f:
    CONFIG = json.load(f)
ENV_VARS["NCCL_IB_HCA"] = "=" + ",".join(CONFIG["infiniband_status"]["device_names"])
ErrorString = NewType("ErrorString", str)
TestResultType = Union[Tuple[float, ...], ErrorString]
def generate_chunks(iterable: Iterable[T], chunk_size: int) -> Generator[Tuple[T, ...], None, None]:
"""Yield successive n-sized chunks from any iterable"""
chunk = []
for item in iterable:
chunk.append(item)
if len(chunk) == chunk_size:
yield tuple(chunk)
chunk = []
if len(chunk) > 0:
yield tuple(chunk)
def parse_nvlink_test_for_stats(
ip_to_times: Mapping[IP, List[float]]
) -> Dict[str, Dict[str, Union[str, float, List[float]]]]:
ip_to_times_summary = {}
for ip, times in ip_to_times.items():
        if not all(isinstance(t, float) for t in times):
# Add the mean with an error message as well, then we can always read one value in the health check
ip_to_times_summary[ip] = {
"error": "times are not all floats",
"mean": "Error: times are not all floats",
"times": times,
}
else:
ip_to_times_summary[ip] = {
"count": len(times),
"min": min(times),
"max": max(times),
"mean": sum(times) / len(times),
"25_percentile": np.percentile(times, 25),
"50_percentile": np.percentile(times, 50),
"75_percentile": np.percentile(times, 75),
}
logger.info(ip_to_times_summary)
return ip_to_times_summary
def run_ib_test(
connection: CommandRunner,
master_addr: str,
master_port: int,
rail: int,
rank: int,
world_size: int,
extra_flags: Optional[Dict[str, str]] = None,
) -> TestResultType:
try:
with get_expiration_event(EXPIRATION_SECONDS) as event:
extra_flags_str = " ".join([f"--{k} {v}" for k, v in extra_flags.items()]) if extra_flags else ""
command = (
f"{format_env(ENV_VARS)} python3 -m host_validation.gpu_connection_test ib --rail {rail} --master_addr {master_addr}"
+ f" --master_port {master_port} --rank {rank} --world_size {world_size} {extra_flags_str}"
)
result = connection.run_command(
command=command,
shutdown_event=event,
)
logger.debug(f"running {command} on {connection}")
ib_results_str = result.output.strip().split("\n")[-1]
ib_results: IbResultsSchema = deserialize_from_json(ib_results_str)
if ib_results.rail != rail:
logger.info(
f"WARNING: Expected output for rail {rail} but actually got output for rail {ib_results.rail}"
)
logger.debug(f"success running {command} on {connection}; result {ib_results.results}")
return ib_results.results
except Exception as e:
logger.info(f"caught exception running {command} on {connection}:\n{e}")
shutdown_test(connection, "tests.ib_tes[t]")
return ErrorString(f"Caught exception running tests: {e}")
def run_single_group(group: Sequence[CommandRunner], rail: int) -> Dict[Tuple[CommandRunner, int], TestResultType]:
master_addr = group[0].ip
master_port = get_port_to_use(master_addr)
logger.info(f"running {len(group)} node group {[x.ip for x in group]} on rail {rail}")
with ThreadPoolExecutor(max_workers=len(group)) as executor:
results = executor.map(
lambda rank_and_connection: run_ib_test(
rank_and_connection[1], master_addr, master_port, rail, rank_and_connection[0], len(group)
),
enumerate(group),
)
result = {(connection, rail): scores for connection, scores in zip(group, results)}
# result is kinda messy to print here
logger.info(f"finished running {len(group)} node group {[x.ip for x in group]} on rail {rail}")
return result
def run_single_group_across_all_rails(
group: Sequence[CommandRunner],
) -> Dict[Tuple[CommandRunner, int], TestResultType]:
master_addr = group[0].ip
master_port = get_port_to_use(master_addr)
connection_and_rail = [(connection, rail) for connection in group for rail in range(8)]
logger.info(
f"running {len(group)} node group {[x.ip for x in group]} across all rails with {len(connection_and_rail)} total workers"
)
with ThreadPoolExecutor(max_workers=len(connection_and_rail)) as executor:
results = executor.map(
lambda rank_connection_rail: run_ib_test(
rank_connection_rail[1][0],
master_addr,
master_port,
rank_connection_rail[1][1],
rank_connection_rail[0],
len(connection_and_rail),
),
enumerate(connection_and_rail),
)
result = {connection_rail: scores for connection_rail, scores in zip(connection_and_rail, results)}
# result is kinda messy to print here
logger.info(f"finished running {len(group)} node group {[x.ip for x in group]} on all rails")
return result
def run_experiments(
groups: Sequence[Sequence[CommandRunner]],
rail_aligned: bool = True,
) -> Dict[Tuple[CommandRunner, str], TestResultType]:
# each thread runs a single (group, rail) pair, so it will open group_size SSH connections
concurrent_groups = len(groups) * 8
with ThreadPoolExecutor(max_workers=concurrent_groups) as executor:
if not rail_aligned:
results = executor.map(run_single_group_across_all_rails, groups)
scores_by_connection_rail = {}
for result in results:
for (connection, rail), scores in result.items():
scores_by_connection_rail[(connection, str(rail))] = scores
return scores_by_connection_rail
else:
results = executor.map(
lambda group_and_rail: run_single_group(*group_and_rail),
[(group, rail) for group in groups for rail in range(8)],
)
scores_by_connection_rail = {(connection, "total"): 0.0 for group in groups for connection in group}
for result in results:
for (connection, rail), scores in result.items():
if all(isinstance(s, float) for s in scores):
scores_by_connection_rail[(connection, "total")] += sum(
[s for s in scores if isinstance(s, float)]
)
else:
scores_by_connection_rail[(connection, "total")] = ErrorString(f"Error on gpu {rail}")
scores_by_connection_rail[(connection, str(rail))] = scores
return scores_by_connection_rail
def run_group_ib_tests(
connections: Sequence[CommandRunner],
output_file: Optional[Path] = None,
rail_aligned: bool = True,
    group_sizes: Tuple[int, ...] = (1000,),
max_iterations: int = 1,
) -> Dict[Tuple[int, int], Dict[Tuple[CommandRunner, str], TestResultType]]:
if output_file:
with output_file.open("a+") as f:
f.write(f"Starting group IB tests with connections {[connection.ip for connection in connections]}\n")
iteration_to_scores = dict()
for count in range(max_iterations):
random.seed(count)
for group_size in group_sizes:
group_size = min(group_size, len(connections))
scores_by_connection = dict()
logger.info(f"Running tests for group size {group_size}")
mixed_nodes = list(connections)
random.shuffle(mixed_nodes)
groups = tuple(generate_chunks(mixed_nodes, group_size))
for connection_and_rail, scores in run_experiments(groups, rail_aligned).items():
scores_by_connection[connection_and_rail] = scores
logger.info(f"Finished tests for group size {group_size}")
log_lines = [
f"{connection.ip}-{rail}: {scores}"
for (connection, rail), scores in sorted(scores_by_connection.items())
]
logger.info(f"Results for group size {group_size}: \n" + "\n".join(log_lines))
if output_file:
with output_file.open("a+") as f:
f.write(f"Results for group size {group_size}: \n" + "\n".join(log_lines) + "\n")
iteration_to_scores[(count, group_size)] = scores_by_connection
return iteration_to_scores
def run_nvlink_test_single_host(
connection: CommandRunner,
dims: int,
loops: int,
) -> Tuple[Union[CommandRunner, str], TestResultType]:
logger.info(f"running on {connection.ip} node")
try:
with get_expiration_event(EXPIRATION_SECONDS) as event:
command = (
f"{format_env(ENV_VARS)} torchrun --nproc_per_node 8 host_validation/gpu_connection_test.py nvlink --dims {dims} --loops {loops}"
)
result = connection.run_command(
command=command,
shutdown_event=event,
)
nvlink_results_str = result.output.strip().split("\n")[-1]
nvlink_results = deserialize_from_json(nvlink_results_str)
return connection, nvlink_results.results
except Exception as e:
logger.info(f"caught exception running {command} on {connection}:\n{e}")
shutdown_test(connection, "tests.nvlink_tes[t]")
return connection, ErrorString(f"Caught exception running tests: {e}")
def run_nvlink_tests(
connections: Sequence[CommandRunner],
output_file: Optional[Path] = None,
dims: int = 1_000_000_000,
loops: int = 20,
) -> Dict[str, Dict[str, Union[float, List[float]]]]:
if output_file:
with output_file.open("a+") as f:
f.write(f"Starting nvlink tests with connections {[connection.ip for connection in connections]}\n")
group_size = 1
scores_by_connection = {}
nodes = [group[0] for group in generate_chunks(connections, group_size)]
with ThreadPoolExecutor(max_workers=len(nodes)) as executor:
results = executor.map(lambda node: run_nvlink_test_single_host(node, dims, loops), nodes)
for connection, scores in results:
scores_by_connection[connection.ip] = scores
log_lines = [
f"{connection}: {json.dumps(scores)}"
for connection, scores in sorted(scores_by_connection.items(), key=lambda item: item[1])
]
logger.info(f"Results: \n" + "\n".join(log_lines))
if output_file:
with output_file.open("a+") as f:
f.write(f"Results: \n" + "\n".join(log_lines) + "\n")
return parse_nvlink_test_for_stats(scores_by_connection)
def run_tests_single_node(command_runner: RemoteCommandRunner) -> None:
    now = datetime.now()
    logger.info(f"Starting single node tests for {command_runner.ip}")
    readable_time = now.strftime("%Y-%m-%d %H:%M:%S")
    filename_time = now.strftime("%Y%m%d%H%M%S")
tests_dir = Path(f"/mnt/private/tmp/health_tests/{command_runner.ip}/")
run_local_command(f"sudo mkdir -p {tests_dir}")
run_local_command(f"sudo chown user {tests_dir}")
run_local_command(f"sudo chgrp user {tests_dir}")
remote_tests_dir = Path("/mnt/unsafe_raw_shared_fs/tmp/health_tests/")
command_runner.run_command(f"sudo mkdir -p {remote_tests_dir}")
command_runner.run_command(f"sudo chown user {remote_tests_dir}")
command_runner.run_command(f"sudo chgrp user {remote_tests_dir}")
nvlink_results = run_nvlink_tests(
[command_runner], output_file=(tests_dir / "nvlink.txt"), dims=16_000_000, loops=30_000
)[command_runner.ip]
logger.info(f"Finished running nvlink tests for {command_runner.ip}")
results_dict = {
"time": readable_time,
"ip": command_runner.ip,
"nvlink": nvlink_results,
}
with open(tests_dir / "summary.json", "w+") as f:
json.dump(results_dict, f)
p2p_results = run_p2p_ib_tests(
[command_runner], single_host=True, output_file=(tests_dir / "p2p_ib.txt"), num_iterations=5
)[command_runner]
logger.info(f"Finished running p2p tests for {command_runner.ip}")
results_dict["p2p_ib"] = p2p_results
with open(tests_dir / "summary.json", "w+") as f:
json.dump(results_dict, f)
logger.info(f"Finished running tests for {command_runner.ip}")
command_runner.run_command(
f"cd {remote_tests_dir} && if [ -L latest ]; then rm latest; fi && ln -s {filename_time} latest"
)
def run_all_single_node_tests(connections: List[CommandRunner]) -> None:
with ThreadPoolExecutor(max_workers=len(connections)) as executor:
executor.map(run_tests_single_node, connections)
logger.info("Finished running all single node tests")
POSSIBLE_TESTS = ["group_ib", "all_rail_group_ib", "p2p_ib", "nvlink", "wait", "all_single_node"]
def get_worker_connections() -> List[RemoteCommandRunner]:
    host_info = CONFIG["node_info"]
nodes, port, user = host_info["nodes"], int(host_info["port"]), host_info["user"]
return [RemoteCommandRunner(connection=ContainerSSHConnectionData(ip=node, port=port, user=user)) for node in nodes]
def run(test: str) -> None:
assert test in POSSIBLE_TESTS, f"test {test} not in {POSSIBLE_TESTS}"
connections = get_worker_connections()
match test:
case "group_ib":
run_group_ib_tests(connections, rail_aligned=True)
case "all_rail_group_ib":
run_group_ib_tests(connections, rail_aligned=False)
case "p2p_ib":
run_p2p_ib_tests(connections)
case "nvlink":
run_nvlink_tests(connections)
case "all_single_node":
logger.info(f"Starting single node tests on {','.join([str(c.ip) for c in connections])}")
run_all_single_node_tests(connections)
case "wait":
logger.info("Waiting forever")
print(f"connections: {','.join([str(c) for c in connections])}")
logger.info(f"connections: {','.join([str(c) for c in connections])}")
time.sleep(100000)
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument("--test")
args = parser.parse_args()
run(test=args.test)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,152 @@
import argparse
import os
import resource
from typing import List
from typing import Optional
from typing import Tuple
import attr
import torch
import torch.distributed as dist
from loguru import logger
from utils.dist import get_local_rank
from utils.dist import init_dist
from utils.serialization import serialize_to_json
from utils.timer import Timer
@attr.s(auto_attribs=True, frozen=True)
class NvlinkResultsSchema:
# Each result represents the duration of a single round of the nvlink test in ms
results: Tuple[float, ...] = attr.ib(converter=tuple)
@attr.s(auto_attribs=True, frozen=True)
class IbResultsSchema:
# Each result represents the duration of a single round of the ib test in ms
results: Tuple[float, ...] = attr.ib(converter=tuple)
rail: int
def run_checks(dim_items: int, loops: int, rail: Optional[int] = None) -> List[float]:
"""
Runs some diagnostics to check for gpu and communication issues, either communicating over nvlink or ib
"""
timer = Timer()
    if rail is None:
device_id = get_local_rank()
else:
device_id = rail
with timer("cuda"):
device = torch.device("cuda", device_id)
torch.cuda.set_device(device_id)
buffer = torch.ones((dim_items, dim_items), device=device, dtype=torch.float64)
# warmup
dist.all_reduce(buffer, op=dist.ReduceOp.AVG, async_op=False)
results = []
for i in range(loops):
with timer(f"all_reduce_{i}"):
with timer("send"):
waiter = dist.all_reduce(buffer, op=dist.ReduceOp.AVG, async_op=True)
with timer("sync"):
waiter.wait()
dist.barrier()
with timer("stat"):
buffer_sum = buffer.sum().item()
results.append(timer[f"all_reduce_{i}"])
return results
def run_ib(
master_addr: str, master_port: int, rank: int, world_size: int, rail: int, dims: int, loops: int, force_ib: bool
) -> None:
if force_ib:
os.environ.update(
{
"NCCL_P2P_DISABLE": "1",
"NCCL_SHM_DISABLE": "1",
}
)
dist.init_process_group(
backend="nccl", init_method=f"tcp://{master_addr}:{master_port}", rank=rank, world_size=world_size
)
logger.info(f"inited dist for ib_test: rank {rank}, world size {world_size}, rail {rail}")
results = run_checks(dim_items=int(dims**0.5), rail=rail, loops=loops)
logger.info(f"completed ib_test: rank {rank}, world size {world_size}, rail {rail}: result {results}")
# this is for the parent to grab in stdout
results_schema = IbResultsSchema(results=tuple(results), rail=rail)
# Can't use a block here since it can interleave badly with the other rails
print(serialize_to_json(results_schema))
def run_nvlink(dims: int, loops: int) -> None:
logger.info(f"starting nvlink_test")
init_dist()
rank = get_local_rank()
logger.info(f"inited dist for nvlink_test with rank {rank}")
results = run_checks(dim_items=int(dims**0.5), loops=loops)
logger.info(f"completed nvlink_test: result {results}")
# this is for the parent to grab in stdout
# a block could probably be used here instead, but just leave it consistent with the above test
results_schema = NvlinkResultsSchema(results=tuple(results))
print(serialize_to_json(results_schema))
@logger.catch(reraise=True)
def main() -> None:
parser = argparse.ArgumentParser()
command_parsers = parser.add_subparsers(dest="command")
nvlink_parser = command_parsers.add_parser(
"nvlink",
description="Run work to exercise the nvlink and time the communication, example usage: 'gpu_connection_test.py nvlink --dims 10000 --loops 10'",
)
nvlink_parser.add_argument(
"--dims",
type=int,
default=1_000_000_000,
help="items in array to all_reduce, specifically we create a sqrt(dim_items) x sqrt(dim_items) array",
)
nvlink_parser.add_argument("--loops", type=int, default=200, help="number of loops of allreduce to run")
nvlink_parser.set_defaults(func=run_nvlink)
ib_parser = command_parsers.add_parser(
"ib",
description="Run work to exercise the infiniband and time the communication, example usage: 'gpu_connection_test ib --rail 0 --master_addr 10.0.200.1 --master_port 5001 --rank 0 --world_size 8', the master_addr and master_port should be the same for all gpus in a group",
)
ib_parser.add_argument("--rail", type=int, required=True, help="the rail being run on")
ib_parser.add_argument("--master_addr", required=True)
ib_parser.add_argument("--master_port", type=int, required=True)
ib_parser.add_argument("--rank", type=int, required=True, help="the rank of the machine running this process")
ib_parser.add_argument("--world_size", type=int, required=True, help="the total number of gpus")
ib_parser.add_argument(
"--dims",
type=int,
default=1_000_000_000,
help="items in array to all_reduce, specifically we create a sqrt(dim_items) x sqrt(dim_items) array",
)
ib_parser.add_argument("--loops", type=int, default=200, help="number of loops of allreduce to run")
ib_parser.add_argument("--force_ib", action="store_true", help="whether to force communication over infiniband")
ib_parser.set_defaults(func=run_ib)
r_soft, r_hard = resource.getrlimit(resource.RLIMIT_NOFILE)
resource.setrlimit(resource.RLIMIT_NOFILE, (r_hard, r_hard))
args = parser.parse_args()
args_to_ignore = ("suppress_errors", "func", "command")
args.func(**{k: v for k, v in vars(args).items() if k not in args_to_ignore})
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,360 @@
import json
import os
import random
import re
import time
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Dict
from typing import Final
from typing import List
from typing import Optional
from typing import Sequence
from typing import Tuple
from typing import Union
import attr
from loguru import logger
from utils.events import get_expiration_event
from utils.run_command import CommandRunner
from utils.run_command import FullConnection
EXPIRATION_SECONDS: Final[float] = 100.0
BW_ERROR_VALUE: Final[float] = 0.0
LAT_ERROR_VALUE: Final[float] = 10000000
USE_GDR: Final[bool] = True
BW_LOWER_LIMIT_GDR: Final[float] = 720
BW_LOWER_LIMIT_NO_GDR: Final[float] = 300
LAT_UPPER_LIMIT: Final[float] = 4.2
BW_TEST_OUTPUT_KEY = "BWaverage[Gb/sec]"
LAT_TEST_OUTPUT_KEY = "99%percentile[usec]"
IP = str
@attr.s(auto_attribs=True, frozen=True)
class HcaDescription:
pcie_device_description: str
@property
def pcie_slot_index(self) -> int:
return int(self.pcie_device_description.split("_")[1])
def get_gpu_index(self) -> int:
        return MLX_CARDS.index(self)
def __str__(self) -> str:
return self.pcie_device_description
EXPECTED_VERSION_FILE = Path(os.path.realpath(__file__)).parent.parent / "health_checks" / "config.json"
with open(EXPECTED_VERSION_FILE, 'r') as f:
EXPECTED_VERSIONS = json.load(f)
MLX_CARDS: Final[Tuple[HcaDescription, ...]] = tuple(HcaDescription(device_name) for device_name in EXPECTED_VERSIONS["infiniband_status"]["device_names"])
def is_passing_host(card_to_result: Dict[str, Tuple[float, float]], gdr_enabled: bool = USE_GDR) -> bool:
if gdr_enabled:
bw_lower_limit = BW_LOWER_LIMIT_GDR
else:
bw_lower_limit = BW_LOWER_LIMIT_NO_GDR
for card, (bw, lat) in card_to_result.items():
if bw < bw_lower_limit or lat > LAT_UPPER_LIMIT:
return False
return True
def find_good_hosts(
connections_to_result: Dict[str, Dict[str, Tuple[float, float]]], gdr_enabled: bool = USE_GDR
) -> List[str]:
good_hosts = []
for connection, result in connections_to_result.items():
if is_passing_host(result, gdr_enabled):
good_hosts.append(connection)
return good_hosts
def parse_p2p_output(uncleaned_output: str, key: str) -> Optional[float]:
"""
The p2p output is terrible:
- The headers are not separated by tabs, but by variable numbers of spaces.
- The header values may themselves contain spaces.
- The --output=json option produces invalid JSON.
As a result, we have some nasty parsing logic here; see the unit tests for illustrative
examples.
If/when there is a better way to extract the desired information, we should use it.
"""
split_text = re.split("-+", uncleaned_output)
data_values = split_text[-2].strip()
# Change all the headers to not have spaces within them
data_values = data_values.replace("% percentile", "%percentile")
data_values = data_values.replace("BW ", "BW")
data_values = re.sub("Conflicting CPU frequency.*", "", data_values)
lines = [l for l in data_values.splitlines() if len(l.strip()) > 0]
headers = [x.strip() for x in re.split(r"\s+", lines[0]) if len(x.strip()) > 0]
values = [x.strip() for x in re.split(r"\s+", lines[1]) if len(x.strip()) > 0]
for header, val in zip(headers, values):
if header == key:
return float(val)
raise ValueError(f"Could not find key {key} in output {uncleaned_output}, output format may have changed")
def _build_ib_write_bw_command(
card: HcaDescription,
iters: int,
port: int,
use_gdr: bool,
other_ip: Optional[str] = None,
) -> str:
return " ".join(
(
"ib_write_bw",
"-b",
f"-d {card}",
*((f"--use_cuda", str(card.get_gpu_idx())) if use_gdr else ()),
*((other_ip,) if other_ip is not None else ()),
f"--iters {iters}",
f"-p {port}",
"--report_gbits",
)
)
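# For example (assuming mlx5_0 is the first configured HCA, so its GPU index is 0), a
# client-side command built above would look roughly like:
#   ib_write_bw -b -d mlx5_0 --use_cuda 0 10.0.0.2 --iters 5000 -p 18515 --report_gbits
# where 10.0.0.2 is a hypothetical peer IP; when other_ip is None the peer IP is omitted
# and the process acts as the server.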
def shutdown_test(connection: CommandRunner, command: str) -> None:
if "[" not in command:
# This is to escape the command, so we don't end up killing the pkill before it kills the process we care about
command = "[" + command[0] + "]" + command[1:]
tries = 0
max_retries = 10
while True:
running_commands_count = int(connection.run_command(f"ps aux | grep {command} | wc -l ").output.strip())
if running_commands_count == 0:
break
try:
connection.run_command(f"pkill -f {command}")
logger.info(f"killed {command} on {connection} on try {tries}")
except:
pass
tries += 1
if tries >= max_retries:
break
logger.info(f"failed to kill {command} on {connection} after {max_retries} tries")
def run_single_rail_test(
connection: CommandRunner,
other_ip: str,
is_head: bool,
gpu_idx_and_card: Tuple[int, HcaDescription],
same_host: bool = False,
iters: int = 5_000,
) -> Tuple[Union[CommandRunner, str], str, Tuple[float, float]]:
gpu_idx, card = gpu_idx_and_card
bw_output, lat_output = BW_ERROR_VALUE, LAT_ERROR_VALUE
try:
if is_head:
# Ensure the other card acting as a server has time to spin up
time.sleep(5)
with get_expiration_event(EXPIRATION_SECONDS) as event:
other_ip = other_ip if is_head else ""
if same_host:
port = 18515 + int(card.pcie_slot_index) % 6
else:
port = 18515 + int(card.pcie_slot_index)
command = _build_ib_write_bw_command(
card=card,
other_ip=other_ip,
iters=iters,
port=port,
use_gdr=USE_GDR,
)
bw_result = connection.run_command(command, shutdown_event=event)
if bw_result.returncode == 0:
bw_output = parse_p2p_output(bw_result.output, key=BW_TEST_OUTPUT_KEY)
else:
logger.info(
f"Trying to kill ib_write_bw on {connection.ip}:{card} with {bw_result.returncode} {bw_result.output}"
)
shutdown_test(connection, f"'ib_write_b[w] -d {card}'")
if is_head:
# Ensure the other card acting as a server has time to spin up
time.sleep(5)
with get_expiration_event(EXPIRATION_SECONDS) as event:
other_ip = other_ip if is_head else ""
if same_host:
                port = 18514 - int(card.pcie_slot_index) % 6
            else:
                port = 18514 - int(card.pcie_slot_index)
# Perftest supports CUDA latency tests with read/send verbs only
command = f"ib_write_lat -d {card} {other_ip} --iters {iters} -p {port}"
lat_result = connection.run_command(command, shutdown_event=event)
if lat_result.returncode == 0:
lat_output = parse_p2p_output(lat_result.output, key=LAT_TEST_OUTPUT_KEY)
else:
logger.info(
f"Trying to kill ib_write_lat on {connection.ip}:{card} with {lat_result.returncode} {lat_result.output}"
)
shutdown_test(connection, f"'ib_write_[l]at -d {card}'")
logger.info(f"Results for {connection.ip}:{card} bw: {bw_output} lat: {lat_output}")
return connection, card, (bw_output, lat_output)
except Exception as e:
        # We add square brackets around one character so the pkill pattern doesn't match (and kill) the pkill command itself
shutdown_test(connection, f"'ib_write_[l]at -d {card}'")
shutdown_test(connection, f"'ib_write_b[w] -d {card}'")
logger.info(f"caught exception on {connection}:\n{e}")
return connection, card, (bw_output, lat_output)
def run_p2p_ib_test(
    connection: CommandRunner, other_ip: str, is_head: bool
) -> Tuple[str, Dict[str, Tuple[float, float]]]:
card_to_result = {}
for gpu_idx, card in enumerate(MLX_CARDS):
_, _, card_result = run_single_rail_test(connection, other_ip, is_head, (gpu_idx, card), same_host=False)
card_to_result[card] = card_result
return connection.ip, card_to_result
def run_single_p2p(
run: int,
connections: Sequence[FullConnection],
output_file: Optional[Path] = None,
) -> Dict[str, Dict[str, Tuple[float, float]]]:
connection_pairs = list(zip(connections[: len(connections) // 2], connections[len(connections) // 2 :]))
servers = [(pair[0].ssh_connection, pair[1].internal_ip, False) for pair in connection_pairs]
clients = [(pair[1].ssh_connection, pair[0].internal_ip, True) for pair in connection_pairs]
alternating_server_client = [item for pair in zip(servers, clients) for item in pair]
connection_to_result = {}
with ThreadPoolExecutor(max_workers=len(alternating_server_client)) as executor:
results = executor.map(
lambda group_and_card: run_p2p_ib_test(
connection=group_and_card[0],
other_ip=group_and_card[1],
is_head=group_and_card[2],
),
alternating_server_client,
)
for connection, result in results:
if output_file:
with output_file.open("a+") as f:
f.write(f"Results for {connection} in run {run}: {result}\n")
connection_to_result[connection] = result
return connection_to_result
def run_single_host_p2p(
run: int,
full_connections: Sequence[FullConnection],
output_file: Optional[Path] = None,
) -> Dict[str, Dict[str, Tuple[float, float]]]:
first_half_cards = MLX_CARDS[: len(MLX_CARDS) // 2]
second_half_cards = MLX_CARDS[len(MLX_CARDS) // 2 :]
servers = [
(connection.ssh_connection, connection.internal_ip, False, (gpu_idx, driver))
for gpu_idx, driver in enumerate(first_half_cards)
for connection in full_connections
]
clients = [
(connection.ssh_connection, connection.internal_ip, True, (gpu_idx + 4, driver))
for gpu_idx, driver in enumerate(second_half_cards)
for connection in full_connections
]
alternating_server_client = [item for pair in zip(servers, clients) for item in pair]
max_workers = len(alternating_server_client)
connection_to_result = {connection.ssh_connection.ip: dict() for connection in full_connections}
with ThreadPoolExecutor(max_workers=max_workers) as executor:
run_results = executor.map(
lambda group_and_card: run_single_rail_test(
connection=group_and_card[0],
other_ip=group_and_card[1],
is_head=group_and_card[2],
gpu_idx_and_card=group_and_card[3],
same_host=True,
),
alternating_server_client,
)
for connection, card, result in run_results:
if output_file:
with output_file.open("a+") as f:
f.write(f"Results for {connection} {card} in run {run}: {result}\n")
connection_to_result[connection.ip][card] = result
logger.info(f"Finished running run {run} with {connection_to_result}")
return connection_to_result
def run_p2p_ib_tests(
connections: Sequence[CommandRunner],
output_file: Optional[Path] = None,
single_host: bool = False,
num_iterations: int = 15,
) -> Dict[str, Dict[str, Union[int, float]]]:
test = "p2p_ib" if not single_host else "host_p2p_ib"
ip_to_runs_passed: Dict[str, int] = {connection.ip: 0 for connection in connections}
if output_file:
with output_file.open("a+") as f:
f.write(f"Starting {test} test with connections {connections}\n")
full_connections = [FullConnection(ssh_connection=c, internal_ip="127.0.0.1") for c in connections]
last_results = None
for run_count in range(num_iterations):
try:
local_rng = random.Random()
local_rng.seed(run_count)
run_count += 1
if not single_host:
mixed_nodes = local_rng.sample(full_connections, len(full_connections))
connection_to_result = run_single_p2p(run_count, mixed_nodes, output_file)
else:
connection_to_result = run_single_host_p2p(run_count, full_connections, output_file)
good_hosts = find_good_hosts(connection_to_result)
for host in good_hosts:
if host not in ip_to_runs_passed:
raise ValueError(f"Host {host} not in ip_to_runs_passed")
ip_to_runs_passed[host] += 1
last_results = connection_to_result
bad_hosts = [connection.ip for connection in connections if connection.ip not in good_hosts]
bad_hosts_results = {ip: last_results.get(ip, (BW_ERROR_VALUE, LAT_ERROR_VALUE)) for ip in bad_hosts}
logger.info(f"Bad p2p_hosts {bad_hosts} with results: {bad_hosts_results}")
logger.info(
f"{test} after {run_count} iterations results: {sorted(ip_to_runs_passed.items(), key = lambda item: item[1])}"
)
if output_file:
with output_file.open("a+") as f:
f.write(f"All results for run {run_count}: {connection_to_result}\n")
f.write(f"Bad p2p_hosts {bad_hosts} with results: {bad_hosts_results}\n")
f.write(
f"{test} after {run_count} iterations results: {sorted(ip_to_runs_passed.items(), key=lambda item: item[1])}\n"
)
finally:
for connection in connections:
shutdown_test(connection, "ib_writ[e]_")
# Wait a little after the tests to ensure everything can be cleaned up correctly
time.sleep(5)
logger.info(f"From last run all p2p_hosts results: {last_results}\n")
logger.info(f"Final p2p_host results: {sorted(ip_to_runs_passed.items(), key = lambda item: item[1])}")
if output_file:
with output_file.open("a+") as f:
f.write(f"From last run all p2p_hosts results: {last_results}")
f.write(f"Final p2p_host results: {sorted(ip_to_runs_passed.items(), key=lambda item: item[1])}")
ip_to_metrics = {
ip: {"passes": passes, "count": num_iterations, "ratio": passes / num_iterations}
for ip, passes in ip_to_runs_passed.items()
}
return ip_to_metrics

View File

@@ -0,0 +1,23 @@
import datetime
import os
import torch.distributed as dist
# Have collectives timeout after 10 minutes instead of the default 30 minutes.
DIST_TIMEOUT = datetime.timedelta(minutes=10)
def init_dist() -> None:
"""Initialize distributed process group."""
if "RANK" in os.environ:
# defaults to initializing from environment variables
dist.init_process_group(backend="nccl", timeout=DIST_TIMEOUT)
else:
        # this is a dummy single-GPU setup
dist.init_process_group(
backend="nccl", world_size=1, rank=0, init_method="tcp://localhost:12345", timeout=DIST_TIMEOUT
)
def get_local_rank() -> int:
return int(os.environ.get("LOCAL_RANK", 0))

View File

@@ -0,0 +1,23 @@
from contextlib import contextmanager
from threading import Event
from threading import Thread
from typing import Iterator
from uuid import uuid4
def _set_event_after_time(event: Event, seconds: float) -> None:
event.wait(seconds)
event.set()
@contextmanager
def get_expiration_event(seconds: float) -> Iterator[Event]:
event = Event()
# Since we set `daemon=True`, we don't need to join the thread.
Thread(target=_set_event_after_time, args=(event, seconds), name=f"timeout_thread_{uuid4()}", daemon=True).start()
try:
yield event
finally:
event.set()

View File

@@ -0,0 +1,41 @@
from types import TracebackType
from typing import Any
from typing import Dict
from typing import Optional
from typing import Self
from typing import cast
from tblib import Traceback
class FixedTraceback(Traceback):
"""
This class exists mostly to fix a bug in tblib where tb_lasti is not properly initialized.
We don't care about that value, so we just set it to -1.
While I was at it, I also fixed the types for the methods we use, and include the actual traceback when available.
This allows for easier debugging in the normal case that you are raising a regular (not serialized) traceback.
"""
def __init__(self, tb: TracebackType) -> None:
self.full_traceback: Optional[TracebackType] = None
super().__init__(tb)
tb_next = self
while tb_next:
setattr(tb_next, "tb_lasti", -1)
tb_next = tb_next.tb_next
def as_traceback(self) -> Optional[TracebackType]:
if self.full_traceback is not None:
return self.full_traceback
return cast(Optional[TracebackType], super().as_traceback())
@classmethod
def from_tb(cls, tb: TracebackType) -> Self:
result = cls(tb)
result.full_traceback = tb
return result
@classmethod
def from_dict(cls, dct: Dict[str, Any]) -> Self:
return cast(Self, super().from_dict(dct))
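# Illustrative round-trip sketch (hypothetical, not part of the original module): this is
# essentially what the registered traceback (de)serializers in utils.serialization do.
if __name__ == "__main__":
    try:
        1 / 0
    except ZeroDivisionError as exc:
        fixed = FixedTraceback.from_tb(exc.__traceback__)
        restored = FixedTraceback.from_dict(fixed.to_dict()).as_traceback()
        # `restored` is a TracebackType that can be attached via exc.with_traceback(restored).
        print(type(restored))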

View File

@@ -0,0 +1,83 @@
from abc import ABC
from abc import abstractmethod
from copy import deepcopy
from functools import cached_property
from typing import Any
from typing import Dict
from typing import Iterable
from typing import List
from typing import Mapping
from typing import NoReturn
from typing import Optional
from typing import Self
from typing import Tuple
from typing import TypeVar
T = TypeVar("T")
TV = TypeVar("TV")
def remove_none(data: Iterable[Optional[T]]) -> List[T]:
return [x for x in data if x is not None]
class FrozenMapping(Mapping[T, TV], ABC):
@abstractmethod
def __hash__(self) -> int:
...
# NOTE: `_key` is not `sorted` because (a) not all Python objects are sortable and (b) Python dictionaries are insertion-ordered, so the item order is already well-defined.
class _FrozenDict(Dict[T, TV], FrozenMapping[T, TV]):
def _key(self) -> Tuple[Tuple[T, TV], ...]:
return tuple(self.items())
@cached_property
def _hash(self) -> int:
return hash(self._key())
def __hash__(self) -> int: # type: ignore
return self._hash
def _mutation_error(self, method: str) -> RuntimeError:
return RuntimeError(f"Cannot call mutation method {method} on _FrozenDict {self}")
def __setitem__(self, __name: T, __value: TV) -> NoReturn:
raise self._mutation_error("__setitem__")
def __delitem__(self, __name: T) -> NoReturn:
raise self._mutation_error("__delitem__")
def update(self, __m: Mapping[T, TV]) -> NoReturn: # type: ignore
raise self._mutation_error("update")
def setdefault(self, __name: T, __value: TV) -> NoReturn:
raise self._mutation_error("setdefault")
def pop(self, __name: T, __default: TV) -> NoReturn: # type: ignore
raise self._mutation_error("pop")
def popitem(self) -> NoReturn:
raise self._mutation_error("popitem")
def clear(self) -> NoReturn:
raise self._mutation_error("clear")
def __repr__(self) -> str:
return f"_FrozenDict({super().__repr__()})"
def __copy__(self) -> Self:
return type(self)(self)
def __deepcopy__(self, memo: Dict[int, Any]) -> Self:
memo[id(self)] = self
copied_items = ((deepcopy(key, memo), deepcopy(value, memo)) for key, value in self.items())
return type(self)(copied_items)
def __reduce__(self) -> Tuple[Any, ...]:
return (_FrozenDict, (dict(self),))
def freeze_mapping(*args: object, **kwargs: object) -> _FrozenDict:
return _FrozenDict(*args, **kwargs)
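# Illustrative behavior (hypothetical example): freeze_mapping({"a": 1}) is hashable and
# usable as a dict key or set member, while any mutation attempt such as m["b"] = 2 or
# m.update({"b": 2}) raises the RuntimeError produced by _mutation_error above.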

View File

@@ -0,0 +1,71 @@
import shlex
import subprocess
from typing import Protocol
import attr
IP = str
def run_local_command(
command: str,
) -> None:
# This call to subprocess.Popen is not robust and is meant to be a placeholder for whatever method
# you use for running arbitrary commands locally.
process = subprocess.Popen(
command.split(" "),
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
stdout, stderr = process.communicate()
return
@attr.s(auto_attribs=True, frozen=True)
class ProcessResult:
returncode: int
output: str
class CommandRunner(Protocol):
def run_command(self, command: str, **kwargs: object) -> ProcessResult:
...
@property
def ip(self) -> IP:
...
@attr.s(auto_attribs=True, frozen=True)
class ContainerSSHConnectionData:
ip: str
port: int
user: str
def run_command(self, command: str) -> None:
escaped_command = shlex.quote(command)
run_local_command(f"ssh {self.user}@{self.ip} -p {self.port} {escaped_command}")
@attr.s(auto_attribs=True, frozen=True)
class RemoteCommandRunner(CommandRunner):
connection: ContainerSSHConnectionData
def run_command(self, command: str, **kwargs: object) -> ProcessResult:
# This is a placeholder for whatever method you use to run commands over ssh
        self.connection.run_command(command)
return ProcessResult(returncode=0, output=str())
@property
def ip(self) -> IP:
return self.connection.ip
def __str__(self) -> str:
return f"{self.connection.ip}:{self.connection.port}"
@attr.s(auto_attribs=True, frozen=True)
class FullConnection:
ssh_connection: CommandRunner
internal_ip: str

View File

@@ -0,0 +1,344 @@
import datetime
import json
from enum import Enum
from importlib.metadata import version
from pathlib import PosixPath
from typing import Any
from typing import Dict
from typing import Iterable
from typing import List
from typing import Mapping
from typing import Optional
from typing import Type
from typing import TypeVar
from typing import Union
from typing import cast
from uuid import UUID
from loguru import logger
from yasoo import Deserializer
from yasoo import Serializer
from yasoo.constants import ENUM_VALUE_KEY
from yasoo.objects import DictWithSerializedKeys
from yasoo.serialization import _convert_to_json_serializable
from yasoo.utils import get_fields
from yasoo.utils import is_obj_supported_primitive
from yasoo.utils import normalize_type
from yasoo.utils import resolve_types
from yasoo.utils import type_to_string
from fixed_traceback import FixedTraceback
from mapping import FrozenMapping
from mapping import freeze_mapping
assert (
version("yasoo") == "0.12.6"
), "This code was written for yasoo 0.12.6 and requires inheriting / monkeypatching the deserializer, so you probably don't want to use any other version without fixing TupleDeserializer"
T = TypeVar("T")
class TupleDeserializer(Deserializer):
def _deserialize(
self,
data: Optional[Union[bool, int, float, str, List[Any], Dict[str, Any]]],
obj_type: Optional[Type[T]],
type_key: Optional[str],
allow_extra_fields: bool,
external_globals: Dict[str, Any],
ignore_custom_deserializer: bool = False,
) -> object:
all_globals = dict(globals())
all_globals.update(external_globals)
if is_obj_supported_primitive(data):
return data
if isinstance(data, list):
list_types = self._get_list_types(obj_type, data)
return tuple([self._deserialize(d, t, type_key, allow_extra_fields, all_globals) for t, d in list_types])
assert isinstance(data, dict), f"Expected a dict, but got {type(data)}"
# load wrapped primitives
if type_key is not None:
type_data = data.get(type_key, None)
if type_data is not None and type_data.startswith("builtins.") and type_data != "builtins.dict":
return data["value"]
obj_type = self._get_object_type(obj_type, data, type_key, all_globals)
if type_key in data:
data.pop(type_key)
real_type, generic_args = normalize_type(obj_type, all_globals)
if external_globals and isinstance(real_type, type):
bases = {real_type}
while bases:
all_globals.update((b.__name__, b) for b in bases)
bases = {ancestor for b in bases for ancestor in b.__bases__}
if not ignore_custom_deserializer:
deserialization_method = self._custom_deserializers.get(
obj_type, self._custom_deserializers.get(real_type)
)
if deserialization_method:
return deserialization_method(data)
for base_class, method in self._inheritance_deserializers.items():
if issubclass(real_type, base_class):
return method(data, real_type)
key_type = None
try:
fields = {f.name: f for f in get_fields(obj_type)}
except TypeError:
if obj_type is FixedTraceback:
return FixedTraceback.from_dict(data["value"])
if issubclass(real_type, Enum):
value = data[ENUM_VALUE_KEY]
if isinstance(value, str):
try:
return real_type[value]
except KeyError:
for e in real_type:
if e.name.lower() == value.lower():
return e
return real_type(value)
elif issubclass(real_type, Mapping):
key_type = generic_args[0] if generic_args else None
if self._is_mapping_dict_with_serialized_keys(key_type, data):
obj_type = DictWithSerializedKeys
fields = {f.name: f for f in get_fields(obj_type)}
value_type = generic_args[1] if generic_args else Any
fields["data"].field_type = Dict[str, value_type] # type: ignore
else:
return self._load_mapping(
data,
real_type,
generic_args,
type_key,
allow_extra_fields,
all_globals,
)
elif issubclass(real_type, Iterable):
# If we got here it means data is not a list, so obj_type came from the data itself and is safe to use
return self._load_iterable(data, obj_type, type_key, allow_extra_fields, all_globals)
elif real_type != obj_type:
return self._deserialize(data, real_type, type_key, allow_extra_fields, external_globals)
else:
raise
self._check_for_missing_fields(data, fields, obj_type)
self._check_for_extraneous_fields(data, fields, obj_type, allow_extra_fields)
self._load_inner_fields(data, fields, type_key, allow_extra_fields, all_globals)
if obj_type is DictWithSerializedKeys:
return self._load_dict_with_serialized_keys(
obj_type(**data), key_type, type_key, allow_extra_fields, all_globals
)
kwargs = {k: v for k, v in data.items() if fields[k].init}
assert obj_type is not None
result = obj_type(**kwargs)
for k, v in data.items():
if k not in kwargs:
setattr(result, k, v)
return result
class FrozenSerializer(Serializer):
def _serialize_iterable(
self,
obj: Iterable[object],
type_key: Any,
fully_qualified_types: Any,
preserve_iterable_types: Any,
stringify_dict_keys: Any,
) -> List[object]:
if isinstance(obj, list):
if self._allow_unsafe_list_serialization:
logger.info(f"Converting list to tuple for serialization: {obj}")
obj = tuple(obj)
else:
raise Exception(
f"Lists are not allowed for serialization. Use tuples instead. Current iterable: {obj}"
)
assert isinstance(
obj, (tuple, frozenset, bytes)
), f"All iterables should be tuples or frozenset. Received {obj}"
return cast(
List[object],
tuple(
self._serialize(
item,
type_key,
fully_qualified_types,
preserve_iterable_types,
stringify_dict_keys,
)
for item in obj
),
)
# overriding this method just to get some better error messages out--previously it would just "type error" and
# moan about things like int64 not being serializable, which is fine, but it is nicer if the key is included
def serialize(
self,
obj: Any,
type_key: Optional[str] = "__type",
fully_qualified_types: bool = True,
preserve_iterable_types: bool = False,
stringify_dict_keys: bool = True,
globals: Optional[Dict[str, Any]] = None,
) -> Optional[Union[bool, int, float, str, list, Dict[str, Any]]]:
if is_obj_supported_primitive(obj):
return obj # type: ignore
if globals:
self._custom_serializers = resolve_types(self._custom_serializers, globals) # type: ignore
result = self._serialize(
obj,
type_key,
fully_qualified_types,
preserve_iterable_types,
stringify_dict_keys,
inner=False,
)
try:
result = _convert_to_json_serializable(result)
except TypeError:
_convert_to_json_serializable_with_better_errors(result)
assert False, "previous method should have raised..."
return result # type: ignore
def _convert_to_json_serializable_with_better_errors(
obj: Any, path: str = ""
) -> Union[int, float, str, list, dict, None]:
if is_obj_supported_primitive(obj):
return obj # type: ignore
if isinstance(obj, Mapping):
return {
key: _convert_to_json_serializable_with_better_errors(value, f"{path}.{key}") for key, value in obj.items()
}
if isinstance(obj, Iterable):
return [_convert_to_json_serializable_with_better_errors(item, f"{path}[{i}]") for i, item in enumerate(obj)]
raise TypeError(f'Found object of type "{type(obj).__name__}" at {path} which cannot be serialized')
SERIALIZER = FrozenSerializer()
SERIALIZER._allow_unsafe_list_serialization = False
DESERIALIZER = TupleDeserializer()
# note: you cannot change this without changing other calls to yasoo, this is its default
TYPE_KEY = "__type"
class SerializationError(Exception):
pass
@SERIALIZER.register()
def serialize_frozen_mapping(data: FrozenMapping) -> Dict:
value = SERIALIZER.serialize(data)
value[TYPE_KEY] = type_to_string(type(data), fully_qualified=True) # type: ignore
return cast(Dict[Any, Any], value)
@DESERIALIZER.register()
def deserialize_frozen_mapping(data: Dict) -> FrozenMapping:
return freeze_mapping(DESERIALIZER.deserialize(data, dict))
@SERIALIZER.register()
def serialize_frozen_set(data: frozenset) -> Dict:
value = SERIALIZER.serialize(tuple(data))
return {"value": value}
@DESERIALIZER.register()
def deserialize_frozen_set(data: Dict) -> frozenset:
return frozenset(DESERIALIZER.deserialize(data["value"], tuple))
@SERIALIZER.register()
def serialize_uuid(data: UUID) -> Dict:
return {"value": data.hex}
@DESERIALIZER.register()
def deserialize_uuid(data: Dict) -> UUID:
return UUID(data["value"])
@SERIALIZER.register()
def serialize_traceback(data: FixedTraceback) -> Dict:
return {"value": data.to_dict()}
@DESERIALIZER.register()
def deserialize_traceback(data: Dict) -> FixedTraceback:
return FixedTraceback.from_dict(data["value"])
@SERIALIZER.register()
def serialize_posix_path(data: PosixPath) -> Dict:
return {"value": str(data)}
@DESERIALIZER.register()
def deserialize_posix_path(data: Dict) -> PosixPath:
return PosixPath(data["value"])
@SERIALIZER.register()
def serialize_datetime(data: datetime.datetime) -> Dict:
return {
"time": data.astimezone(datetime.timezone.utc).timestamp(),
"tzaware": data.tzinfo is not None,
"__type": "datetime.datetime",
}
@DESERIALIZER.register()
def deserialize_datetime(data: Dict) -> datetime.datetime:
return datetime.datetime.fromtimestamp(data["time"], datetime.timezone.utc if data.get("tzaware", None) else None)
def serialize_to_dict(obj: Any) -> Dict[str, Any]:
return cast(Dict[str, Any], SERIALIZER.serialize(obj))
def force_serialize_to_dict(obj: Any) -> Mapping[str, Any]:
"""Forces primitives to become dicts as well by wrapping with a type"""
if obj is None:
return {}
if is_obj_supported_primitive(obj):
return {"value": obj, TYPE_KEY: "builtins." + type(obj).__name__}
return cast(Mapping[str, Any], SERIALIZER.serialize(obj))
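# For example, force_serialize_to_dict(5) would yield {"value": 5, "__type": "builtins.int"},
# which the deserializer above unwraps back to the bare primitive via its "builtins." branch.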
def serialize_to_json(obj: Any, indent: Optional[int] = None, sort_keys: bool = False) -> str:
try:
return json.dumps(SERIALIZER.serialize(obj), indent=indent, sort_keys=sort_keys)
except Exception as e:
raise SerializationError(str(e)) from e
def deserialize_from_json(data: str) -> Any:
try:
return DESERIALIZER.deserialize(json.loads(data))
except Exception as e:
raise SerializationError(str(e)) from e
def deserialize_from_dict_with_type(data: Dict[str, Any], obj_type: Type[T]) -> T:
try:
result = DESERIALIZER.deserialize(data, obj_type=obj_type)
assert isinstance(result, obj_type), f"Expected an object of type {obj_type}, but got {result}"
return result
except Exception as e:
raise SerializationError(str(e)) from e
def deserialize_from_json_with_type(data: Union[str, bytes, bytearray], obj_type: Type[T]) -> T:
try:
return deserialize_from_dict_with_type(json.loads(data), obj_type=obj_type)
except Exception as e:
raise SerializationError(str(e)) from e

View File

@@ -0,0 +1,32 @@
import time
from contextlib import contextmanager
from typing import Dict
from typing import Iterator
from typing import List
from typing import Mapping
class Timer(Mapping[str, float]):
def __init__(self) -> None:
self._times: Dict[str, List[float]] = {}
@contextmanager
def __call__(self, name: str) -> Iterator[None]:
start = time.perf_counter()
try:
yield
finally:
end = time.perf_counter()
self._times.setdefault(name, []).append(1000 * (end - start))
def __getitem__(self, name: str) -> float:
if len(self._times[name]) == 1:
return self._times[name][0]
else:
return max(self._times[name][1:])
def __iter__(self) -> Iterator[str]:
return iter(self._times)
def __len__(self) -> int:
return len(self._times)
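# Minimal usage sketch (hypothetical): the health checks use this as a context manager and
# then read durations (in ms) back by name; with several recordings of the same name,
# __getitem__ returns the max of all but the first, presumably treating the first as warmup.
if __name__ == "__main__":
    timer = Timer()
    with timer("sleep"):
        time.sleep(0.01)
    print(f"sleep took {timer['sleep']:.3f} ms")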

View File

@@ -0,0 +1,607 @@
"""
Basic procedure for using this script:
When you want to identify ports that have been misbehaving in the UFM event logs, run the subcommand `process-logs`:
```
$ python ufm_events/find_problematic_events.py process-logs ~/actions.jsonl
```
This will produce a file called `actions.jsonl` that contains a list of actions to take to disable the misbehaving ports.
In addition to the UFM event logs, we have found several other sources of information to be helpful:
You can run
```
sudo iblinkinfo | tee IBLINK_OUTPUT_PATH
python ufm_events/find_problematic_events.py get-bad-from-iblinkinfo IBLINK_OUTPUT_PATH OUTPUT_PATH
```
to get bad ports based on the results of the iblinkinfo command.
Similarly, if you have previously performed an IB burn, you can run
```
python ufm_events/find_problematic_events.py get-bad-from-counters
```
on the results of the burn.
"""
from __future__ import annotations
import argparse
import datetime
import functools
import os
import re
import subprocess
import sys
import tempfile
from contextlib import contextmanager
from pathlib import Path
from typing import Dict
from typing import Final
from typing import Iterable
from typing import Iterator
from typing import List
from typing import Optional
from typing import Self
from typing import Set
from typing import TextIO
from typing import Tuple
from typing import TypeAlias
from typing import TypeVar
from typing import Union
from zoneinfo import ZoneInfo
import attr
import requests
import urllib3
ERROR_FILE: TextIO = sys.stderr
urllib3.disable_warnings()
UFM_LOCAL_IP: Final[str] = "UFM_LOCAL_IP"
UFM_AUTH: Final[Tuple[str, str]] = ("USERNAME", "PASSWORD")
@functools.cache
def get_ports_by_port_id() -> Dict[PortId, Dict]:
result = {}
port_list = requests.get(f"https://{UFM_LOCAL_IP}/ufmRest/resources/ports", auth=UFM_AUTH, verify=False).json()
for port in port_list:
node_description = port["node_description"]
try:
switch, port_str = node_description.split(":")
except ValueError:
# Some node descriptions that do not describe switches do not contain colons.
# For example: "MT1111 ConnectX7 Mellanox Technologies"
continue
port_id: PortId
if port_str.count("/") == 1:
port_id = PortId(switch=switch, port=port_str)
elif port_str.count("/") == 2:
port_str = port_str.removeprefix("1/")
port_id = PortId(switch=switch, port=port_str)
elif port_str.count("/") == 0:
            # Some node descriptions now use a bare numeric port index
port_id = get_portid(switch, int(port_str))
else:
raise ValueError(f"Unexpected port description: {port_str}")
result[port_id] = port
return result
Action: TypeAlias = Union["DisablePortAction", "ReenablePortAction"]
ActionT = TypeVar("ActionT", bound=Action)
R = TypeVar("R")
@attr.s(auto_attribs=True, frozen=True)
class DisablePortAction:
"""An action to disable a port."""
port: PortId
cause: PortRelatedEvent
def __str__(self) -> str:
return f"Disabling {self.port} due to {self.cause}"
@attr.s(auto_attribs=True, frozen=True)
class ReenablePortAction:
"""An action to re-enable a port."""
port: PortId
def __str__(self) -> str:
return f"Re-enabling {self.port}"
ENTRY_REGEX: Final[re.Pattern] = re.compile(
r"^(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d+) \[(?P<entry_id>\d+)\] \[(?P<message_code>\d+)\] (?P<level>\w+) \[(?P<topic>\w+)\] (?P<device_type>\w+) \[(?P<device_description>[^\]]+)\]( \[dev_id: (?P<device_id>\w+)\])?: (?P<message>.*)$"
)
def parse_pacific(s: str) -> datetime.datetime:
"""Parse a timestamp and set the timezone to Pacific time.
At the time of writing, the timestamps in the event log are in Pacific time and look like this:
2023-12-20 07:46:24.442
"""
return datetime.datetime.strptime(s, "%Y-%m-%d %H:%M:%S.%f").replace(tzinfo=ZoneInfo("America/Los_Angeles"))
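# e.g. parse_pacific("2023-12-20 07:46:24.442") ->
#   datetime.datetime(2023, 12, 20, 7, 46, 24, 442000, tzinfo=ZoneInfo("America/Los_Angeles"))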
@attr.s(auto_attribs=True, frozen=True)
class Entry:
timestamp: datetime.datetime
message_code: int
level: str
device_type: str
device_description: str
device_id: Optional[str]
message: str
original_line: str = attr.field(repr=False)
@classmethod
def build_from_line(cls, line: str) -> Self:
maybe_match = ENTRY_REGEX.match(line)
if maybe_match is None:
raise ValueError(f"Line does not match regex: {line}")
return cls(
timestamp=parse_pacific(maybe_match.group("timestamp")),
message_code=int(maybe_match.group("message_code")),
level=maybe_match.group("level"),
device_type=maybe_match.group("device_type"),
device_description=maybe_match.group("device_description"),
device_id=maybe_match.group("device_id"),
message=maybe_match.group("message"),
original_line=line,
)
THRESHOLD_EXCEEDED_CODES: Final[Tuple[int, ...]] = (
110, # Symbol error counter rate
112, # Link downed
113, # Port receive errors
115, # Receive switch relay errors
116, # Transmit discards
)
# Example message: Link went down: (Switch:T3-E21-N-U47:1/17/1)9c0591030090e000:33 - (Switch:T4-E45-L-U51:1/4/2)fc6a1c0300244e00:8, cable S/N: MT2330FT09118
LINK_WENT_DOWN_MESSAGE_PATTERN: Final[re.Pattern] = re.compile(
r"^Link went down: \(Switch:(?P<switch>[^:]+):(1/)?(?P<port>[^)]+)\)[^(]+\(Switch:(?P<peer_switch>[^:]+):(1/)?(?P<peer_port>[^)]+)\)"
)
# Example message: Peer Port T2-E63-L-U37:1/19/2 is considered by SM as unhealthy due to FLAPPING.
PORT_IS_CONSIDERED_UNHEALTHY_MESSAGE_PATTERN: Final[re.Pattern] = re.compile(
r"^Peer Port (?P<switch>[^:]+):(1/)?(?P<port>[^ ]+) is considered by SM as unhealthy due to \w+\."
)
def standardize_ports(ports: List[PortId]) -> List[PortId]:
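    # Normalize bare numeric ports (e.g. "33") into the same "cage/split" form used
    # elsewhere (e.g. "17/1") so lookups and comparisons behave consistently.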
for i, port in enumerate(ports):
if "/" not in port.port:
ports[i] = get_portid(port.switch, int(port.port))
return ports
@attr.s(auto_attribs=True, frozen=True)
class PortRelatedEvent:
timestamp: datetime.datetime
ports: Tuple[PortId, ...]
original_line: str
def __str__(self) -> str:
return f"{self.timestamp}: {self.original_line} affecting {self.ports}"
@classmethod
def build_from_threshold_exceeded_entry(cls, entry: Entry) -> Self:
assert (
entry.message_code in THRESHOLD_EXCEEDED_CODES
        ), f"Entry must be a threshold exceeded event: {THRESHOLD_EXCEEDED_CODES}"
ports = [PortId.build_from_device_description(entry.device_description)]
try:
ports.append(PortId.build_from_device_description(entry.message.rstrip(".")))
except ValueError:
pass
ports = standardize_ports(ports)
return cls(timestamp=entry.timestamp, ports=tuple(sorted(ports, key=str)), original_line=entry.original_line)
@classmethod
def build_from_link_went_down_entry(cls, entry: Entry) -> Self:
assert entry.message_code == 329, "Entry must be a link went down event"
match = LINK_WENT_DOWN_MESSAGE_PATTERN.match(entry.message)
if match is None:
raise ValueError(f"Message does not match expected pattern: {entry.message}")
ports = [
PortId(switch=match.group("switch"), port=match.group("port")),
PortId(switch=match.group("peer_switch"), port=match.group("peer_port")),
]
ports = standardize_ports(ports)
return cls(timestamp=entry.timestamp, ports=tuple(sorted(ports, key=str)), original_line=entry.original_line)
@classmethod
def build_from_port_considered_unhealthy_entry(cls, entry: Entry) -> Self:
assert entry.message_code == 702, "Entry must be a port is considered unhealthy event"
ports = [PortId.build_from_device_description(entry.device_description)]
match = PORT_IS_CONSIDERED_UNHEALTHY_MESSAGE_PATTERN.match(entry.message)
if match is not None:
ports.append(PortId(switch=match.group("switch"), port=match.group("port")))
ports = standardize_ports(ports)
return cls(timestamp=entry.timestamp, ports=tuple(sorted(ports, key=str)), original_line=entry.original_line)
@classmethod
def build_from_symbol_bit_error_rate_entry(cls, entry: Entry) -> Self:
assert entry.message_code in (917, 918), "Entry must be a symbol bit error rate event"
port_id = PortId.build_from_device_description(entry.device_description)
return cls(timestamp=entry.timestamp, ports=(port_id,), original_line=entry.original_line)
@attr.s(auto_attribs=True, frozen=True)
class PortId:
switch: str
port: str
def __str__(self) -> str:
return f"{self.switch} {self.port}"
def counterpart(self) -> Self:
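        # Split ports come in pairs sharing a cage, e.g. "17/1" and "17/2"; the
        # counterpart is the other lane of the same physical cage.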
prefix, port_idx_str = self.port.rsplit("/", 1)
new_port_idx = 1 if int(port_idx_str) == 2 else 2
return type(self)(switch=self.switch, port=f"{prefix}/{new_port_idx}")
@classmethod
def build_from_device_description(cls, device_description: str) -> Self:
try:
_default, switch, port = device_description.split(" / ", 2)
except ValueError as e:
raise ValueError(f"Device description does not match expected format: {device_description}") from e
return cls(switch.removeprefix("Switch: "), port.removeprefix("1/"))
@classmethod
def build_from_informal_description(cls, informal_description: str) -> Self:
"""Sometimes we get back descriptions of ports that look like this:
E17-L-U43 17/1
"""
abbreviated_switch, port = informal_description.split(" ", 1)
# Find the full switch name.
for port_id in get_ports_by_port_id().keys():
if port_id.switch.endswith(abbreviated_switch):
return cls(port_id.switch, port)
raise ValueError(f"Could not find switch with abbreviated name: {abbreviated_switch}")
def read_entries(prune_consecutive: bool = True) -> Tuple[Entry, ...]:
result = []
with download_event_log_locally() as event_path_filename:
with open(event_path_filename, "r") as f:
for line in f.readlines():
try:
result.append(Entry.build_from_line(line))
except Exception:
print(f"Failed to parse line: {line}", file=ERROR_FILE)
if prune_consecutive:
result = prune_many_consecutive_entries(result)
return tuple(result)
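# Entries older than this cutoff are skipped below, on the assumption that anything
# before it has already been investigated or acted on.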
BASE_CUTOFF_TIMESTAMP: Final[datetime.datetime] = datetime.datetime(
year=2024, month=6, day=20, hour=0, minute=0, second=0, tzinfo=ZoneInfo("America/Los_Angeles")
)
def should_include(event: PortRelatedEvent, entry: Entry) -> bool:
if "quincy" in entry.message:
return False
if any("Computer" in port_id.switch for port_id in event.ports):
print(f"Skipping event for computer: {entry.original_line.strip()}", file=ERROR_FILE)
return False
    if any(read_peer_port_mapping().get(port_id) is None for port_id in event.ports):
        print(f"Not skipping event for port without peer: {entry.original_line.strip()}", file=ERROR_FILE)
    return True
MESSAGE_CODES_TO_HANDLE: Final[Tuple[int, ...]] = (
110, # Symbol error counter rate
112, # Link downed
113, # Port receive errors
115, # Receive switch relay errors
116, # Transmit discards
329, # Link went down
702, # Port is considered unhealthy
917, # Symbol bit error rate
918, # Symbol bit error rate warning
)
MESSAGE_CODES_TO_IGNORE: Final[Tuple[int, ...]] = (
64, # GID address out of service
65, # GID address in service
66, # MCast group created
67, # MCast group deleted
328, # Link went up
331, # Node is down
332, # Node is up
336, # Port action disable succeeded
395, # Action get_cables_info started
517, # Fabric health report
527, # UFM CPU usage
603, # UFM non-critical event suppression
604, # Fabric analysis report succeeded
908, # Switch up
1500, # New cable detected
1502, # Cable detected in a new location
# These should be handled at some point.
394, # Switch critical failure
907, # Switch down
920, # Cable Low Temperature Alarm reported
1503, # Duplicate cable detected
)
def latest_port_related_events(entries: Tuple[Entry, ...]) -> Dict[Tuple[PortId, ...], PortRelatedEvent]:
seen_unexpected_message_codes: Set[int] = set()
latest_events_by_ports: Dict[Tuple[PortId, ...], PortRelatedEvent] = {}
for entry in entries:
if entry.timestamp < BASE_CUTOFF_TIMESTAMP:
print(f"Skipping entry before cutoff: {entry.original_line.strip()}", file=ERROR_FILE)
continue
if "Aggregation Node" in entry.message or "Aggregation Node" in entry.device_description:
print(f"Skipping aggregation node event: {entry.original_line.strip()}", file=ERROR_FILE)
continue
if entry.message_code in MESSAGE_CODES_TO_IGNORE:
continue
elif entry.message_code in MESSAGE_CODES_TO_HANDLE:
if entry.message_code == 329:
if "Computer" in entry.message or "Aggregation Node" in entry.message:
continue
event = PortRelatedEvent.build_from_link_went_down_entry(entry)
elif entry.message_code in THRESHOLD_EXCEEDED_CODES:
event = PortRelatedEvent.build_from_threshold_exceeded_entry(entry)
elif entry.message_code == 702:
if "MANUAL" in entry.message:
continue
event = PortRelatedEvent.build_from_port_considered_unhealthy_entry(entry)
elif entry.message_code in (917, 918):
event = PortRelatedEvent.build_from_symbol_bit_error_rate_entry(entry)
else:
raise ValueError(f"Unexpected message code: {entry.message_code}")
if not should_include(event, entry):
continue
existing = latest_events_by_ports.get(event.ports)
if existing is None or entry.timestamp > existing.timestamp:
latest_events_by_ports[event.ports] = event
else:
if entry.message_code not in seen_unexpected_message_codes:
print(f"Unexpected message code: {entry.message_code}: {entry.original_line.strip()}")
seen_unexpected_message_codes.add(entry.message_code)
return latest_events_by_ports
# These numbers are picked somewhat arbitrarily.
MAX_EVENTS_IN_TIMEFRAME = 50
TIMEFRAME_SECONDS = 1
def prune_many_consecutive_entries(entries: List[Entry]) -> List[Entry]:
    """
    Some big events in ufm (such as a reboot) cause a lot of events in quick succession.
    We expect most of these events to be noise and disregard them.
    """
    if not entries:
        return []
    entries = sorted(entries, key=lambda entry: entry.timestamp)
    prev_entries: List[Entry] = [entries[0]]
    prev_start_time: datetime.datetime = entries[0].timestamp
    final_entries: List[Entry] = []
    for entry in entries[1:]:
        entry_time = entry.timestamp
        if entry_time - prev_start_time < datetime.timedelta(seconds=TIMEFRAME_SECONDS):
            prev_entries.append(entry)
            prev_start_time = entry_time
        else:
            if len(prev_entries) < MAX_EVENTS_IN_TIMEFRAME:
                final_entries.extend(prev_entries)
            else:
                print(f"Skipping {len(prev_entries)} entries with start of {prev_start_time}")
            prev_entries = [entry]
            prev_start_time = entry_time
    # Flush the last batch of entries, which the loop above never gets a chance to emit.
    if len(prev_entries) < MAX_EVENTS_IN_TIMEFRAME:
        final_entries.extend(prev_entries)
    else:
        print(f"Skipping {len(prev_entries)} entries with start of {prev_start_time}")
    print(f"Pruned {len(entries) - len(final_entries)} entries, leaving {len(final_entries)} entries.")
    return final_entries
def get_actions_from_logs(entries: Tuple[Entry, ...]) -> Tuple[Action, ...]:
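    # Collapse the events down to at most one DisablePortAction per port; when a port
    # appears in several events, the first one encountered is kept as the recorded cause.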
latest_events = latest_port_related_events(entries)
results = {}
for ports, event in latest_events.items():
for port in ports:
if port in results:
continue
results[port] = DisablePortAction(
port=port,
cause=event,
)
return tuple(sorted(results.values(), key=str))
def write_action_file(actions: Iterable[Action], filename: str) -> None:
with open(filename, "w") as f:
for action in actions:
f.write(f"{action}\n")
def process_logs(output_filename: str) -> None:
entries = read_entries(prune_consecutive=True)
actions = get_actions_from_logs(entries)
write_action_file(actions, output_filename)
UFM_EVENT_LOG_PATH: Final[str] = "/opt/ufm/log/event.log"
@contextmanager
def download_event_log_locally() -> Iterator[Path]:
with tempfile.TemporaryDirectory() as temp_dir:
event_log_path = Path(temp_dir) / "event.log"
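        # Copy the event log from the UFM host via scp. This assumes passwordless
        # (key-based) SSH access as user "host"; failures still raise via check=True,
        # but scp's own error output is discarded.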
subprocess.run(
(
"scp",
f"host@{UFM_LOCAL_IP}:{UFM_EVENT_LOG_PATH}",
str(event_log_path),
),
check=True,
stdin=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
yield event_log_path
def convert(s: str) -> PortId:
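    # sid.txt entries appear to use a "<switch-prefix>:<unit>:<port>" form (exact format
    # assumed): joining the first two pieces with a dash gives the full switch name, and
    # a single leading zero is stripped from the port (e.g. "09" -> "9").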
s = s.replace(":", "-", 1)
switch, port = s.split(":")
if port.startswith("0"):
port = port[1:]
return PortId(switch, port)
@functools.cache
def read_peer_port_mapping() -> Dict[PortId, PortId]:
this_path = Path(__file__)
sid_txt = this_path.parent.parent / "portcheck/inputs/sid.txt"
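    # sid.txt is expected to list cabled port pairs, one pair of space-separated port
    # descriptions per line (lines containing "IB" are skipped). The mapping is stored
    # in both directions so either end of a cable can look up its peer.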
mapping = {}
with open(sid_txt, "r") as f:
for line in f:
if "IB" in line:
continue
left, right = line.strip().split(" ")
left_pid = convert(left)
right_pid = convert(right)
mapping[left_pid] = right_pid
mapping[right_pid] = left_pid
return mapping
def get_portid(switch: str, num: int) -> PortId:
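    # iblinkinfo-style port numbers count split ports consecutively; convert them to the
    # "cage/split" form, e.g. num=17 -> "9/1", num=18 -> "9/2".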
port_str = f"{(num + 1) // 2}/{num % 2 if num % 2 == 1 else 2}"
portid = PortId(switch, port_str)
return portid
def get_bad_ports_and_peers_from_counters(bad_ports_file: Path, iblinkinfo_file: Path) -> None:
"""
Find bad ports based on their error counters as produced by ibpc and counters.sh on the ufm box
"""
previously_disabled_ports = set(parse_bad_ports(iblinkinfo_file))
with open(bad_ports_file) as f:
lines = f.readlines()
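    # The first line of the counters file is a header; each remaining line is expected
    # to carry the whitespace-separated columns unpacked below.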
for line in lines[1:]:
guid, lid, device, switch, port_num, active, link_down, tb_sent, tb_recv, stable = line.split()
port = get_portid(switch, int(port_num))
if port in previously_disabled_ports:
print(f"Port {port} is already disabled")
continue
peer_port = read_peer_port_mapping().get(port)
if peer_port is None:
# In our networking setup, T2 switches can directly connect to hosts, so it doesn't make sense
# to disable the "peer" port in this case.
assert (
port.switch.startswith("T2") and int(port.port.split("/")[0]) <= 16
), f"Peer port not found for {port}"
continue
        previously_disabled_ports.add(port)
        previously_disabled_ports.add(peer_port)
        # Report the newly identified bad port together with its peer so they can be disabled.
        print(port)
        print(peer_port)
def parse_bad_ports(filename: Path) -> List[PortId]:
"""
    Parses a file that contains raw iblinkinfo output.
"""
pattern = re.compile(r"(\d+)\[\s*\]")
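    # The pattern above captures the port number from iblinkinfo lines, which show each
    # port as a number followed by brackets, e.g. "17[  ]" (format assumed).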
ports = []
current_switch = None
with open(filename) as f:
lines = f.readlines()
for line in lines:
if line.startswith("Switch"):
current_switch = line.split(" ")[-1].strip().replace(":", "")
if "Down/ Polling" in line or "Initialize" in line:
number_group = re.search(pattern, line)
number = int(number_group.group(1))
ports.append(get_portid(current_switch, number))
return ports
def get_ports_with_bad_states_iblinkinfo(
infile: Path, outfile: Path, only_bad_ports: bool = False
) -> None:
"""
    Finds ports, and the peers of those ports, that are in a bad state (Down/Polling or Initialize).
    If only_bad_ports is True, the infile should list a switch name and port number on each line.
    Otherwise, the infile should be the output of iblinkinfo.
"""
if only_bad_ports:
with open(infile, "r") as f:
port_strs = [line.strip().split() for line in f.readlines()]
ports = [get_portid(switch, int(num)) for switch, num in port_strs]
else:
ports = parse_bad_ports(infile)
date_str = datetime.datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
to_disable_ports = set()
with open(outfile, "w") as f:
print(f"Found total of {len(ports)} ports in bad state")
# For each port in a bad state, find its peer port
for port in ports:
peer_port = read_peer_port_mapping().get(port)
if peer_port is None:
# Skip the peer port if it's a T2 connected directly to a host
assert (
port.switch.startswith("T2") and int(port.port.split("/")[0]) <= 16
), f"Peer port not found for {port}"
continue
to_disable_ports.add(port)
to_disable_ports.add(peer_port)
f.write(f"{port}\n")
f.write(f"{peer_port}\n")
f.write(f"Found {len(to_disable_ports)} ports and peers of bad ports on {date_str}\n")
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument("--suppress-errors", action="store_true")
command_parsers = parser.add_subparsers(dest="command")
process_logs_parser = command_parsers.add_parser("process-logs")
process_logs_parser.add_argument("output_filename")
process_logs_parser.set_defaults(func=process_logs)
disable_from_file_parser = command_parsers.add_parser("get-bad-from-iblinkinfo")
disable_from_file_parser.add_argument("--infile")
disable_from_file_parser.add_argument("--outfile")
disable_from_file_parser.add_argument("--only-bad-ports", action="store_true")
disable_from_file_parser.set_defaults(func=get_ports_with_bad_states_iblinkinfo)
disable_from_counters_parser = command_parsers.add_parser("get-bad-from-counters")
disable_from_counters_parser.add_argument("--bad-ports-file")
disable_from_counters_parser.add_argument("--iblinkinfo-file")
disable_from_counters_parser.set_defaults(func=get_bad_ports_and_peers_from_counters)
args = parser.parse_args()
if args.command is None:
parser.print_help()
sys.exit(1)
if args.suppress_errors:
global ERROR_FILE
ERROR_FILE = open(os.devnull, "w")
args_to_ignore = ("suppress_errors", "func", "command")
args.func(**{k: v for k, v in vars(args).items() if k not in args_to_ignore})
if __name__ == "__main__":
main()