import requests import json import time import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) # Proxmox API details hosts = ['https://10.0.0.5:8006'] hostnames = ['pve'] username = 'root@pam' password = 'mypasswordhere' vmids = [100, 101, 102, 103, 104, 105, 106, 107] #, 101, 102, 103, 104, 105, 106, 107] # Replace with your VMIDs # Function to get the authentication ticket and CSRF token def get_auth_ticket(username, password, host): url = f"{host}/api2/json/access/ticket" data = { 'username': username, 'password': password } print(f"Connecting to {host}") response = requests.post(url, data=data, verify=False) #print(response.json()) result = response.json() return result['data']['ticket'], result['data']['CSRFPreventionToken'] # Function to get the latest snapshot for a VM def get_latest_snapshot(vm_id, ticket, csrf_token, host, node): if not check_server(vm_id, ticket, csrf_token, host, node): return None url = f"{host}/api2/json/nodes/{node}/qemu/{vm_id}/snapshot" headers = { 'Cookie': f"PVEAuthCookie={ticket}" } response = requests.get(url, headers=headers, verify=False) snapshots = response.json()['data'] if snapshots: snapshots = [s for s in snapshots if 'snaptime' in s] #print(response.json()['data']) else: return None if snapshots: latest_snapshot = max(snapshots, key=lambda x: x['snaptime']) return latest_snapshot['name'] return None # Function to restore a snapshot for a VM def restore_snapshot(vm_id, snapshot_name, ticket, csrf_token, host, node): url = f"{host}/api2/json/nodes/{node}/qemu/{vm_id}/snapshot/{snapshot_name}/rollback?start=1" headers = { 'Cookie': f"PVEAuthCookie={ticket}", 'CSRFPreventionToken': csrf_token } urla = f"{host}/api2/json/cluster/ha/resources/vm:{vm_id}?state=ignored" response = requests.put(urla, headers=headers, verify=False) print(f"Disabled HA for VM {vm_id}") time.sleep(1) # wait for HA to stop response = requests.post(url, headers=headers, verify=False) if response.status_code == 200: print(f"VM {vm_id}: Restoring snapshot '{snapshot_name}'") #print(response.json()['data']) return response.json()['data'] else: print(f"VM {vm_id}: Failed to restore snapshot '{snapshot_name}'") return None # Function to check the status of a task def check_task_status(node, task_id, ticket, host): url = f"{host}/api2/json/nodes/{node}/tasks/{task_id}/status" headers = { 'Cookie': f"PVEAuthCookie={ticket}" } response = requests.get(url, headers=headers, verify=False) #print(response.json()) return response.json()['data']['status'] == 'stopped' # Function to wait for all tasks to complete def wait_for_tasks_to_complete(tasks, ticket, host): print("Waiting for rollbacks to complete...") while tasks: time.sleep(3) for node, task_id in tasks.copy(): #print("Checking task", task_id) if check_task_status(node, task_id, ticket, host): print(f"Task {task_id} on node {node} completed") tasks.remove((node, task_id)) print("All VMs rolled back.") def enable_ha(vm_id, ticket, csrf_token, host): headers = { 'Cookie': f"PVEAuthCookie={ticket}", 'CSRFPreventionToken': csrf_token } urla = f"{host}/api2/json/cluster/ha/resources/vm:{vm_id}?state=started" response = requests.put(urla, headers=headers, verify=False) print(f"HA re-enabled for VMID {vm_id}") def vm_snapshot(station=0): if station > 0: # individual station restore vm_id = 99 + station for host in hosts: tasks = [] ticket, csrf_token = get_auth_ticket(username, password, host) for node in hostnames: latest_snapshot = get_latest_snapshot(vm_id, ticket, csrf_token, host, node) if latest_snapshot: task = restore_snapshot(vm_id, latest_snapshot, ticket, csrf_token, host, node) if task: tasks.append((node, task)) else: for host in hosts: tasks = [] ticket, csrf_token = get_auth_ticket(username, password, host) for node in hostnames: for vm_id in vmids: latest_snapshot = get_latest_snapshot(vm_id, ticket, csrf_token, host, node) if latest_snapshot: task = restore_snapshot(vm_id, latest_snapshot, ticket, csrf_token, host, node) if task: tasks.append((node, task)) wait_for_tasks_to_complete(tasks, ticket, host) for vm_id in vmids: enable_ha(vm_id, ticket, csrf_token, host) def check_server(vm_id, ticket, csrf_token, host, node): headers = { 'Cookie': f"PVEAuthCookie={ticket}", 'CSRFPreventionToken': csrf_token } url = f"{host}/api2/json/nodes/{node}/qemu" response = requests.get(url, headers=headers, verify=False) #print("Server check:", response.json()) for entry in response.json()['data']: if entry['vmid'] == vm_id: return True return False def check_online(vm_id, ticket, csrf_token, host, node): # check if VM is booted by seeing if the guest agent is running headers = { 'Cookie': f"PVEAuthCookie={ticket}", 'CSRFPreventionToken': csrf_token } url = f"{host}/api2/json/nodes/{node}/qemu/{vm_id}/agent/ping" response = requests.post(url, headers=headers, verify=False) #print(response.json()) return response.json()['data'] is not None