# CUDA_test/CUDA.py

import time
import zlib
import sys
import random
import multiprocessing
import subprocess
import os
import torch
import datetime
import logging
# Configure logger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__file__)
class CudaStresser:
"""
    A class to stress test CUDA-enabled GPUs by saturating VRAM with tensor
    arithmetic and by exercising host-to-device transfers to gauge memory
    bandwidth and data integrity.
"""
def __init__(self, load_perc: float = 0.99):
"""
Initialize CUDA device information and setup.
Parameters:
        - load_perc (float): Fraction of total VRAM to target; clamped to [0.01, 0.99].
"""
self.datatypes = [torch.float, torch.double, torch.half]
self.datavelocity = [14, 18, 12, 1, 2, 4, 8]
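        # The dtypes above are the floating-point types torch.rand can generate
        # directly; cuda_stress samples randomly from this pool when filling VRAM.
        # NOTE: datavelocity is not referenced by any method in this class.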
self.load_perc = self._clamp(load_perc, 0.01, 0.99)
self.devices_info = self._initialize_devices()
@staticmethod
def _clamp(n: float, minn: float, maxn: float) -> float:
"""
Clamp a value to be within a specified range.
Parameters:
        - n (float): The value to clamp.
        - minn (float): Minimum allowed value.
        - maxn (float): Maximum allowed value.
        Returns:
        - float: The clamped value.
"""
clamped = max(min(maxn, n), minn)
if clamped != n:
logger.warning(f'The argument value {n} was clamped to {clamped}')
return clamped
def _initialize_devices(self) -> list:
"""
Initialize and gather information about CUDA devices.
Returns:
- list: A list of tuples containing (device, total_memory) for each CUDA device.
"""
devices_info = []
if torch.cuda.device_count() < 1:
logger.error("No CUDA devices detected.")
raise RuntimeError("No CUDA devices detected.")
for i in range(torch.cuda.device_count()):
device, total_memory = self._cuda_check(i)
devices_info.append((device, total_memory))
logger.info(f"Device {i}: {device}, Total memory: {total_memory}")
return devices_info
@staticmethod
def _cuda_check(device_no: int) -> tuple:
"""
Check CUDA device properties.
Parameters:
- device_no (int): The index of the CUDA device.
Returns:
- tuple: Device and total memory of the CUDA device.
"""
        if not torch.cuda.is_available():
            raise RuntimeError("CUDA is not available; cannot query device properties.")
        device = torch.device(f"cuda:{device_no}")
        total_memory = torch.cuda.get_device_properties(device_no).total_memory
        torch.cuda.set_device(device_no)
return device, total_memory
@staticmethod
def _log_gpu_info(pipe: multiprocessing.Pipe, poll_time: int) -> None:
"""
Log GPU utilization and temperature information periodically.
Parameters:
        - pipe (multiprocessing.connection.Connection): Child end of the pipe used to
          send the collected GPU log back to the main process.
- poll_time (int): Time between logs in seconds.
"""
gpu_log = []
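        # The child process accumulates samples in gpu_log; when the parent sends any
        # truthy value through the pipe, the full list is sent back in reply.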
while True:
            # NOTE: this query is Windows-specific (PowerShell + findstr); on other
            # platforms the parsing below will fail and be logged as a warning.
            process = subprocess.Popen(
                ['powershell', '-Command', 'nvidia-smi -q | Select-String -Pattern "Utilization" -Context 0,6 |'
                 ' findstr /C:"Gpu" /C:"Memory"; nvidia-smi -q | findstr /C:"GPU Current Temp"'],
                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            output, errors = process.communicate()
            lines = output.decode().split('\r\n')
            try:
                log = {
                    "Timestamp": datetime.datetime.now().isoformat(),
                    "GPU Utilization": int(lines[0].split(':')[-1].strip().replace('%', '')),
                    "Memory Utilization": int(lines[1].split(':')[-1].strip().replace('%', '')),
                    "GPU Temperature": int(lines[2].split(':')[-1].strip().split()[0])
                }
                gpu_log.append(log)
            except (IndexError, ValueError) as e:
                logger.warning(f"Could not parse nvidia-smi output: {e}")
if pipe.poll(0): # Check if something is in the pipe.
if pipe.recv():
pipe.send(gpu_log)
            time.sleep(max(poll_time - 1, 0))  # subtract ~1 s to offset the time the nvidia-smi query takes
@staticmethod
def _progress_bar(count_value: int, total: int, suffix: str = '') -> None:
"""
Display a progress bar in the console.
Parameters:
- count_value (int): Current count.
- total (int): Total count.
- suffix (str): Additional suffix to display.
"""
bar_length = 20
filled_length = int(round(bar_length * count_value / float(total)))
percentage = round(100.0 * count_value / float(total), 1)
bar = '=' * filled_length + '-' * (bar_length - filled_length)
sys.stdout.write(f'[{bar}] {percentage}% ... {suffix}\r')
sys.stdout.flush()
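    # Usage sketch for cuda_stress (illustrative values; assumes at least one CUDA
    # device is present):
    #   stresser = CudaStresser(load_perc=0.9)
    #   gpu_log = stresser.cuda_stress(timing=30, tensor_num=50, poll_time=5)
    #   for sample in gpu_log:
    #       print(sample["GPU Utilization"], sample["GPU Temperature"])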
def cuda_stress(self, timing: int = 60, tensor_num: int = 1000, poll_time: int = 5) -> list:
"""
Stress test CUDA cores by creating and manipulating tensors.
Parameters:
- timing (int): Duration of the test in seconds.
- tensor_num (int): Number of tensors to create.
- poll_time (int): Interval for logging GPU data in seconds.
Returns:
- list: GPU log as a list of dictionaries.
"""
# Clamp arguments to valid ranges
try:
timing = int(self._clamp(timing, 1, 60 * 60 * 24 * 7))
poll_time = int(self._clamp(poll_time, 1, 600))
tensor_num = int(self._clamp(tensor_num, 2, 1000))
except Exception as e:
            return ["Argument processing failed", str(e)]
        tensors = []
        failed_allocations = 0
        try:
            for device, total_memory in self.devices_info:
                # Fill the device up to load_perc of total VRAM using tensor_num chunks.
                free_memory = (total_memory * self.load_perc - torch.cuda.memory_allocated(device.index))
                tensor_size = int(free_memory / tensor_num + 1)  # chunk size in bytes
                while free_memory > tensor_size and failed_allocations < 16:
                    try:
                        rand = random.randint(0, len(self.datatypes) - 1)
                        # Dtypes past index 2 (if any are added to the pool) cannot be
                        # produced by torch.rand directly, so they are generated as
                        # float and converted afterwards.
                        temp_type = self.datatypes[0] if rand > 2 else self.datatypes[rand]
                        dtype_size = torch.tensor(0, dtype=temp_type).element_size()
                        tensor = torch.rand(int(tensor_size / dtype_size), dtype=temp_type, device=device)
                        if rand > 2:
                            tensor = (tensor + 1).to(self.datatypes[rand])
                        tensors.append(tensor)
                        free_memory = (total_memory * self.load_perc - torch.cuda.memory_allocated(device.index))
                    except Exception as e:
                        failed_allocations += 1
                        logger.warning(f"Failed to allocate VRAM or unsupported dtype: {e}")
                logger.info(f'{len(tensors)} tensors created.')
except Exception as e:
return ["Error detecting CUDA device", str(e)]
logger.info(f"Starting CUDA stress test with {len(tensors)} tensors")
parent_pipe, child_pipe = multiprocessing.Pipe(True)
log_process = multiprocessing.Process(target=self._log_gpu_info, args=(child_pipe, poll_time))
log_process.start()
time.sleep(1)
time_started = time.time()
rotation = 0
tensors_number = len(tensors)
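        # Main stress loop: repeatedly pick two random resident tensors and apply a
        # random in-place arithmetic op to their overlapping prefix until time is up.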
        while time.time() - time_started < timing:
            rotation += 1
            try:
                idx1, idx2 = random.sample(range(tensors_number), 2)
                min_size = min(len(tensors[idx1]), len(tensors[idx2]))
                operation = random.choice(["add", "sub", "mul", "div"])
                if operation == "add":
                    tensors[idx1][:min_size] += tensors[idx2][:min_size]
                elif operation == "sub":
                    tensors[idx1][:min_size] -= tensors[idx2][:min_size]
                elif operation == "mul":
                    tensors[idx1][:min_size] *= tensors[idx2][:min_size]
                elif operation == "div":
                    tensors[idx1][:min_size] /= tensors[idx2][:min_size]
                self._progress_bar(int(time.time() - time_started), timing, str(rotation))
            except Exception as e:
                # In-place ops between mismatched dtypes can legitimately fail; log and continue.
                logger.error(f"Test iteration {rotation} FAILED: {e}")
        # Release the VRAM held by the test tensors.
        tensors.clear()
        torch.cuda.empty_cache()
logger.info(f"Final results: {rotation / timing} iterations per second.")
parent_pipe.send(True)
gpu_log = parent_pipe.recv()
parent_pipe.close()
log_process.terminate()
log_process.join()
return gpu_log
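    # Each _loader worker repeatedly copies its tensor CPU -> GPU -> CPU, timing the
    # round trip and CRC-checking the copy that comes back against the original data.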
def _loader(self, proc_number: int, timing: int, que: multiprocessing.Queue, tensor: torch.Tensor, index: int) -> None:
"""
Load and unload tensors to stress memory bandwidth.
Parameters:
- proc_number (int): The process number.
- timing (int): Duration of the test in seconds.
- que (multiprocessing.Queue): Queue for returning results.
- tensor (torch.Tensor): Tensor to load and unload.
- index (int): CUDA device index.
"""
number_of_errors = 0
reference_crc32 = zlib.crc32(tensor.numpy().tobytes())
time_median = []
number_of_transfers = 0
device, _ = self._cuda_check(index)
time_start = time.time()
        while time.time() - time_start < timing:
            round_trip = tensor
            try:
                start_time = time.time()
                # Tensor.to() returns a copy, so keep both halves of the round trip
                # instead of discarding them.
                gpu_tensor = tensor.to(device)
                round_trip = gpu_tensor.to("cpu")
                duration = time.time() - start_time
                del gpu_tensor
                torch.cuda.empty_cache()
                time_median.append(duration)
                number_of_transfers += 1
                if proc_number == 0:
                    self._progress_bar(int(time.time() - time_start), timing)
            except Exception as e:
                logger.error(f"Tensor moving to GPU failed: {e}")
            # Verify that the data survived the CPU -> GPU -> CPU round trip.
            crc32 = zlib.crc32(round_trip.numpy().tobytes())
            if crc32 != reference_crc32:
                number_of_errors += 1
que.put({
"number of errors": number_of_errors,
"average transfer time": sum(time_median) / number_of_transfers if number_of_transfers else 0,
"number of transfers": number_of_transfers
})
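    # Usage sketch for cuda_load_unload (illustrative duration; spawns one _loader
    # worker per CPU core and collects per-worker transfer statistics):
    #   stresser = CudaStresser()
    #   print(stresser.cuda_load_unload(timing=120))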
def cuda_load_unload(self, timing: int = 100) -> str:
"""
Load and unload VRAM to stress test bandwidth and check data consistency.
Parameters:
- timing (int): Duration of the test in seconds.
Returns:
- str: Summary of the test results.
"""
timing = int(self._clamp(timing, 60, 60 * 60 * 24 * 7))
try:
received = []
for device, total_memory in self.devices_info:
que = multiprocessing.Queue()
procs = []
                free_memory = (total_memory * 0.99 - torch.cuda.memory_allocated(device.index))
                # Convert the per-worker budget from bytes to float32 elements so the
                # tensors actually occupy the intended share of memory.
                element_size = torch.tensor(0, dtype=torch.float).element_size()
                tensor_size = int(free_memory / 5.0 / os.cpu_count() / element_size)
                for p in range(os.cpu_count()):
                    try:
                        tensor = torch.randn(tensor_size, dtype=torch.float)
                        proc = multiprocessing.Process(target=self._loader, args=(p, timing, que, tensor, device.index))
                        proc.start()
                        procs.append(proc)
                    except Exception as e:
                        logger.warning(f"Error starting process on GPU {device.index}: {e}")
                time_started = time.time()
                while time.time() - time_started < timing * 2 and len(received) < len(procs):
                    if que.empty():
                        time.sleep(0.2)
                    else:
                        received.append(que.get())
                # Reap the worker processes before moving on to the next device.
                for proc in procs:
                    proc.join(timeout=5)
                    if proc.is_alive():
                        proc.terminate()
except Exception as e:
return f"Error detecting CUDA device: {e}"
return f"Final results: {received}."
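# The __main__ guard below is required because multiprocessing re-imports this
# module in child processes under the "spawn" start method used on Windows.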
if __name__ == "__main__":
stresser = CudaStresser(load_perc=0.99)
logger.info(stresser.cuda_stress(timing=60, tensor_num=15))
time.sleep(2)
logger.info(stresser.cuda_load_unload(timing=60))
logger.info("Test finished.")
sys.exit()
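    # Optional follow-up (sketch): persist the GPU log for later inspection. The
    # file name "gpu_log.json" is arbitrary and not part of this script.
    #   import json
    #   with open("gpu_log.json", "w") as fh:
    #       json.dump(stresser.cuda_stress(timing=60, tensor_num=15), fh, indent=2)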