"""Keep a Nexus-hosted PyPI repository in sync with a project's requirements.

``Nexus`` talks to the repository (list, upload, delete, vulnerability scan);
``Project`` collects the requirements of a local source tree, compares them
with what Nexus holds and downloads whatever is missing.
"""
import json
import os
import re
import subprocess
import tempfile
from pathlib import Path

import requests
from bs4 import BeautifulSoup

wheel_dir = "whls_downloaded"

# Timeout in seconds applied to every HTTP request made by this module.
_HTTP_TIMEOUT = 10

# Archive extensions used by source distributions ("name-version<suffix>").
_SDIST_SUFFIXES = (".tar.gz", ".tar.bz2", ".tar.xz", ".tgz", ".tar", ".zip")

# A "#" starts a comment at the beginning of a line or after whitespace
# (a "#" inside a URL fragment is not a comment).
_COMMENT_RE = re.compile(r"(^|\s)#.*$")

# Project name, optional [extras], then whatever version specifier follows.
_REQUIREMENT_RE = re.compile(
    r"^([A-Za-z0-9][A-Za-z0-9._-]*)\s*(?:\[[^\]]*\])?\s*(.*)$"
)


def _normalize_name(name):
    """Return the normalised form of a project name (case/``-_.`` folded)."""
    return re.sub(r"[-_.]+", "-", name).lower()


class Nexus:
    """Client for a PyPI-format repository hosted on a Nexus server."""

    def __init__(self, nexus_ip: str, repository_name: str = "pypi",
                 output_file: str = "nexus_libs.txt"):
        """
        Args:
            nexus_ip (str): Host (and optional port) of the Nexus server.
            repository_name (str): Name of the PyPI repository on Nexus.
            output_file (str): File that ``fetch_pypi`` writes the package list to.

        Raises:
            KeyError: If NEXUS_USERNAME or NEXUS_PW is not set in the environment.
        """
        self.base_url = f"http://{nexus_ip}"
        self.repository_name = repository_name
        self.output_file = output_file
        # username and pwd are stored in environment variables
        self.username = os.environ['NEXUS_USERNAME']
        self.password = os.environ['NEXUS_PW']
        self.packages_url = f"{self.base_url}/repository/{self.repository_name}/"
        self.rest_api_url = f"{self.base_url}/service/rest/v1/"
        self.session = requests.Session()
        self.session.auth = (self.username, self.password)

    @staticmethod
    def _get_html(url, session=None):
        """
        Downloads a page and returns its body as text.

        Args:
            url (str): Address to fetch.
            session (requests.Session, optional): Session to send the request
                through; a plain unauthenticated request is made when omitted.

        Raises:
            RuntimeError: If the request fails or returns an error status.
        """
        http = session if session is not None else requests
        try:
            response = http.get(url=url, timeout=_HTTP_TIMEOUT)
            response.raise_for_status()
            return response.text
        except requests.RequestException as e:
            raise RuntimeError(f"Request failed for {url}: {e}") from e

    @staticmethod
    def _version_from_filename(filename):
        """
        Extracts the version from a distribution file name.

        Returns:
            str: The version, or None if the name has no recognisable version.
        """
        lowered = filename.lower()
        if not lowered.endswith('.whl'):
            for suffix in _SDIST_SUFFIXES:
                if lowered.endswith(suffix):
                    # sdist: the version is whatever follows the LAST hyphen,
                    # because the project name itself may contain hyphens.
                    stem = filename[:-len(suffix)]
                    _, separator, version = stem.rpartition('-')
                    return version if separator and version else None
        else:
            filename = filename[:-len('.whl')]
        # Wheels (and unknown formats): the version is the second
        # hyphen-separated field.
        fields = filename.split('-')
        if len(fields) < 2 or not fields[1]:
            return None
        return fields[1]

    @staticmethod
    def _parse_versions(html):
        """
        Extracts the versions listed on a package's simple-index page.

        Returns:
            list: One version per distribution file link (may contain
            duplicates); empty if ``html`` is None.
        """
        if html is None:
            return []
        soup = BeautifulSoup(html, 'html.parser')
        versions = []
        for link in soup.find_all('a'):
            version = Nexus._version_from_filename(link.text.strip().strip('/'))
            if version:
                versions.append(version)
        return versions

    @staticmethod
    def _parse_packages_index(html):
        """
        Extracts the packages listed on the root simple-index page.

        Returns:
            dict: Link text (package name) mapped to the link target, both
            stripped of slashes; empty if ``html`` is None.
        """
        if html is None:
            return {}
        soup = BeautifulSoup(html, 'html.parser')
        packages = {}
        for link in soup.find_all('a'):
            href = link.get('href')
            if not href:
                # An anchor without a target cannot be followed.
                continue
            packages[link.text.strip('/')] = href.strip('/')
        return packages

    def fetch_pypi(self) -> dict:
        """
        Fetches all python modules from Nexus PyPi and writes them, one
        ``name==version`` per line, to ``self.output_file``.

        Returns:
            dict: Dictionary of modules and their versions.

        Raises:
            RuntimeError: If any index page cannot be downloaded.
        """
        print("Fetching package list...")
        packages_html = self._get_html(f"{self.packages_url}simple/",
                                       session=self.session)
        packages = self._parse_packages_index(packages_html)

        libs = {}
        with open(self.output_file, 'w') as file:
            for package, relative_path in packages.items():
                package_url = f"{self.packages_url}simple/{relative_path}/"
                version_html = self._get_html(package_url, session=self.session)
                for version in self._parse_versions(version_html):
                    known_versions = libs.setdefault(package, set())
                    if version in known_versions:
                        # Several files (wheels/sdist) share one version;
                        # list it only once.
                        continue
                    known_versions.add(version)
                    file.write(f"{package}=={version}\n")

        print(f"All package versions have been written to {self.output_file}.")
        return libs

    def upload_whls(self):
        """
        Uploads .whl files from the wheel directory to Nexus PyPi.

        A failed upload is reported and the remaining files are still tried.
        """
        # Credentials go through the environment so that they do not show up
        # in the process list.
        env = {
            **os.environ,
            "TWINE_USERNAME": self.username,
            "TWINE_PASSWORD": self.password,
        }
        for file in sorted(Path(wheel_dir).glob('*.whl')):
            filename = file.name
            print(f"Uploading {filename}...")
            try:
                # check=True is what turns a non-zero exit into an exception.
                subprocess.run(
                    ["twine", "upload", "--repository-url", self.packages_url,
                     str(file)],
                    check=True, env=env)
            except subprocess.CalledProcessError:
                print(f"[ERROR] Uploading {filename} to Nexus has failed!")

    @staticmethod
    def check_vulnerabilities(packages):
        """
        Check vulnerabilities for the given dictionary of packages and their versions.

        Args:
            packages (dict): Dictionary with package names as keys and sets of versions as values.

        Returns:
            list: List of vulnerabilities in JSON format or None if there are no vulnerabilities.
        """
        package_specs = sorted(
            f'{name}=={version}'
            for name, versions in packages.items()
            for version in versions
        )
        if not package_specs:
            return None

        # The requirements file lives in a throw-away directory so it is
        # always cleaned up and never mistaken for a project requirements file.
        with tempfile.TemporaryDirectory() as tmp_dir:
            requirements_file = Path(tmp_dir) / 'temp_requirements.txt'
            requirements_file.write_text("\n".join(package_specs))
            # No check=True: the exit status is deliberately not inspected,
            # only the JSON report on stdout is used.
            result = subprocess.run(
                ['safety', 'check', '--file', str(requirements_file), '--json'],
                capture_output=True, text=True)

        # Parse the output
        if result.stdout:
            output = json.loads(result.stdout)
            vulnerabilities = output['vulnerabilities']
            if len(vulnerabilities) > 0:
                return vulnerabilities
        return None

    def del_unused(self, extra_libs):
        """
        Deletes unused modules from Nexus PyPi.

        Args:
            extra_libs (dict): Package names mapped to the versions to delete.
        """
        for package, versions in extra_libs.items():
            for version in versions:
                try:
                    self._delete_package(package, version)
                except Exception as e:
                    # Best effort: one failed deletion must not stop the rest.
                    print(f"[ERROR] Deleting {package}=={version} from Nexus has failed! Error: {e}")

    @staticmethod
    def _parse_package_line(line):
        """
        Splits a ``name==version`` line.

        Returns:
            tuple: ``(name, version)``; ``(None, None)`` for blank or comment
            lines, and a version of None when the line has no ``==``.
        """
        text = line.strip() if line else ''
        if not text or text.startswith('#'):
            return None, None
        name, separator, version = text.partition('==')
        return name.strip(), (version.strip() if separator else None)

    def _delete_package(self, package_name, package_version):
        """
        Deletes one component (package version) through the Nexus REST API.

        Raises:
            requests.RequestException: If the search or delete request fails.
        """
        # Passing params lets requests URL-encode the values (e.g. "+" in
        # local version labels).
        response = self.session.get(
            f"{self.rest_api_url}search",
            params={
                "repository": self.repository_name,
                "name": package_name,
                "version": package_version,
            },
            timeout=_HTTP_TIMEOUT)
        response.raise_for_status()
        items = response.json()['items']
        if not items:
            print(f"[WARN] {package_name}=={package_version} was not found on Nexus")
            return

        component_id = items[0]['id']
        delete_url = f"{self.rest_api_url}components/{component_id}"
        delete_response = self.session.delete(delete_url, timeout=_HTTP_TIMEOUT)
        delete_response.raise_for_status()
        print(f"Deleted {package_name}=={package_version} from Nexus")


class Project:
    """A local source tree whose requirement files define the needed modules."""

    def __init__(self, project_path=None):
        """
        Args:
            project_path: Root of the project; defaults to the current directory.
        """
        self.project_path = Path(project_path) if project_path else Path.cwd()

    def fetch_requirements(self, project_path) -> list:
        """
        Walks through the project path and finds all the requirement files
        (``*.txt`` files with "requirement" in their name).

        Returns:
            list: List of paths to the requirement files.
        """
        requirements_pth = []
        for root, _dirs, files in os.walk(project_path):
            for file in files:
                if "requirement" in file and file.endswith(".txt"):
                    requirements_pth.append(str(Path(root) / file))
        print(f"[INFO] Found requirements: {requirements_pth}")
        return requirements_pth

    @staticmethod
    def _parse_requirement(line):
        """
        Parses one line of a requirements file.

        Returns:
            tuple: ``(name, version)`` where version is None when the line does
            not pin one; None when the line is not a plain requirement (blank,
            comment, pip option, URL or direct reference).
        """
        # Drop the comment, then the environment marker.
        text = _COMMENT_RE.sub("", line).split(";", 1)[0].strip()
        if not text or text.startswith("-"):
            return None
        match = _REQUIREMENT_RE.match(text)
        if match is None:
            return None
        name, spec = match.groups()
        if spec and spec[0] not in "=<>!~":
            # Something other than a version specifier follows the name.
            return None
        for clause in spec.split(","):
            clause = clause.strip()
            # "~=" (compatible release) is treated like an exact pin.
            if clause.startswith(("==", "~=")):
                return name, clause.lstrip("=~").strip()
        return name, None

    def fetch_libs(self, project_path):
        """
        Walks through all the requirements and gathers all modules.

        Pinned requirements keep their version; for everything else the latest
        version on PyPI is used. Modules whose version cannot be determined
        are left out.

        Returns:
            dict: Dictionary of modules and their versions from all requirements.
        """
        libs = {}
        for requirement in self.fetch_requirements(project_path=project_path):
            with open(requirement, "r") as file:
                for line in file:
                    parsed = self._parse_requirement(line)
                    if parsed is None:
                        continue
                    lib_name, lib_version = parsed
                    if lib_version is None:
                        # when version is optional or absent, get the latest
                        lib_version = self.fetch_latest_version(lib_name)
                    if not lib_version:
                        # Lookup failed; an empty version is useless downstream.
                        continue
                    libs.setdefault(lib_name, set()).add(lib_version)
        return libs

    @staticmethod
    def _diff_libs(candidates, reference):
        """Return the versions in ``candidates`` that ``reference`` lacks."""
        # Names are matched in normalised form so "Flask" equals "flask".
        reference_index = {}
        for name, versions in reference.items():
            reference_index.setdefault(_normalize_name(name), set()).update(versions)

        result = {}
        for name, versions in candidates.items():
            known = reference_index.get(_normalize_name(name), set())
            remaining = set(versions) - known
            if remaining:
                result[name] = remaining
        return result

    def compare_libs(self, nexus_libs: dict, mode: str = 'missing', extra_repos=None) -> dict:
        """
        Compares libraries between Nexus and local requirements.

        Args:
            nexus_libs (dict): Modules and versions currently on Nexus.
            mode (str): 'missing' to find libs in requirements but not on Nexus,
                        'extra' to find libs on Nexus but not in requirements.
            extra_repos: Optional iterable of additional project paths whose
                requirements are merged with this project's.

        Returns:
            dict: Dictionary of libraries that are either missing or extra;
            empty if the comparison fails.
        """
        try:
            if mode not in ('missing', 'extra'):
                raise ValueError(f"unknown mode {mode!r}, expected 'missing' or 'extra'")

            local_libs = self.fetch_libs(self.project_path)
            for path in extra_repos or []:
                for lib, versions in self.fetch_libs(project_path=path).items():
                    # Merge, do not replace: several repos may pin different
                    # versions of the same module.
                    local_libs.setdefault(lib, set()).update(versions)

            if mode == 'missing':
                # In local requirements but missing from Nexus
                return self._diff_libs(local_libs, nexus_libs)
            # On Nexus but not in local requirements
            return self._diff_libs(nexus_libs, local_libs)
        except Exception as e:
            # Callers rely on always getting a dict back.
            print(f"Error comparing libraries: {e}")
            return {}

    @staticmethod
    def fetch_latest_version(package_name: str) -> str:
        """
        Fetches the latest version number from the given package.

        Args:
            package_name (str): Package to be found.

        Returns:
            str: Latest version number e.g. 10.0.0, or an empty string if the
            lookup fails.
        """
        url = f"https://pypi.org/pypi/{package_name}/json"
        try:
            response = requests.get(url, timeout=_HTTP_TIMEOUT)
            response.raise_for_status()
            return response.json()['info']['version']
        except (requests.RequestException, KeyError, ValueError):
            print(f"Failed to fetch version for {package_name}")
            return ""

    @staticmethod
    def download_whls(missing_libs):
        """
        Downloads all missing module whls into the wheel directory.

        Args:
            missing_libs (dict): Package names mapped to the versions to download.
        """
        # Define the directory to store downloaded .whl files
        os.makedirs(wheel_dir, exist_ok=True)

        # Process each package in missing_libs
        for package, versions in missing_libs.items():
            for version in versions:
                package_spec = f"{package}=={version}"
                try:
                    # check=True makes a failed download raise instead of
                    # passing silently.
                    subprocess.run(
                        ['pip', 'download', '--dest', wheel_dir, package_spec],
                        check=True)
                except (subprocess.CalledProcessError, OSError) as e:
                    print(f"Failed to download module {package_spec}. Skipping to next.\n Exception: {e}")