CICD_Nexus_PyPi/cicd_parser.py

106 lines
5.4 KiB
Python

import argparse
from module_fetch import Nexus, Project
import os
class bcolors:
    """ANSI escape sequences used to colorize the script's console output.

    Each attribute is a raw escape string meant to be interpolated in front
    of the text it should color; ``ENDC`` resets the terminal afterwards.
    """

    # Magenta / blue / cyan / green foreground colors.
    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKCYAN = '\033[96m'
    OKGREEN = '\033[92m'

    # Yellow and red foreground colors.
    WARNING = '\033[93m'
    FAIL = '\033[91m'

    # Resets all attributes back to the terminal default.
    ENDC = '\033[0m'
def _print_libs(libs):
    """Print one ``Name | Version`` line per version of every library.

    Args:
        libs: Object exposing ``.items()`` that yields ``(name, versions)``
            pairs, where ``versions`` is iterable.
    """
    for name, versions in libs.items():
        for version in versions:
            print(f"Name: {name} | Version: {version}")


def _report_vulnerabilities(vulnerabilities, ip):
    """Print every finding and return the summary line to record as a problem.

    Args:
        vulnerabilities: Iterable of findings returned by
            ``Nexus.check_vulnerabilities``.
        ip: Address of the Nexus instance the findings belong to.

    Returns:
        The colored "Vulnerabilities found" summary string (already printed).
    """
    for vuln in vulnerabilities:
        print(f"{bcolors.FAIL}Vulnerability found: {bcolors.ENDC} {vuln}")
    summary = f"{bcolors.FAIL}DEBUG: {bcolors.ENDC} Vulnerabilities found. Exiting for Nexus {ip}"
    print(summary)
    return summary


def _run_vulnerability(nexus, project, ip):
    """Scan every package currently hosted on one Nexus instance.

    Args:
        nexus: ``Nexus`` client bound to ``ip``.
        project: Unused; accepted so all action handlers share one signature.
        ip: Address of the Nexus instance, used in log lines.

    Returns:
        A problem summary string if the scan returned a result, else ``None``.
    """
    print(f"{bcolors.OKBLUE}DEBUG: {bcolors.ENDC} Running fetch_pypi... for Nexus {ip}")
    nexus_libs = nexus.fetch_pypi()
    print(f"{bcolors.OKBLUE}DEBUG: {bcolors.ENDC} Running vulnerability scan... for Nexus {ip}")
    vulnerabilities = nexus.check_vulnerabilities(packages=nexus_libs)
    print(f"{bcolors.OKGREEN}DEBUG: {bcolors.ENDC} Finished for Nexus {ip}")
    # NOTE(review): any non-None result (even an empty one) counts as a
    # finding -- confirm check_vulnerabilities returns None when clean.
    if vulnerabilities is not None:
        return _report_vulnerabilities(vulnerabilities, ip)
    return None


def _run_upload(nexus, project, ip):
    """Upload to one Nexus instance the libraries the project needs but it lacks.

    The missing libraries are vulnerability-scanned first; nothing is
    downloaded or uploaded when the scan returns a result.

    Args:
        nexus: ``Nexus`` client bound to ``ip``.
        project: ``Project`` used to diff and download libraries.
        ip: Address of the Nexus instance, used in log lines.

    Returns:
        A problem summary string if the scan returned a result, else ``None``.
    """
    print(f"{bcolors.OKBLUE}DEBUG: {bcolors.ENDC} Running fetch_pypi...for Nexus {ip}")
    nexus_libs = nexus.fetch_pypi()
    print(f"{bcolors.OKBLUE}DEBUG: {bcolors.ENDC} Fetching libs missing on Nexus {ip}...")
    missing_libs = project.compare_libs(nexus_libs=nexus_libs, mode='missing')
    # len() (rather than truthiness) keeps a None result failing loudly.
    if len(missing_libs) < 1:
        print(f"{bcolors.WARNING}DEBUG: {bcolors.ENDC} No missing libs found. Exiting for Nexus {ip}")
        return None

    print(f"{bcolors.OKBLUE}DEBUG: {bcolors.ENDC} Following libs are missing: ")
    _print_libs(missing_libs)

    print(f"{bcolors.OKBLUE}DEBUG: {bcolors.ENDC} Running vulnerability scan...")
    vulnerabilities = nexus.check_vulnerabilities(packages=missing_libs)
    # NOTE(review): any non-None result (even an empty one) blocks the
    # upload -- confirm check_vulnerabilities returns None when clean.
    if vulnerabilities is not None:
        return _report_vulnerabilities(vulnerabilities, ip)

    print(f"{bcolors.OKBLUE}DEBUG: {bcolors.ENDC} Downloading WHLS")
    project.download_whls(missing_libs=missing_libs)
    print(f"{bcolors.OKBLUE}DEBUG: {bcolors.ENDC} Uploading WHLS to Nexus {ip}")
    nexus.upload_whls()
    print(f"{bcolors.OKGREEN}DEBUG: {bcolors.ENDC} Finished for Nexus {ip}")
    return None


def _run_cleanup(nexus, project, ip):
    """Delete from one Nexus instance the libraries reported as extra.

    Reads ``EXTRA_REPO_DIR`` and ``EXTRA2_REPO_DIR`` from the environment and
    passes them to ``Project.compare_libs`` as ``extra_repos``.

    Args:
        nexus: ``Nexus`` client bound to ``ip``.
        project: ``Project`` used to diff libraries.
        ip: Address of the Nexus instance, used in log lines.

    Returns:
        Always ``None``; this action never records a problem by itself.

    Raises:
        KeyError: If either environment variable is unset.
    """
    extra_repos = [os.environ['EXTRA_REPO_DIR'], os.environ['EXTRA2_REPO_DIR']]
    print(f"{bcolors.OKBLUE}DEBUG: {bcolors.ENDC} Running fetch_pypi...for Nexus {ip}")
    nexus_libs = nexus.fetch_pypi()
    print(f"{bcolors.OKBLUE}DEBUG: {bcolors.ENDC} Fetching extra libs on Nexus {ip}")
    extra_libs = project.compare_libs(nexus_libs=nexus_libs, mode='extra', extra_repos=extra_repos)
    # len() (rather than truthiness) keeps a None result failing loudly.
    if len(extra_libs) < 1:
        print(f"{bcolors.WARNING}DEBUG: {bcolors.ENDC} No extra libs found. Exiting for Nexus {ip}")
        return None

    print(f"{bcolors.OKBLUE}DEBUG: {bcolors.ENDC} Following libs are unused on Nexus {ip} ")
    _print_libs(extra_libs)
    print(f"{bcolors.OKBLUE}DEBUG: {bcolors.ENDC} Deleting unused modules from Nexus {ip}")
    nexus.del_unused(extra_libs=extra_libs)
    print(f"{bcolors.OKGREEN}DEBUG: {bcolors.ENDC} Finished for Nexus {ip}")
    return None


# Maps each CLI action to its handler; key order is the order shown in --help.
_ACTIONS = {
    'nexus_upload': _run_upload,
    'nexus_vulnerability': _run_vulnerability,
    'nexus_cleanup': _run_cleanup,
}


def main():
    """Parse the command line and run the chosen action on every Nexus instance.

    Command line:
        action: One of ``nexus_upload``, ``nexus_vulnerability`` or
            ``nexus_cleanup``.

    Environment:
        NEXUS_IP, NEXUS2_IP: Addresses of the Nexus instances to process.

    The action is run against each address in turn. An exception raised
    while processing one address is recorded and does not stop the others.
    All recorded problems are printed again at the end.

    Raises:
        KeyError: If ``NEXUS_IP`` or ``NEXUS2_IP`` is unset.
        SystemExit: With code 1 if any problem was recorded; argparse also
            raises it for ``--help`` or invalid arguments.
    """
    # Parse once, before touching the environment, so --help and usage
    # errors work even when the NEXUS* variables are not set.
    parser = argparse.ArgumentParser(description="Run Nexus and Project operations for CI/CD pipeline")
    parser.add_argument("action", choices=list(_ACTIONS),
                        help="Action to perform")
    args = parser.parse_args()
    handler = _ACTIONS[args.action]

    nexus_ips = [os.environ['NEXUS_IP'], os.environ['NEXUS2_IP']]
    problems = []
    for ip in nexus_ips:
        try:
            project = Project()
            nexus = Nexus(ip)
            problem = handler(nexus, project, ip)
        except Exception as e:
            # Top-level boundary: record the failure and move on to the
            # next instance instead of aborting the whole run.
            print(f"{bcolors.FAIL}DEBUG: {bcolors.ENDC} Exception for device: {ip}.\n EXCEPTION: {e}")
            problems.append(
                f"{bcolors.FAIL}DEBUG: {bcolors.ENDC} Exception for device: {ip}.\n {bcolors.FAIL}EXCEPTION: {bcolors.ENDC}{e}")
        else:
            if problem is not None:
                problems.append(problem)

    if problems:
        for problem in problems:
            print(problem)
        # SystemExit instead of the site-provided exit() helper, which is
        # not guaranteed to exist (e.g. under ``python -S``).
        raise SystemExit(1)
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()