import argparse from module_fetch import Nexus, Project import os class bcolors: HEADER = '\033[95m' OKBLUE = '\033[94m' OKCYAN = '\033[96m' OKGREEN = '\033[92m' WARNING = '\033[93m' FAIL = '\033[91m' ENDC = '\033[0m' def main(): """ Works just as arguments parses. Parsed Arguments: str: nexus_upload str: nexus_vulnerability str: nexus_cleanup """ nexus_ips = [os.environ['NEXUS_IP'], os.environ['NEXUS2_IP']] problems = [] for ip in nexus_ips: try: parser = argparse.ArgumentParser(description="Run Nexus and Project operations for CI/CD pipeline") parser.add_argument("action", choices=['nexus_upload', 'nexus_vulnerability', 'nexus_cleanup'], help="Action to perform") args = parser.parse_args() project = Project() nexus = Nexus(ip) if args.action == "nexus_vulnerability": print(f"{bcolors.OKBLUE}DEBUG: {bcolors.ENDC} Running fetch_pypi... for Nexus {ip}") nexus_libs = nexus.fetch_pypi() print(f"{bcolors.OKBLUE}DEBUG: {bcolors.ENDC} Running vulnerability scan... for Nexus {ip}") vulnerabilities = nexus.check_vulnerabilities(packages=nexus_libs) print(f"{bcolors.OKGREEN}DEBUG: {bcolors.ENDC} Finished for Nexus {ip}") if vulnerabilities is not None: for vuln in vulnerabilities: print(f"{bcolors.FAIL}Vulnerability found: {bcolors.ENDC} {vuln}") print(f"{bcolors.FAIL}DEBUG: {bcolors.ENDC} Vulnerabilities found. Exiting for Nexus {ip}") problems.append( f"{bcolors.FAIL}DEBUG: {bcolors.ENDC} Vulnerabilities found. Exiting for Nexus {ip}") continue elif args.action == "nexus_upload": print(f"{bcolors.OKBLUE}DEBUG: {bcolors.ENDC} Running fetch_pypi...for Nexus {ip}") nexus_libs = nexus.fetch_pypi() print(f"{bcolors.OKBLUE}DEBUG: {bcolors.ENDC} Fetching libs missing on Nexus {ip}...") missing_libs = project.compare_libs(nexus_libs=nexus_libs, mode='missing') if len(missing_libs) < 1: print(f"{bcolors.WARNING}DEBUG: {bcolors.ENDC} No missing libs found. Exiting for Nexus {ip}") continue print(f"{bcolors.OKBLUE}DEBUG: {bcolors.ENDC} Following libs are missing: ") for name, versions in missing_libs.items(): for version in versions: print(f"Name: {name} | Version: {version}") print(f"{bcolors.OKBLUE}DEBUG: {bcolors.ENDC} Running vulnerability scan...") vulnerabilities = nexus.check_vulnerabilities(packages=missing_libs) if vulnerabilities is not None: for vuln in vulnerabilities: print(f"{bcolors.FAIL}Vulnerability found: {bcolors.ENDC} {vuln}") print(f"{bcolors.FAIL}DEBUG: {bcolors.ENDC} Vulnerabilities found. Exiting for Nexus {ip}") problems.append( f"{bcolors.FAIL}DEBUG: {bcolors.ENDC} Vulnerabilities found. Exiting for Nexus {ip}") continue print(f"{bcolors.OKBLUE}DEBUG: {bcolors.ENDC} Downloading WHLS") project.download_whls(missing_libs=missing_libs) print(f"{bcolors.OKBLUE}DEBUG: {bcolors.ENDC} Uploading WHLS to Nexus {ip}") nexus.upload_whls() print(f"{bcolors.OKGREEN}DEBUG: {bcolors.ENDC} Finished for Nexus {ip}") elif args.action == "nexus_cleanup": extra_repos = [os.environ['EXTRA_REPO_DIR'], os.environ['EXTRA2_REPO_DIR']] print(f"{bcolors.OKBLUE}DEBUG: {bcolors.ENDC} Running fetch_pypi...for Nexus {ip}") nexus_libs = nexus.fetch_pypi() print(f"{bcolors.OKBLUE}DEBUG: {bcolors.ENDC} Fetching extra libs on Nexus {ip}") extra_libs = project.compare_libs(nexus_libs=nexus_libs, mode='extra', extra_repos=extra_repos) if len(extra_libs) < 1: print(f"{bcolors.WARNING}DEBUG: {bcolors.ENDC} No extra libs found. Exiting for Nexus {ip}") continue print(f"{bcolors.OKBLUE}DEBUG: {bcolors.ENDC} Following libs are unused on Nexus {ip} ") for name, versions in extra_libs.items(): for version in versions: print(f"Name: {name} | Version: {version}") print(f"{bcolors.OKBLUE}DEBUG: {bcolors.ENDC} Deleting unused modules from Nexus {ip}") nexus.del_unused(extra_libs=extra_libs) print(f"{bcolors.OKGREEN}DEBUG: {bcolors.ENDC} Finished for Nexus {ip}") except Exception as e: print(f"{bcolors.FAIL}DEBUG: {bcolors.ENDC} Exception for device: {ip}.\n EXCEPTION: {e}") problems.append( f"{bcolors.FAIL}DEBUG: {bcolors.ENDC} Exception for device: {ip}.\n {bcolors.FAIL}EXCEPTION: {bcolors.ENDC}{e}") continue if len(problems) > 0: for problem in problems: print(problem) exit(1) if __name__ == "__main__": main()