make device_capabilities async running on a thread pool

This commit is contained in:
Alex Cheema
2024-12-16 21:17:30 +00:00
parent 036224f877
commit 1b14be6013
8 changed files with 96 additions and 63 deletions

View File

@@ -325,4 +325,29 @@ async def shutdown(signal, loop, server):
def is_frozen():
return getattr(sys, 'frozen', False) or os.path.basename(sys.executable) == "exo" \
or ('Contents/MacOS' in str(os.path.dirname(sys.executable))) \
or '__nuitka__' in globals() or getattr(sys, '__compiled__', False)
or '__nuitka__' in globals() or getattr(sys, '__compiled__', False)
async def get_mac_system_info() -> Tuple[str, str, int]:
"""Get Mac system information using system_profiler."""
try:
output = await asyncio.get_running_loop().run_in_executor(
subprocess_pool,
lambda: subprocess.check_output(["system_profiler", "SPHardwareDataType"]).decode("utf-8")
)
model_line = next((line for line in output.split("\n") if "Model Name" in line), None)
model_id = model_line.split(": ")[1] if model_line else "Unknown Model"
chip_line = next((line for line in output.split("\n") if "Chip" in line), None)
chip_id = chip_line.split(": ")[1] if chip_line else "Unknown Chip"
memory_line = next((line for line in output.split("\n") if "Memory" in line), None)
memory_str = memory_line.split(": ")[1] if memory_line else "Unknown Memory"
memory_units = memory_str.split()
memory_value = int(memory_units[0])
memory = memory_value * 1024 if memory_units[1] == "GB" else memory_value
return model_id, chip_id, memory
except Exception as e:
if DEBUG >= 2: print(f"Error getting Mac system info: {e}")
return "Unknown Model", "Unknown Chip", 0

View File

@@ -40,7 +40,7 @@ class TailscaleDiscovery(Discovery):
self.update_task = None
async def start(self):
self.device_capabilities = device_capabilities()
self.device_capabilities = await device_capabilities()
self.discovery_task = asyncio.create_task(self.task_discover_peers())
self.cleanup_task = asyncio.create_task(self.task_cleanup_peers())
self.update_task = asyncio.create_task(self.task_update_device_posture_attributes())

View File

@@ -64,7 +64,7 @@ class UDPDiscovery(Discovery):
self.cleanup_task = None
async def start(self):
self.device_capabilities = device_capabilities()
self.device_capabilities = await device_capabilities()
self.broadcast_task = asyncio.create_task(self.task_broadcast_presence())
self.listen_task = asyncio.create_task(self.task_listen_for_peers())
self.cleanup_task = asyncio.create_task(self.task_cleanup_peers())

View File

@@ -8,7 +8,7 @@ from typing import List, Dict, Optional, Tuple, Union, Set
from exo.networking import Discovery, PeerHandle, Server
from exo.inference.inference_engine import InferenceEngine, Shard
from exo.topology.topology import Topology
from exo.topology.device_capabilities import device_capabilities
from exo.topology.device_capabilities import device_capabilities, UNKNOWN_DEVICE_CAPABILITIES
from exo.topology.partitioning_strategy import Partition, PartitioningStrategy, map_partitions_to_shards
from exo import DEBUG
from exo.helpers import AsyncCallbackSystem
@@ -37,7 +37,7 @@ class Node:
self.partitioning_strategy = partitioning_strategy
self.peers: List[PeerHandle] = {}
self.topology: Topology = Topology()
self.device_capabilities = device_capabilities()
self.device_capabilities = UNKNOWN_DEVICE_CAPABILITIES
self.buffered_token_output: Dict[str, Tuple[List[int], bool]] = {}
self.buffered_logits: Dict[str, List[np.ndarray]] = {}
self.buffered_inputs: Dict[str, List[np.ndarray]] = {}
@@ -56,6 +56,7 @@ class Node:
self.outstanding_requests = {}
async def start(self, wait_for_peers: int = 0) -> None:
self.device_capabilities = await device_capabilities()
await self.server.start()
await self.discovery.start()
await self.update_peers(wait_for_peers)

View File

@@ -1,6 +1,7 @@
import unittest
from unittest.mock import Mock, AsyncMock
import numpy as np
import pytest
from .node import Node
from exo.networking.peer_handle import PeerHandle
@@ -55,3 +56,11 @@ class TestNode(unittest.IsolatedAsyncioTestCase):
await self.node.process_tensor(input_tensor, None)
self.node.inference_engine.process_shard.assert_called_once_with(input_tensor)
@pytest.mark.asyncio
async def test_node_capabilities():
node = Node()
await node.initialize()
caps = await node.get_device_capabilities()
assert caps is not None
assert caps.model != ""

View File

@@ -3,6 +3,8 @@ from pydantic import BaseModel
from exo import DEBUG
import subprocess
import psutil
import asyncio
from exo.helpers import get_mac_system_info, subprocess_pool
TFLOPS = 1.00
@@ -144,11 +146,11 @@ CHIP_FLOPS.update({f"{key} LAPTOP GPU": value for key, value in CHIP_FLOPS.items
CHIP_FLOPS.update({f"{key} Laptop GPU": value for key, value in CHIP_FLOPS.items()})
def device_capabilities() -> DeviceCapabilities:
async def device_capabilities() -> DeviceCapabilities:
if psutil.MACOS:
return mac_device_capabilities()
return await mac_device_capabilities()
elif psutil.LINUX:
return linux_device_capabilities()
return await linux_device_capabilities()
else:
return DeviceCapabilities(
model="Unknown Device",
@@ -158,27 +160,18 @@ def device_capabilities() -> DeviceCapabilities:
)
def mac_device_capabilities() -> DeviceCapabilities:
# Fetch the model of the Mac using system_profiler
model = subprocess.check_output(["system_profiler", "SPHardwareDataType"]).decode("utf-8")
model_line = next((line for line in model.split("\n") if "Model Name" in line), None)
model_id = model_line.split(": ")[1] if model_line else "Unknown Model"
chip_line = next((line for line in model.split("\n") if "Chip" in line), None)
chip_id = chip_line.split(": ")[1] if chip_line else "Unknown Chip"
memory_line = next((line for line in model.split("\n") if "Memory" in line), None)
memory_str = memory_line.split(": ")[1] if memory_line else "Unknown Memory"
memory_units = memory_str.split()
memory_value = int(memory_units[0])
if memory_units[1] == "GB":
memory = memory_value*1024
else:
memory = memory_value
# Assuming static values for other attributes for demonstration
return DeviceCapabilities(model=model_id, chip=chip_id, memory=memory, flops=CHIP_FLOPS.get(chip_id, DeviceFlops(fp32=0, fp16=0, int8=0)))
async def mac_device_capabilities() -> DeviceCapabilities:
model_id, chip_id, memory = await get_mac_system_info()
return DeviceCapabilities(
model=model_id,
chip=chip_id,
memory=memory,
flops=CHIP_FLOPS.get(chip_id, DeviceFlops(fp32=0, fp16=0, int8=0))
)
def linux_device_capabilities() -> DeviceCapabilities:
async def linux_device_capabilities() -> DeviceCapabilities:
import psutil
from tinygrad import Device

View File

@@ -1,8 +1,10 @@
from abc import ABC, abstractmethod
from typing import List
from typing import List, Dict
from dataclasses import dataclass
from .topology import Topology
from exo.inference.shard import Shard
from exo.topology.device_capabilities import device_capabilities
import asyncio
# Partitions shard-space into pieces of contiguous shards, represented by floating point range [start, end) between 0 and 1

View File

@@ -1,11 +1,11 @@
import unittest
import pytest
from unittest.mock import patch
from exo.topology.device_capabilities import mac_device_capabilities, DeviceCapabilities, DeviceFlops, TFLOPS
from exo.topology.device_capabilities import mac_device_capabilities, DeviceCapabilities, DeviceFlops, TFLOPS, device_capabilities
class TestMacDeviceCapabilities(unittest.TestCase):
@patch("subprocess.check_output")
def test_mac_device_capabilities_pro(self, mock_check_output):
@pytest.mark.asyncio
@patch("subprocess.check_output")
async def test_mac_device_capabilities_pro(mock_check_output):
# Mock the subprocess output
mock_check_output.return_value = b"""
Hardware:
@@ -27,20 +27,19 @@ Activation Lock Status: Enabled
"""
# Call the function
result = mac_device_capabilities()
result = await mac_device_capabilities()
# Check the results
self.assertIsInstance(result, DeviceCapabilities)
self.assertEqual(result.model, "MacBook Pro")
self.assertEqual(result.chip, "Apple M3 Max")
self.assertEqual(result.memory, 131072) # 16 GB in MB
self.assertEqual(
str(result),
"Model: MacBook Pro. Chip: Apple M3 Max. Memory: 131072MB. Flops: 14.20 TFLOPS, fp16: 28.40 TFLOPS, int8: 56.80 TFLOPS",
)
assert isinstance(result, DeviceCapabilities)
assert result.model == "MacBook Pro"
assert result.chip == "Apple M3 Max"
assert result.memory == 131072 # 128 GB in MB
assert str(result) == "Model: MacBook Pro. Chip: Apple M3 Max. Memory: 131072MB. Flops: 14.20 TFLOPS, fp16: 28.40 TFLOPS, int8: 56.80 TFLOPS"
@patch("subprocess.check_output")
def test_mac_device_capabilities_air(self, mock_check_output):
@pytest.mark.asyncio
@patch("subprocess.check_output")
async def test_mac_device_capabilities_air(mock_check_output):
# Mock the subprocess output
mock_check_output.return_value = b"""
Hardware:
@@ -62,30 +61,34 @@ Activation Lock Status: Disabled
"""
# Call the function
result = mac_device_capabilities()
result = await mac_device_capabilities()
# Check the results
self.assertIsInstance(result, DeviceCapabilities)
self.assertEqual(result.model, "MacBook Air")
self.assertEqual(result.chip, "Apple M2")
self.assertEqual(result.memory, 8192) # 8 GB in MB
assert isinstance(result, DeviceCapabilities)
assert result.model == "MacBook Air"
assert result.chip == "Apple M2"
assert result.memory == 8192 # 8 GB in MB
@unittest.skip("Unskip this test when running on a MacBook Pro, Apple M3 Max, 128GB")
def test_mac_device_capabilities_real(self):
@pytest.mark.skip(reason="Unskip this test when running on a MacBook Pro, Apple M3 Max, 128GB")
@pytest.mark.asyncio
async def test_mac_device_capabilities_real():
# Call the function without mocking
result = mac_device_capabilities()
result = await mac_device_capabilities()
# Check the results
self.assertIsInstance(result, DeviceCapabilities)
self.assertEqual(result.model, "MacBook Pro")
self.assertEqual(result.chip, "Apple M3 Max")
self.assertEqual(result.memory, 131072) # 128 GB in MB
self.assertEqual(result.flops, DeviceFlops(fp32=14.20*TFLOPS, fp16=28.40*TFLOPS, int8=56.80*TFLOPS))
self.assertEqual(
str(result),
"Model: MacBook Pro. Chip: Apple M3 Max. Memory: 131072MB. Flops: 14.20 TFLOPS, fp16: 28.40 TFLOPS, int8: 56.80 TFLOPS",
)
assert isinstance(result, DeviceCapabilities)
assert result.model == "MacBook Pro"
assert result.chip == "Apple M3 Max"
assert result.memory == 131072 # 128 GB in MB
assert result.flops == DeviceFlops(fp32=14.20*TFLOPS, fp16=28.40*TFLOPS, int8=56.80*TFLOPS)
assert str(result) == "Model: MacBook Pro. Chip: Apple M3 Max. Memory: 131072MB. Flops: 14.20 TFLOPS, fp16: 28.40 TFLOPS, int8: 56.80 TFLOPS"
if __name__ == "__main__":
unittest.main()
@pytest.mark.asyncio
async def test_device_capabilities():
caps = await device_capabilities()
assert caps.model != ""
assert caps.chip != ""
assert caps.memory > 0
assert caps.flops is not None