import time
import zlib
import sys
import random
import multiprocessing
import subprocess
import os
import torch
import datetime
import logging
from multiprocessing.connection import Connection

# Configure logger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class CudaStresser:
    """
    A class to stress test CUDA-enabled GPUs by performing various operations
    to measure GPU memory bandwidth and stress the device.
    """

    def __init__(self, load_perc: float = 0.99):
        """
        Initialize CUDA device information and setup.

        Parameters:
        - load_perc (float): Desired VRAM percentage usage.
        """
        self.datatypes = [torch.float, torch.double, torch.half]
        self.datavelocity = [14, 18, 12, 1, 2, 4, 8]  # Currently unused.
        self.load_perc = self._clamp(load_perc, 0.01, 0.99)
        self.devices_info = self._initialize_devices()

    @staticmethod
    def _clamp(n: float, minn: float, maxn: float) -> float:
        """
        Clamp a value to be within a specified range.

        Parameters:
        - n (float): The value to clamp.
        - minn (float): Minimum allowed value.
        - maxn (float): Maximum allowed value.

        Returns:
        - float: The clamped value.
        """
        clamped = max(min(maxn, n), minn)
        if clamped != n:
            logger.warning(f'The argument value {n} was clamped to {clamped}')
        return clamped

    def _initialize_devices(self) -> list:
        """
        Initialize and gather information about CUDA devices.

        Returns:
        - list: A list of tuples containing (device, total_memory) for each CUDA device.
        """
        devices_info = []
        if torch.cuda.device_count() < 1:
            logger.error("No CUDA devices detected.")
            raise RuntimeError("No CUDA devices detected.")
        for i in range(torch.cuda.device_count()):
            device, total_memory = self._cuda_check(i)
            devices_info.append((device, total_memory))
            logger.info(f"Device {i}: {device}, Total memory: {total_memory}")
        return devices_info

    @staticmethod
    def _cuda_check(device_no: int) -> tuple:
        """
        Check CUDA device properties.

        Parameters:
        - device_no (int): The index of the CUDA device.

        Returns:
        - tuple: Device and total memory of the CUDA device.
        """
        if not torch.cuda.is_available():
            # Falling back to the CPU would break the CUDA memory queries below,
            # so fail early instead.
            raise RuntimeError("CUDA is not available.")
        device = torch.device(f"cuda:{device_no}")
        total_memory = torch.cuda.get_device_properties(device_no).total_memory
        torch.cuda.set_device(device_no)
        return device, total_memory

    @staticmethod
    def _log_gpu_info(pipe: Connection, poll_time: int) -> None:
        """
        Log GPU utilization and temperature information periodically.

        Parameters:
        - pipe (Connection): Pipe for communication with the main process.
        - poll_time (int): Time between logs in seconds.
        """
        gpu_log = []
        while True:
            try:
                # Windows-specific: parse `nvidia-smi -q` output through PowerShell.
                process = subprocess.Popen(
                    ['powershell', '-Command',
                     'nvidia-smi -q | Select-String -Pattern "Utilization" -Context 0,6 |'
                     ' findstr /C:"Gpu" /C:"Memory"; nvidia-smi -q | findstr /C:"GPU Current Temp"'],
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                output, _ = process.communicate()
                lines = output.decode().split('\r\n')
                gpu_log.append({
                    "Timestamp": datetime.datetime.now().isoformat(),
                    "GPU Utilization": int(lines[0].split(':')[-1].strip().replace('%', '')),
                    "Memory Utilization": int(lines[1].split(':')[-1].strip().replace('%', '')),
                    "GPU Temperature": int(lines[2].split(':')[-1].strip().split()[0])
                })
            except Exception as e:
                # Never let a telemetry hiccup kill the logging process, otherwise
                # the main process would wait for the log forever.
                logger.warning(f"Could not read GPU telemetry: {e}")
            if pipe.poll(0):  # Check if the main process asked for the collected log.
                if pipe.recv():
                    pipe.send(gpu_log)
            # nvidia-smi itself takes roughly a second, hence the -1.
            time.sleep(max(poll_time - 1, 0))

    @staticmethod
    def _progress_bar(count_value: int, total: int, suffix: str = '') -> None:
        """
        Display a progress bar in the console.

        Parameters:
        - count_value (int): Current count.
        - total (int): Total count.
        - suffix (str): Additional suffix to display.
        """
        bar_length = 20
        filled_length = int(round(bar_length * count_value / float(total)))
        percentage = round(100.0 * count_value / float(total), 1)
        bar = '=' * filled_length + '-' * (bar_length - filled_length)
        sys.stdout.write(f'[{bar}] {percentage}% ... {suffix}\r')
        sys.stdout.flush()

    def cuda_stress(self, timing: int = 60, tensor_num: int = 1000, poll_time: int = 5) -> list:
        """
        Stress test CUDA cores by creating and manipulating tensors.

        Parameters:
        - timing (int): Duration of the test in seconds.
        - tensor_num (int): Number of tensors to create.
        - poll_time (int): Interval for logging GPU data in seconds.

        Returns:
        - list: GPU log as a list of dictionaries.
        """
        # Clamp arguments to valid ranges
        try:
            timing = int(self._clamp(timing, 1, 60 * 60 * 24 * 7))
            poll_time = int(self._clamp(poll_time, 1, 600))
            tensor_num = int(self._clamp(tensor_num, 2, 1000))
        except Exception as e:
            return ["Arguments processing FAILED", str(e)]

        # Fill each device with random tensors until the VRAM budget is reached.
        tensors = []
        failed_allocations = 0
        try:
            for device, total_memory in self.devices_info:
                free_memory = (total_memory * self.load_perc
                               - torch.cuda.memory_allocated(device.index))
                tensor_size = int(free_memory / tensor_num + 1)  # target size in bytes
                while free_memory > tensor_size and failed_allocations < 16:
                    try:
                        dtype = random.choice(self.datatypes)
                        dtype_size = torch.tensor(0, dtype=dtype).element_size()
                        tensors.append(torch.rand(int(tensor_size / dtype_size),
                                                  dtype=dtype, device=device))
                        free_memory = (total_memory * self.load_perc
                                       - torch.cuda.memory_allocated(device.index))
                    except Exception as e:
                        failed_allocations += 1
                        logger.warning(f"Failed to allocate VRAM or unsupported dtype: {e}")
                logger.info(f'{len(tensors)} tensors created.')
        except Exception as e:
            return ["Error detecting CUDA device", str(e)]

        if len(tensors) < 2:
            return ["Not enough tensors could be allocated to run the test."]

        logger.info(f"Starting CUDA stress test with {len(tensors)} tensors")
        parent_pipe, child_pipe = multiprocessing.Pipe(duplex=True)
        log_process = multiprocessing.Process(target=self._log_gpu_info, args=(child_pipe, poll_time))
        log_process.start()
        time.sleep(1)

        time_started = time.time()
        rotation = 0
        tensors_number = len(tensors)
        while time.time() - time_started < timing:
            rotation += 1
            try:
                idx1, idx2 = random.sample(range(tensors_number), 2)
                min_size = min(len(tensors[idx1]), len(tensors[idx2]))
                # Cast (and, with several GPUs, move) the second operand so that
                # mixed dtypes or devices do not abort the iteration.
                other = tensors[idx2][:min_size].to(tensors[idx1])
                operation = random.choice(["add", "sub", "mul", "div"])
                if operation == "add":
                    tensors[idx1][:min_size] += other
                elif operation == "sub":
                    tensors[idx1][:min_size] -= other
                elif operation == "mul":
                    tensors[idx1][:min_size] *= other
                elif operation == "div":
                    tensors[idx1][:min_size] /= other
                self._progress_bar(int(time.time() - time_started), timing, str(rotation))
            except Exception as e:
                logger.error(f"Test iteration {rotation} FAILED: {e}")

        # Release the VRAM held by the test tensors.
        tensors.clear()
        torch.cuda.empty_cache()
        logger.info(f"Final results: {rotation / timing} iterations per second.")
        parent_pipe.send(True)
        # The logger only checks the pipe once per poll cycle; wait up to two
        # cycles instead of blocking forever if the logging process died.
        gpu_log = parent_pipe.recv() if parent_pipe.poll(poll_time * 2) else []
        parent_pipe.close()
        log_process.terminate()
        log_process.join()
        return gpu_log

    def _loader(self, proc_number: int, timing: int, que: multiprocessing.Queue,
                tensor: torch.Tensor, index: int) -> None:
        """
        Load and unload tensors to stress memory bandwidth.

        Parameters:
        - proc_number (int): The process number.
        - timing (int): Duration of the test in seconds.
        - que (multiprocessing.Queue): Queue for returning results.
        - tensor (torch.Tensor): Tensor to load and unload.
        - index (int): CUDA device index.
        """
        number_of_errors = 0
        reference_crc32 = zlib.crc32(tensor.numpy().tobytes())
        transfer_times = []
        number_of_transfers = 0
        device, _ = self._cuda_check(index)
        time_start = time.time()
        while time.time() - time_start < timing:
            try:
                start_time = time.time()
                gpu_tensor = tensor.to(device)     # host -> device copy
                cpu_tensor = gpu_tensor.to("cpu")  # device -> host copy
                duration = time.time() - start_time
                del gpu_tensor
                torch.cuda.empty_cache()
                transfer_times.append(duration)
                number_of_transfers += 1
                # Verify that the data survived the round trip through VRAM.
                if zlib.crc32(cpu_tensor.numpy().tobytes()) != reference_crc32:
                    number_of_errors += 1
                if proc_number == 0:
                    self._progress_bar(int(time.time() - time_start), timing)
            except Exception as e:
                logger.error(f"Tensor moving to GPU failed: {e}")
        que.put({
            "number of errors": number_of_errors,
            "average transfer time": sum(transfer_times) / number_of_transfers if number_of_transfers else 0,
            "number of transfers": number_of_transfers
        })

    def cuda_load_unload(self, timing: int = 100) -> str:
        """
        Load and unload VRAM to stress test bandwidth and check data consistency.

        Parameters:
        - timing (int): Duration of the test in seconds.

        Returns:
        - str: Summary of the test results.
        """
        timing = int(self._clamp(timing, 60, 60 * 60 * 24 * 7))
        received = []
        try:
            for device, total_memory in self.devices_info:
                que = multiprocessing.Queue()
                procs = []
                free_memory = (total_memory * 0.99
                               - torch.cuda.memory_allocated(device.index))
                # Budget one fifth of the free VRAM, split it across the workers,
                # and convert the per-worker byte budget to a float32 element count.
                element_size = torch.tensor(0, dtype=torch.float).element_size()
                tensor_elements = int(free_memory / 5.0 / os.cpu_count() / element_size)
                for p in range(os.cpu_count()):
                    try:
                        tensor = torch.randn(tensor_elements, dtype=torch.float)
                        proc = multiprocessing.Process(target=self._loader,
                                                       args=(p, timing, que, tensor, device.index))
                        proc.start()
                        procs.append(proc)
                    except Exception as e:
                        logger.warning(f"Error starting process on GPU {device.index}: {e}")
                # Collect one result per worker, giving up after twice the test time.
                device_results = []
                time_started = time.time()
                while time.time() - time_started < timing * 2 and len(device_results) < len(procs):
                    if que.empty():
                        time.sleep(0.2)
                    else:
                        device_results.append(que.get())
                for proc in procs:
                    proc.join(timeout=5)
                received.extend(device_results)
        except Exception as e:
            return f"Error detecting CUDA device: {e}"
        return f"Final results: {received}."


if __name__ == "__main__":
    stresser = CudaStresser(load_perc=0.99)
    logger.info(stresser.cuda_stress(timing=60, tensor_num=15))
    time.sleep(2)
    logger.info(stresser.cuda_load_unload(timing=60))
    logger.info("Test finished.")
    sys.exit()
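
# ---------------------------------------------------------------------------
# Platform notes: the telemetry in CudaStresser._log_gpu_info relies on Windows
# (PowerShell + findstr) with nvidia-smi available on the PATH, and the script
# as a whole requires a CUDA-enabled PyTorch build.  The __main__ guard above is
# mandatory on Windows, where multiprocessing starts workers with "spawn".
#
# Hedged sketch (an assumption, not used by the script): on other platforms the
# same three telemetry values can usually be read through nvidia-smi's CSV query
# interface, e.g.
#     nvidia-smi --query-gpu=utilization.gpu,utilization.memory,temperature.gpu \
#                --format=csv,noheader,nounits
# which prints one "gpu%, mem%, temp" line per GPU and can be parsed with a
# simple str.split(',').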