CICD_Nexus_PyPi/module_fetch.py

302 lines
12 KiB
Python
Raw Normal View History

2024-08-09 16:46:00 +00:00
import requests
from bs4 import BeautifulSoup
import os
from pathlib import Path
import subprocess
import json
wheel_dir = "whls_downloaded"
class Nexus:
def __init__(self, nexus_ip: str, repository_name: str = "pypi",
output_file: str = "nexus_libs.txt"):
self.base_url = f"http://{nexus_ip}"
self.repository_name = repository_name
self.output_file = output_file
self.username = os.environ['NEXUS_USERNAME'] # username and pwd are stored in environment variables
self.password = os.environ['NEXUS_PW']
self.packages_url = f"{self.base_url}/repository/{self.repository_name}/"
self.rest_api_url = f"{self.base_url}/service/rest/v1/"
self.session = requests.Session()
self.session.auth = (self.username, self.password)
@staticmethod
def _get_html(url):
try:
response = requests.get(url=url, timeout=10)
response.raise_for_status()
return response.text
except requests.RequestException as e:
raise RuntimeError(f"Request failed for {url}: {e}")
@staticmethod
def _parse_versions(html):
if html is None:
return []
soup = BeautifulSoup(html, 'html.parser')
versions = []
for link in soup.find_all('a'):
version_text = link.text.strip('/')
if version_text:
version = version_text.split('-')[1].split('-')[0]
versions.append(version)
return versions
@staticmethod
def _parse_packages_index(html):
if html is None:
return {}
soup = BeautifulSoup(html, 'html.parser')
packages = {}
for link in soup.find_all('a'):
package_name = link.text.strip('/')
package_link = link.get('href').strip('/')
packages[package_name] = package_link
return packages
def fetch_pypi(self) -> dict:
"""
Fetches all python modules from Nexus PyPi
Returns:
dict: Dictionary of modules and their versions.
"""
print("Fetching package list...")
packages_html = self._get_html(f"{self.packages_url}simple/")
packages = self._parse_packages_index(packages_html)
libs = {}
with open(self.output_file, 'w') as file:
for package, relative_path in packages.items():
package_url = f"{self.packages_url}simple/{relative_path}/"
version_html = self._get_html(package_url)
versions = self._parse_versions(version_html)
for version in versions:
file.write(f"{package}=={version}\n")
if package in libs:
libs[package].add(version)
else:
libs[package] = {version}
print(f"All package versions have been written to {self.output_file}.")
return libs
def upload_whls(self):
"""
Uploads .whl files to Nexus PyPi
"""
files = Path(wheel_dir).glob('*.whl')
for file in files:
with open(file, 'rb') as f:
filename = os.path.basename(file)
file_pth = f"./{wheel_dir}/{filename}"
print(f"Uploading {filename}...")
try:
subprocess.run(
["twine", "upload", "-u", self.username, "-p", self.password, "--repository-url",
self.packages_url,
file_pth])
except subprocess.CalledProcessError:
print(f"[ERROR] Uploading {filename} to Nexus has failed!")
@staticmethod
def check_vulnerabilities(packages):
"""
Check vulnerabilities for the given dictionary of packages and their versions.
Args:
packages (dict): Dictionary with package names as keys and sets of versions as values.
Returns:
list: List of vulnerabilities in JSON format or None if there are no vulnerabilities.
"""
package_specs = [f'{name}=={version}' for name, versions in packages.items() for version in versions]
if not package_specs:
return None
# Write packages to a temporary requirements file
with open('temp_requirements.txt', 'w') as temp_file:
temp_file.write("\n".join(package_specs))
# Run the safety check command
result = subprocess.run(['safety', 'check', '--file', 'temp_requirements.txt', '--json'], capture_output=True,
text=True)
# Parse the output
if result.stdout:
output = json.loads(result.stdout)
vulnerabilities = output['vulnerabilities']
if len(vulnerabilities) > 0:
return vulnerabilities
return None
def del_unused(self, extra_libs):
"""
Deletes unused modules from Nexus PyPi
"""
for package, versions in extra_libs.items():
for version in versions:
try:
self._delete_package(package, version)
except Exception as e:
print(f"[ERROR] Deleting {package}=={version} from Nexus has failed! Error: {e}")
continue
@staticmethod
def _parse_package_line(line):
if line and not line.startswith('#'):
return line.strip().split('==')
return None, None
def _delete_package(self, package_name, package_version):
url = f"{self.rest_api_url}search?repository={self.repository_name}&name={package_name}&version={package_version}"
response = self.session.get(url)
response.raise_for_status()
items = response.json()['items']
if items:
component_id = items[0]['id']
delete_url = f"{self.rest_api_url}components/{component_id}"
delete_response = self.session.delete(delete_url)
delete_response.raise_for_status()
print(f"Deleted {package_name}=={package_version} from Nexus")
class Project:
def __init__(self, project_path=None):
self.project_path = Path(project_path) if project_path else Path.cwd()
def fetch_requirements(self, project_path) -> list:
"""
Walks through the project path and finds all the requirement files.
Returns:
list: List of paths to the requirement files.
"""
requirements_pth = []
for root, dirs, files in os.walk(project_path):
for file in files:
if "requirement" in file and file.endswith(".txt"):
file_path = Path(root) / file # Using Path for path operations
requirements_pth.append(
str(file_path)) # Convert Path object to string if needed elsewhere as string
print(f"[INFO] Found requirements: {requirements_pth}")
return requirements_pth
def fetch_libs(self, project_path):
"""
Walks through all the requirements and gathers all modules
Returns:
dict: Dictionary of modules and their versions from all requirements.
"""
libs = {}
for requirement in self.fetch_requirements(project_path=project_path):
with open(requirement, "r") as file:
for line in file:
if len(line) < 3: # to avoid empty rows
continue
line = line.strip().replace("~", "=")
if '==' in line: # when version is needed, fetch it
lib_name, lib_version = line.split('==', 1)
lib_name = lib_name.strip()
lib_version = lib_version.strip()
elif '>=' in line: # when version is optional, get the latest
lib_name, lib_version = line.split('>=', 1)
lib_name = lib_name.strip()
lib_version = self.fetch_latest_version(lib_name)
else: # when no version, get the latest
lib_name = line.strip()
lib_version = self.fetch_latest_version(lib_name)
if lib_name in libs:
libs[lib_name].add(lib_version)
else:
libs[lib_name] = {lib_version}
return libs
def compare_libs(self, nexus_libs: dict, mode: str = 'missing', extra_repos=None) -> dict:
"""
Compares libraries between Nexus and local requirements.
Args:
mode (str): 'missing' to find libs in requirements but not on Nexus,
'extra' to find libs on Nexus but not in requirements.
Returns:
dict: Dictionary of libraries that are either missing or extra.
"""
try:
nexus_libs = nexus_libs
local_libs = self.fetch_libs(self.project_path)
if extra_repos is not None:
for path in extra_repos:
temp_libs = self.fetch_libs(project_path=path)
local_libs.update(temp_libs)
result_libs = {}
if mode == 'missing':
# Find libraries that are in local requirements but missing from Nexus
for lib, versions in local_libs.items():
if lib in nexus_libs:
missing_versions = versions.difference(nexus_libs[lib])
if missing_versions:
result_libs[lib] = missing_versions
else:
result_libs[lib] = versions
elif mode == 'extra':
# Find libraries that are on Nexus but not in local requirements
for lib, versions in nexus_libs.items():
if lib not in local_libs:
result_libs[lib] = versions
else:
extra_versions = nexus_libs[lib].difference(versions)
if extra_versions:
result_libs[lib] = extra_versions
return result_libs
except Exception as e:
print(f"Error comparing libraries: {e}")
return {}
@staticmethod
def fetch_latest_version(package_name: str) -> str:
"""
Fetches the latest version number from the given package.
Args:
package_name (str): Package to be found.
Returns:
str: Latest version number e.g. 10.0.0
"""
url = f"https://pypi.org/pypi/{package_name}/json"
try:
response = requests.get(url)
response.raise_for_status()
data = response.json()
return data['info']['version']
except requests.RequestException:
print(f"Failed to fetch version for {package_name}")
return ""
@staticmethod
def download_whls(missing_libs):
"""
Downloads all missing module whls
"""
# Define the directory to store downloaded .whl files
os.makedirs(wheel_dir, exist_ok=True)
# Process each package in missing_libs
for package, versions in missing_libs.items():
for version in versions:
package_spec = f"{package}=={version}"
try:
# Download the specific package version as a .whl file
subprocess.run(['pip', 'download', '--dest', wheel_dir, package_spec])
except Exception as e:
print(f"Failed to download module {package_spec}. Skipping to next.\n Exception: {e}")
continue