From 64d6e5c94ea31186e275052047a8116029ae8d64 Mon Sep 17 00:00:00 2001
From: karlji
Date: Mon, 12 Aug 2024 15:20:44 +0000
Subject: [PATCH] Upload files to "/"

---
 CUDA.py          | 323 +++++++++++++++++++++++++++++++++++++++++++++++
 requirements.txt |   1 +
 2 files changed, 324 insertions(+)
 create mode 100644 CUDA.py
 create mode 100644 requirements.txt

diff --git a/CUDA.py b/CUDA.py
new file mode 100644
index 0000000..9b61fc6
--- /dev/null
+++ b/CUDA.py
@@ -0,0 +1,323 @@
+import time
+import zlib
+import sys
+import random
+import multiprocessing
+import subprocess
+import os
+import torch
+import datetime
+import logging
+
+# Configure logger
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+class CudaStresser:
+    """
+    A class to stress test CUDA-enabled GPUs by performing various operations
+    to measure GPU memory bandwidth and stress the device.
+    """
+
+    def __init__(self, load_perc: float = 0.99):
+        """
+        Initialize CUDA device information and setup.
+
+        Parameters:
+        - load_perc (float): Fraction of VRAM to use, clamped to [0.01, 0.99].
+        """
+        self.datatypes = [torch.float, torch.double, torch.half]
+        self.load_perc = self._clamp(load_perc, 0.01, 0.99)
+        self.devices_info = self._initialize_devices()
+
+    @staticmethod
+    def _clamp(n: float, minn: float, maxn: float) -> float:
+        """
+        Clamp a value to be within a specified range.
+
+        Parameters:
+        - n (float): The value to clamp.
+        - minn (float): Minimum allowed value.
+        - maxn (float): Maximum allowed value.
+
+        Returns:
+        - float: The clamped value.
+        """
+        clamped = max(min(maxn, n), minn)
+        if clamped != n:
+            logger.warning(f'The argument value {n} was clamped to {clamped}')
+        return clamped
+
+    def _initialize_devices(self) -> list:
+        """
+        Initialize and gather information about CUDA devices.
+
+        Returns:
+        - list: A list of tuples containing (device, total_memory) for each CUDA device.
+        """
+        devices_info = []
+        if torch.cuda.device_count() < 1:
+            logger.error("No CUDA devices detected.")
+            raise RuntimeError("No CUDA devices detected.")
+
+        for i in range(torch.cuda.device_count()):
+            device, total_memory = self._cuda_check(i)
+            devices_info.append((device, total_memory))
+            logger.info(f"Device {i}: {device}, Total memory: {total_memory}")
+
+        return devices_info
+
+    @staticmethod
+    def _cuda_check(device_no: int) -> tuple:
+        """
+        Check CUDA device properties.
+
+        Parameters:
+        - device_no (int): The index of the CUDA device.
+
+        Returns:
+        - tuple: Device and total memory of the CUDA device.
+        """
+        if not torch.cuda.is_available():
+            # Querying device properties below requires a working CUDA runtime,
+            # so falling back to the CPU is not an option here.
+            raise RuntimeError("CUDA is not available on this machine.")
+        device = torch.device(f"cuda:{device_no}")
+        total_memory = torch.cuda.get_device_properties(device_no).total_memory
+        torch.cuda.set_device(device_no)
+        return device, total_memory
+
+    @staticmethod
+    def _log_gpu_info(pipe, poll_time: int) -> None:
+        """
+        Log GPU utilization and temperature information periodically.
+
+        Parameters:
+        - pipe (multiprocessing.connection.Connection): Pipe end used to talk to the main process.
+        - poll_time (int): Time between samples in seconds.
+        """
+        gpu_log = []
+        while True:
+            try:
+                process = subprocess.Popen(
+                    ['powershell', '-Command', 'nvidia-smi -q | Select-String -Pattern "Utilization" -Context 0,6 |'
+                     ' findstr /C:"Gpu" /C:"Memory"; nvidia-smi -q | findstr /C:"GPU Current Temp"'],
+                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+                output, errors = process.communicate()
+                lines = output.decode().split('\r\n')
+                log = {
+                    "Timestamp": datetime.datetime.now().isoformat(),
+                    "GPU Utilization": int(lines[0].split(':')[-1].strip().replace('%', '')),
+                    "Memory Utilization": int(lines[1].split(':')[-1].strip().replace('%', '')),
+                    "GPU Temperature": int(lines[2].split(':')[-1].strip().split()[0])
+                }
+                gpu_log.append(log)
+            except Exception as e:
+                # Keep the logger process alive even if nvidia-smi is missing or its output
+                # cannot be parsed; the query above is Windows-specific (PowerShell + findstr).
+                logger.warning(f"Could not sample GPU info: {e}")
+
+            if pipe.poll(0):  # Check if something is in the pipe.
+                if pipe.recv():
+                    pipe.send(gpu_log)
+            # Sleep slightly less than poll_time to offset the time spent running nvidia-smi.
+            time.sleep(poll_time - 1)
+
+    @staticmethod
+    def _progress_bar(count_value: int, total: int, suffix: str = '') -> None:
+        """
+        Display a progress bar in the console.
+
+        Parameters:
+        - count_value (int): Current count.
+        - total (int): Total count.
+        - suffix (str): Additional suffix to display.
+        """
+        bar_length = 20
+        filled_length = int(round(bar_length * count_value / float(total)))
+        percentage = round(100.0 * count_value / float(total), 1)
+        bar = '=' * filled_length + '-' * (bar_length - filled_length)
+        sys.stdout.write(f'[{bar}] {percentage}% ... {suffix}\r')
+        sys.stdout.flush()
+
+    def cuda_stress(self, timing: int = 60, tensor_num: int = 1000, poll_time: int = 5) -> list:
+        """
+        Stress test CUDA cores by creating and manipulating tensors.
+
+        Parameters:
+        - timing (int): Duration of the test in seconds.
+        - tensor_num (int): Number of tensors to create.
+        - poll_time (int): Interval for logging GPU data in seconds.
+
+        Returns:
+        - list: GPU log as a list of dictionaries.
+        """
+        # Clamp arguments to valid ranges
+        try:
+            timing = int(self._clamp(timing, 1, 60 * 60 * 24 * 7))
+            poll_time = int(self._clamp(poll_time, 1, 600))
+            tensor_num = int(self._clamp(tensor_num, 2, 1000))
+        except Exception as e:
+            return ["Arguments processing FAILED", str(e)]
+
+        tensors = []
+        killer = 0
+
+        try:
+            # Fill each device with tensors of random floating-point dtypes until the
+            # requested share of its VRAM is used or 16 allocations have failed.
+            for device, total_memory in self.devices_info:
+                free_memory = (total_memory * self.load_perc - torch.cuda.memory_allocated(device.index))
+                tensor_size = int(free_memory / tensor_num + 1)
+
+                while free_memory > int(tensor_size) and killer < 16:
+                    try:
+                        temp_type = random.choice(self.datatypes)
+                        dtype_size = torch.tensor(0, dtype=temp_type).element_size()
+                        tensor = torch.rand(int(tensor_size / dtype_size), dtype=temp_type, device=device)
+                        tensors.append(tensor)
+                        free_memory = (total_memory * self.load_perc - torch.cuda.memory_allocated(device.index))
+                    except Exception as e:
+                        killer += 1
+                        logger.warning(f"Failed to allocate VRAM or unsupported dtype: {e}")
+                logger.info(f'{len(tensors)} tensors created.')
+
+        except Exception as e:
+            return ["Error detecting CUDA device", str(e)]
+
+        logger.info(f"Starting CUDA stress test with {len(tensors)} tensors")
+        parent_pipe, child_pipe = multiprocessing.Pipe(True)
+        log_process = multiprocessing.Process(target=self._log_gpu_info, args=(child_pipe, poll_time))
+        log_process.start()
+        time.sleep(1)
+        time_started = time.time()
+        rotation = 0
+        tensors_number = len(tensors)
+
+        # Repeatedly combine two random tensors in place. With more than one GPU the pair
+        # may live on different devices, in which case the iteration fails and is logged.
+        while time.time() - time_started < timing:
+            rotation += 1
+            try:
+                idx1, idx2 = random.sample(range(tensors_number), 2)
+                min_size = min(len(tensors[idx1]), len(tensors[idx2]))
+
+                operation = random.choice(["add", "sub", "mul", "div"])
+                if operation == "add":
+                    tensors[idx1][:min_size] += tensors[idx2][:min_size]
+                elif operation == "sub":
+                    tensors[idx1][:min_size] -= tensors[idx2][:min_size]
+                elif operation == "mul":
+                    tensors[idx1][:min_size] *= tensors[idx2][:min_size]
+                elif operation == "div":
+                    tensors[idx1][:min_size] /= tensors[idx2][:min_size]
+
+                self._progress_bar(int(time.time() - time_started), timing, str(rotation))
+
+            except Exception as e:
+                logger.error(f"Test iteration {rotation} FAILED: {e}")
+
+        # Release the allocated tensors and the VRAM they occupy.
+        tensors.clear()
+        torch.cuda.empty_cache()
+
+        logger.info(f"Final results: {rotation / timing} iterations per second.")
+
+        parent_pipe.send(True)
+        # Guard against a logger process that died and can no longer answer.
+        gpu_log = parent_pipe.recv() if parent_pipe.poll(poll_time * 2) else []
+        parent_pipe.close()
+        log_process.terminate()
+        log_process.join()
+        return gpu_log
+
+    def _loader(self, proc_number: int, timing: int, que: multiprocessing.Queue, tensor: torch.Tensor, index: int) -> None:
+        """
+        Load and unload tensors to stress memory bandwidth.
+
+        Parameters:
+        - proc_number (int): The process number.
+        - timing (int): Duration of the test in seconds.
+        - que (multiprocessing.Queue): Queue for returning results.
+        - tensor (torch.Tensor): Tensor to load and unload.
+        - index (int): CUDA device index.
+        """
+        number_of_errors = 0
+        reference_crc32 = zlib.crc32(tensor.numpy().tobytes())
+        transfer_times = []
+        number_of_transfers = 0
+        device, _ = self._cuda_check(index)
+        time_start = time.time()
+
+        while time.time() - time_start < timing:
+            try:
+                start_time = time.time()
+                # Tensor.to() returns a new tensor, so keep the results to make the
+                # host -> device -> host round trip actually happen.
+                gpu_tensor = tensor.to(device)
+                tensor = gpu_tensor.to("cpu")
+                duration = time.time() - start_time
+                del gpu_tensor
+                torch.cuda.empty_cache()
+                transfer_times.append(duration)
+                number_of_transfers += 1
+                if proc_number == 0:
+                    self._progress_bar(int(time.time() - time_start), timing)
+            except Exception as e:
+                logger.error(f"Tensor moving to GPU failed: {e}")
+
+        # After all round trips the data must still match the original checksum.
+        crc32 = zlib.crc32(tensor.numpy().tobytes())
+        if crc32 != reference_crc32:
+            number_of_errors += 1
+
+        que.put({
+            "number of errors": number_of_errors,
+            "average transfer time": sum(transfer_times) / number_of_transfers if number_of_transfers else 0,
+            "number of transfers": number_of_transfers
+        })
+
+    def cuda_load_unload(self, timing: int = 100) -> str:
+        """
+        Load and unload VRAM to stress test bandwidth and check data consistency.
+
+        Parameters:
+        - timing (int): Duration of the test in seconds.
+
+        Returns:
+        - str: Summary of the test results.
+        """
+        timing = int(self._clamp(timing, 60, 60 * 60 * 24 * 7))
+
+        try:
+            received = []
+            for device, total_memory in self.devices_info:
+                que = multiprocessing.Queue()
+                procs = []
+                free_memory = (total_memory * 0.99 - torch.cuda.memory_allocated(device.index))
+                # Budget a fifth of the free VRAM, split across all workers and converted
+                # from bytes to float32 elements.
+                element_size = torch.tensor(0, dtype=torch.float).element_size()
+                tensor_size = int(free_memory / 5.0 / os.cpu_count() / element_size)
+
+                for p in range(os.cpu_count()):
+                    try:
+                        tensor = torch.randn(tensor_size, dtype=torch.float)
+                        proc = multiprocessing.Process(target=self._loader, args=(p, timing, que, tensor, device.index))
+                        proc.start()
+                        procs.append(proc)
+                    except Exception as e:
+                        logger.warning(f"Error starting process on GPU {device.index}: {e}")
+
+                received_before = len(received)
+                time_started = time.time()
+                while time.time() - time_started < timing * 2 and len(received) - received_before < len(procs):
+                    if que.empty():
+                        time.sleep(0.2)
+                    else:
+                        received.append(que.get())
+
+                # Make sure no worker outlives the test.
+                for proc in procs:
+                    proc.join(timeout=5)
+                    if proc.is_alive():
+                        proc.terminate()
+
+        except Exception as e:
+            return f"Error detecting CUDA device: {e}"
+
+        return f"Final results: {received}."
+
+
+if __name__ == "__main__":
+    stresser = CudaStresser(load_perc=0.99)
+    logger.info(stresser.cuda_stress(timing=60, tensor_num=15))
+    time.sleep(2)
+    logger.info(stresser.cuda_load_unload(timing=60))
+    logger.info("Test finished.")
+    sys.exit()
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..88314f5
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1 @@
+torch==2.3.1
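
Note (editorial, not part of the patch): the _log_gpu_info helper samples utilization and temperature by piping "nvidia-smi -q" through PowerShell and findstr, so it only runs on Windows and is tied to the exact text layout of that output. A more portable way to obtain the same three values is nvidia-smi's CSV query interface. The sketch below is illustrative only; it assumes nvidia-smi is on PATH and that GPU index 0 is the one being sampled, and the sample_gpu helper name is hypothetical.

import datetime
import subprocess


def sample_gpu(index: int = 0) -> dict:
    """Return utilization and temperature for one GPU as a dict (hypothetical helper)."""
    output = subprocess.check_output(
        ["nvidia-smi", f"--id={index}",
         "--query-gpu=utilization.gpu,utilization.memory,temperature.gpu",
         "--format=csv,noheader,nounits"],
        text=True,
    )
    gpu_util, mem_util, temp = (int(value.strip()) for value in output.split(","))
    return {
        "Timestamp": datetime.datetime.now().isoformat(),
        "GPU Utilization": gpu_util,
        "Memory Utilization": mem_util,
        "GPU Temperature": temp,
    }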
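
Note (editorial, not part of the patch): cuda_stress returns its GPU samples as a list of dictionaries, so a caller can post-process them freely. Below is a minimal sketch of persisting that log to CSV; the gpu_log.csv filename and the 0.9 load fraction are arbitrary choices for the example, and the guard is there because cuda_stress returns a list of strings on failure.

import csv

from CUDA import CudaStresser

if __name__ == "__main__":
    stresser = CudaStresser(load_perc=0.9)
    gpu_log = stresser.cuda_stress(timing=60, tensor_num=15)
    if gpu_log and isinstance(gpu_log[0], dict):
        with open("gpu_log.csv", "w", newline="") as handle:
            writer = csv.DictWriter(handle, fieldnames=list(gpu_log[0].keys()))
            writer.writeheader()
            writer.writerows(gpu_log)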