# Mirror of https://github.com/xlang-ai/OSWorld.git
# Synced 2024-04-29 12:26:03 +03:00 (144 lines, 4.9 KiB, Python)
import os
|
|
import platform
|
|
import subprocess
|
|
import requests
|
|
from tqdm import tqdm
|
|
import zipfile
|
|
from time import sleep
|
|
|
|
import socket
|
|
|
|
# Path to the virtual machine's .vmx file, relative to the working directory.
# Built with os.path.join so the default is a real two-component path on
# POSIX hosts as well as on Windows (a literal backslash is not a path
# separator on Linux/macOS).
# Change this to the path of your downloaded virtual machine.
VM_PATH = os.path.join("Ubuntu", "Ubuntu.vmx")
# Upper bound on the number of attempts made to obtain the guest IP address.
MAX_RETRY_TIMES = 10
|
|
|
|
|
|
def download_and_unzip_vm():
    """Download the Ubuntu VM archive for this host and extract it into the CWD.

    The image is chosen from ``platform.machine()``: the ARM build for
    ``arm64`` and the x86 build for ``amd64``/``x86_64``. The archive is saved
    as ``Ubuntu.zip`` in the current working directory. When a partial file is
    already present the download resumes with an HTTP ``Range`` request, and a
    stream error is followed by a one-second pause and another attempt.

    Raises:
        Exception: if the CPU architecture is not one of the supported ones.
        requests.exceptions.HTTPError: if the server answers with an error
            status other than 416.
    """
    # Determine the platform and CPU architecture to decide the correct VM image to download
    machine = platform.machine()
    if machine == 'arm64':  # macOS with Apple Silicon
        url = "https://huggingface.co/datasets/xlangai/ubuntu_arm/resolve/main/Ubuntu.zip"
    elif machine.lower() in ['amd64', "x86_64"]:
        url = "https://huggingface.co/datasets/xlangai/ubuntu_x86/resolve/main/Ubuntu.zip"
    else:
        raise Exception("Unsupported platform or architecture")

    # Download the virtual machine image
    print("Downloading the virtual machine image...")
    filename = "Ubuntu.zip"

    while True:
        # Resume from whatever is already on disk (0 bytes means a fresh download).
        downloaded_size = os.path.getsize(filename) if os.path.exists(filename) else 0
        headers = {}
        if downloaded_size:
            headers["Range"] = f"bytes={downloaded_size}-"

        with requests.get(url, headers=headers, stream=True) as response:
            if response.status_code == 416:
                # Range Not Satisfiable: the requested offset is at or past the
                # end of the remote file, so there is nothing left to fetch.
                print("Nothing left to download; using the existing file.")
                break
            response.raise_for_status()

            # Only a 206 answer carries just the missing tail. Any other
            # success status carries the whole file, so start over instead of
            # appending a second copy to the partial data.
            resumed = response.status_code == 206
            if not resumed:
                downloaded_size = 0

            # content-length covers only the bytes in this response; add what
            # is already on disk so the bar's total matches its initial value.
            total_size = downloaded_size + int(response.headers.get('content-length', 0))

            with open(filename, "ab" if resumed else "wb") as file, tqdm(
                    desc="Progress",
                    total=total_size,
                    unit='iB',
                    unit_scale=True,
                    unit_divisor=1024,
                    initial=downloaded_size,
                    ascii=True
            ) as progress_bar:
                try:
                    for data in response.iter_content(chunk_size=1024):
                        size = file.write(data)
                        progress_bar.update(size)
                except (requests.exceptions.RequestException, IOError) as e:
                    print(f"Download error: {e}")
                    sleep(1)  # Wait for 1 second before retrying
                    print("Retrying...")
                else:
                    print("Download succeeds.")
                    break  # Download completed successfully

    # Unzip the downloaded file
    print("Unzipping the downloaded file...☕️")
    current_directory = os.getcwd()
    with zipfile.ZipFile(filename, 'r') as zip_ref:
        zip_ref.extractall(current_directory)
    print("Files have been successfully extracted to the current working directory:", current_directory)
|
|
|
|
|
|
# Fetch and unpack the VM image only when the .vmx file is not already on disk
if os.path.exists(VM_PATH):
    print(f"Virtual machine exists: {VM_PATH}")
else:
    download_and_unzip_vm()
|
|
|
|
|
|
# Map the host operating system to the host-type argument passed to vmrun
def get_vmrun_type():
    """Return the ``-T`` argument for vmrun that matches the host OS.

    Returns:
        ``'-T fusion'`` on macOS (``platform.system()`` reports ``'Darwin'``),
        ``'-T ws'`` on Windows and Linux.

    Raises:
        Exception: if the host operating system is none of the above.
    """
    host_os = platform.system()
    if host_os == 'Darwin':  # Darwin is the system name for macOS
        return '-T fusion'
    if host_os in ('Windows', 'Linux'):
        return '-T ws'
    raise Exception("Unsupported operating system")
|
|
|
|
|
|
# Start the virtual machine (announce first: the vmrun call blocks until it returns)
print("Starting virtual machine...")
subprocess.run(f'vmrun {get_vmrun_type()} start "{VM_PATH}"', shell=True)

# Get the IP address of the virtual machine, giving up after MAX_RETRY_TIMES attempts
for _ in range(MAX_RETRY_TIMES):
    get_vm_ip = subprocess.run(f'vmrun {get_vmrun_type()} getGuestIPAddress "{VM_PATH}" -wait', shell=True,
                               capture_output=True,
                               text=True)
    vm_ip = get_vm_ip.stdout.strip()
    # Output containing "Error" is treated as a failure report; empty output
    # (e.g. the command could not be run at all) carries no address either.
    if vm_ip and "Error" not in vm_ip:
        print("Virtual machine IP address:", vm_ip)
        break
    print("Retry on getting IP")
    sleep(1)  # brief pause so failed attempts are not fired back-to-back
else:
    # Every attempt failed: stop here instead of continuing with an error
    # message (or an empty string) in place of an IP address.
    raise RuntimeError(
        f"Could not get the IP address of the virtual machine after {MAX_RETRY_TIMES} attempts"
    )
|
|
|
|
|
|
def is_url_accessible(url, timeout=1):
    """Report whether a HEAD request to *url* is answered with HTTP 200.

    Args:
        url: address to probe.
        timeout: value forwarded to ``requests.head`` as its ``timeout``.

    Returns:
        True when the response status code is 200; False for any other status
        or when the request raises a ``requests`` exception.
    """
    try:
        status = requests.head(url, timeout=timeout).status_code
    except requests.exceptions.RequestException:
        return False
    return status == 200
|
|
|
|
|
|
# One-off probe of the guest's screenshot endpoint; the result is only reported
print("--------------------------------")
url = f"http://{vm_ip}:5000/screenshot"
ckeck_url = is_url_accessible(url)
print(
    f"check url: {url} | is accessible: {ckeck_url}",
    "--------------------------------",
    sep="\n",
)
|
|
|
|
|
|
# Function used to check whether the virtual machine is ready
def download_screenshot(ip):
    """Probe the guest's screenshot endpoint once.

    Sends a GET request to ``http://<ip>:5000/screenshot``. The response body
    is not used; only the status code decides the result.

    Args:
        ip: address of the guest, interpolated into the URL.

    Returns:
        True when the endpoint answers with HTTP 200. False otherwise, after
        a 2-second pause so that a caller polling in a loop does not hammer
        the guest.
    """
    url = f"http://{ip}:5000/screenshot"
    try:
        # timeout=(connect, read): give up after 1 second in either phase
        response = requests.get(url, timeout=(1, 1))
    except requests.exceptions.RequestException as e:
        print(f"Error: {e}")
        print(f"Type: {type(e).__name__}")
        print(f"Error detail: {str(e)}")
    else:
        if response.status_code == 200:
            return True
    # Back off after any failed probe, whether it raised or returned non-200
    sleep(2)
    return False
|
|
|
|
|
|
# Poll the screenshot endpoint until it answers successfully
while not download_screenshot(vm_ip):
    print("Check whether the virtual machine is ready...")

print("Virtual machine is ready. Start to make a snapshot on the virtual machine. It would take a while...")

# Create a snapshot of the virtual machine
snapshot_result = subprocess.run(f'vmrun {get_vmrun_type()} snapshot "{VM_PATH}" "init_state"', shell=True)
# Report success only when the command actually succeeded
if snapshot_result.returncode != 0:
    raise RuntimeError(f"Creating the snapshot failed (vmrun exit code {snapshot_result.returncode})")
print("Snapshot created.")
|