106 lines
5.4 KiB
Python
106 lines
5.4 KiB
Python
import argparse
|
|
from module_fetch import Nexus, Project
|
|
import os
|
|
|
|
|
|
class bcolors:
|
|
HEADER = '\033[95m'
|
|
OKBLUE = '\033[94m'
|
|
OKCYAN = '\033[96m'
|
|
OKGREEN = '\033[92m'
|
|
WARNING = '\033[93m'
|
|
FAIL = '\033[91m'
|
|
ENDC = '\033[0m'
|
|
|
|
|
|
def main():
|
|
"""
|
|
Works just as arguments parses.
|
|
Parsed Arguments:
|
|
str: nexus_upload
|
|
str: nexus_vulnerability
|
|
str: nexus_cleanup
|
|
"""
|
|
nexus_ips = [os.environ['NEXUS_IP'], os.environ['NEXUS2_IP']]
|
|
problems = []
|
|
for ip in nexus_ips:
|
|
try:
|
|
parser = argparse.ArgumentParser(description="Run Nexus and Project operations for CI/CD pipeline")
|
|
parser.add_argument("action", choices=['nexus_upload', 'nexus_vulnerability', 'nexus_cleanup'],
|
|
help="Action to perform")
|
|
args = parser.parse_args()
|
|
project = Project()
|
|
nexus = Nexus(ip)
|
|
if args.action == "nexus_vulnerability":
|
|
print(f"{bcolors.OKBLUE}DEBUG: {bcolors.ENDC} Running fetch_pypi... for Nexus {ip}")
|
|
nexus_libs = nexus.fetch_pypi()
|
|
print(f"{bcolors.OKBLUE}DEBUG: {bcolors.ENDC} Running vulnerability scan... for Nexus {ip}")
|
|
vulnerabilities = nexus.check_vulnerabilities(packages=nexus_libs)
|
|
print(f"{bcolors.OKGREEN}DEBUG: {bcolors.ENDC} Finished for Nexus {ip}")
|
|
if vulnerabilities is not None:
|
|
for vuln in vulnerabilities:
|
|
print(f"{bcolors.FAIL}Vulnerability found: {bcolors.ENDC} {vuln}")
|
|
print(f"{bcolors.FAIL}DEBUG: {bcolors.ENDC} Vulnerabilities found. Exiting for Nexus {ip}")
|
|
problems.append(
|
|
f"{bcolors.FAIL}DEBUG: {bcolors.ENDC} Vulnerabilities found. Exiting for Nexus {ip}")
|
|
continue
|
|
|
|
elif args.action == "nexus_upload":
|
|
print(f"{bcolors.OKBLUE}DEBUG: {bcolors.ENDC} Running fetch_pypi...for Nexus {ip}")
|
|
nexus_libs = nexus.fetch_pypi()
|
|
print(f"{bcolors.OKBLUE}DEBUG: {bcolors.ENDC} Fetching libs missing on Nexus {ip}...")
|
|
missing_libs = project.compare_libs(nexus_libs=nexus_libs, mode='missing')
|
|
if len(missing_libs) < 1:
|
|
print(f"{bcolors.WARNING}DEBUG: {bcolors.ENDC} No missing libs found. Exiting for Nexus {ip}")
|
|
continue
|
|
print(f"{bcolors.OKBLUE}DEBUG: {bcolors.ENDC} Following libs are missing: ")
|
|
for name, versions in missing_libs.items():
|
|
for version in versions:
|
|
print(f"Name: {name} | Version: {version}")
|
|
print(f"{bcolors.OKBLUE}DEBUG: {bcolors.ENDC} Running vulnerability scan...")
|
|
vulnerabilities = nexus.check_vulnerabilities(packages=missing_libs)
|
|
|
|
if vulnerabilities is not None:
|
|
for vuln in vulnerabilities:
|
|
print(f"{bcolors.FAIL}Vulnerability found: {bcolors.ENDC} {vuln}")
|
|
print(f"{bcolors.FAIL}DEBUG: {bcolors.ENDC} Vulnerabilities found. Exiting for Nexus {ip}")
|
|
problems.append(
|
|
f"{bcolors.FAIL}DEBUG: {bcolors.ENDC} Vulnerabilities found. Exiting for Nexus {ip}")
|
|
continue
|
|
print(f"{bcolors.OKBLUE}DEBUG: {bcolors.ENDC} Downloading WHLS")
|
|
project.download_whls(missing_libs=missing_libs)
|
|
print(f"{bcolors.OKBLUE}DEBUG: {bcolors.ENDC} Uploading WHLS to Nexus {ip}")
|
|
nexus.upload_whls()
|
|
print(f"{bcolors.OKGREEN}DEBUG: {bcolors.ENDC} Finished for Nexus {ip}")
|
|
|
|
elif args.action == "nexus_cleanup":
|
|
extra_repos = [os.environ['EXTRA_REPO_DIR'], os.environ['EXTRA2_REPO_DIR']]
|
|
print(f"{bcolors.OKBLUE}DEBUG: {bcolors.ENDC} Running fetch_pypi...for Nexus {ip}")
|
|
nexus_libs = nexus.fetch_pypi()
|
|
print(f"{bcolors.OKBLUE}DEBUG: {bcolors.ENDC} Fetching extra libs on Nexus {ip}")
|
|
extra_libs = project.compare_libs(nexus_libs=nexus_libs, mode='extra', extra_repos=extra_repos)
|
|
if len(extra_libs) < 1:
|
|
print(f"{bcolors.WARNING}DEBUG: {bcolors.ENDC} No extra libs found. Exiting for Nexus {ip}")
|
|
continue
|
|
print(f"{bcolors.OKBLUE}DEBUG: {bcolors.ENDC} Following libs are unused on Nexus {ip} ")
|
|
for name, versions in extra_libs.items():
|
|
for version in versions:
|
|
print(f"Name: {name} | Version: {version}")
|
|
|
|
print(f"{bcolors.OKBLUE}DEBUG: {bcolors.ENDC} Deleting unused modules from Nexus {ip}")
|
|
nexus.del_unused(extra_libs=extra_libs)
|
|
print(f"{bcolors.OKGREEN}DEBUG: {bcolors.ENDC} Finished for Nexus {ip}")
|
|
except Exception as e:
|
|
print(f"{bcolors.FAIL}DEBUG: {bcolors.ENDC} Exception for device: {ip}.\n EXCEPTION: {e}")
|
|
problems.append(
|
|
f"{bcolors.FAIL}DEBUG: {bcolors.ENDC} Exception for device: {ip}.\n {bcolors.FAIL}EXCEPTION: {bcolors.ENDC}{e}")
|
|
continue
|
|
if len(problems) > 0:
|
|
for problem in problems:
|
|
print(problem)
|
|
exit(1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|