import libvirt
import uuid
import random
import ipaddress
import shutil
import os
class KVMManager:
def __init__(self): # 定义类的引用self接口: __init__ 内置初始化函数 被自定义名字 self 继承; 自定执行,不需要在其它函数调用
self.conn = self.connect_to_libvirt() # 定义: self 的属性成员 conn 的表达方式 由 connect_to_libvirt 函数里面定义得来
# 下次使用 self.conn 函数为初始化链接 qemu
def connect_to_libvirt(self):
try: # try 代码块 用于捕获可能发生的异常
conn = libvirt.open('qemu:///system')
if conn is None:
raise Exception('Failed to connect to hypervisor')
# raise 关键字: 抛出异常, 异常的方法 Exception 是一个字符串
return conn # 如果连接成功,返回连接对象 conn,赋值给 self.conn
except libvirt.libvirtError as e: # except 捕捉异常并处理它;libvirt 是kvm虚拟化api模块;libvirtError 异常类: 错误信息变量
print(f'Connection failed: {e}')
exit(1) # 程序退出的条件是 出现异常1 判断的是 当前代码块 except libvirt.libvirtError as e
def generate_mac(self):
"""Generate a random MAC address."""
mac = [0x52, 0x54, 0x00] + [random.randint(0x00, 0x7f) for _ in range(3)]
return ':'.join(map(lambda x: f'{x:02x}', mac))
def generate_ip(self, subnet):
"""Generate a random IP address in the given subnet."""
network = ipaddress.ip_network(subnet, strict=False)
return str(random.choice(list(network.hosts())))
def ensure_dir_exists(self, directory):
"""Ensure that the directory exists, creating it if necessary."""
if not os.path.exists(directory):
os.makedirs(directory)
def copy_image(self, source_path, dest_dir, new_name):
"""Copy the VM image to the destination directory with a new filename."""
self.ensure_dir_exists(dest_dir)
destination_path = os.path.join(dest_dir, new_name)
try:
shutil.copy(source_path, destination_path)
print(f'Image copied to {destination_path}')
return destination_path
except IOError as e:
print(f'Failed to copy image: {e}')
return None
def create_vm(self, name, memory, vcpu, source_image_path, subnet):
dest_dir = '/datadisk/vm/pyauto'
new_filename = 'debian12-01.qcow2'
destination_image_path = self.copy_image(source_image_path, dest_dir, new_filename)
if destination_image_path is None:
print(f'Failed to copy image. VM creation aborted.')
return
mac_address = self.generate_mac()
ip_address = self.generate_ip(subnet)
xml_config = f"""
<domain type='kvm'>
<name>{name}</name>
<memory unit='KiB'>{memory}</memory>
<vcpu placement='static'>{vcpu}</vcpu>
<os>
<type arch='x86_64' machine='pc-i440fx-2.9'>hvm</type>
</os>
<devices>
<disk type='file' device='disk'>
<driver name='qemu' type='qcow2'/>
<source file='{destination_image_path}'/>
<target dev='vda' bus='virtio'/>
<address type='pci' domain='0x0000' bus='0x00' slot='0x04' function='0x0'/>
</disk>
<interface type='network'>
<mac address='{mac_address}'/>
<source network='default'/>
<model type='virtio'/>
<address type='pci' domain='0x0000' bus='0x00' slot='0x03' function='0x0'/>
</interface>
<graphics type='vnc' port='-1' autoport='yes' listen='0.0.0.0'/>
</devices>
<metadata>
<network>
<ip address='{ip_address}' netmask='255.255.255.0'/>
</network>
</metadata>
</domain>
"""
try:
self.conn.defineXML(xml_config)
print(f'VM {name} created successfully with MAC {mac_address} and IP {ip_address}')
except libvirt.libvirtError as e:
print(f'Failed to create VM: {e}')
def start_vm(self, name):
try:
vm = self.conn.lookupByName(name)
vm.create()
print(f'VM {name} started successfully')
except libvirt.libvirtError as e:
print(f'Failed to start VM: {e}')
def stop_vm(self, name):
try:
vm = self.conn.lookupByName(name)
vm.shutdown()
print(f'VM {name} shutdown request sent')
except libvirt.libvirtError as e:
print(f'Failed to stop VM: {e}')
def delete_vm(self, name):
try:
vm = self.conn.lookupByName(name)
vm.undefine()
print(f'VM {name} deleted successfully')
except libvirt.libvirtError as e:
print(f'Failed to delete VM: {e}')
def list_vms(self):
try:
vms = self.conn.listAllDomains()
for vm in vms:
print(f'VM Name: {vm.name()}')
except libvirt.libvirtError as e:
print(f'Failed to list VMs: {e}')
def close(self):
self.conn.close()
if __name__ == '__main__':
manager = KVMManager()
# 示例操作
manager.create_vm('test-vm', '2048000', '2', '/datadisk/myfolder/iso/qcow2/linux/debian12-username-eisc-000000.qcow2', '192.168.122.0/24')
manager.start_vm('test-vm')
manager.list_vms()
manager.stop_vm('test-vm')
manager.delete_vm('test-vm')
manager.close()
'''
python312 -m pip install --upgrade pip # 更新pip 安装工具
pip install libvirt-python # 安装kvm api 库
'''Powered by ddoss.cn 12.0
©2015 - 2025 ddoss
渝公网安备50011302222260号
渝ICP备2024035333号
【实验平台安全承诺书】
小绿叶技术社区,优化网络中,点击查看配置信息
主机监控系统: 安全防火墙已开启检查cc攻击-下载文件完成后等待10s 恢复访问,检查连接数低于峰值恢复访问
您的IP:216.73.216.110,2025-12-01 14:26:05,Processed in 0.01047 second(s).