#!/usr/bin/env python3
"""Small CLI for listing, inspecting and triggering GitLab pipelines.

Configuration is read from ``OD_*`` environment variables (see the constants
below).  Every triggered pipeline is appended to ``LOG_FILE``.
"""

# TODOs:
# - Log for past runs (id/link to pipeline)
# - Stop / Restart Pipeline
# - Use toml config instead of StrEnums for easy extensibility

import itertools
import os
import tomllib
from enum import StrEnum
from typing import List

import gitlab
import typer
from typing_extensions import Annotated

app = typer.Typer()
gl = gitlab.Gitlab(
    url=os.environ.get("OD_GL_URL", ""),
    private_token=os.environ.get("OD_GL_TOKEN", ""),
)

MASTER_PASSWORD = os.environ.get("OD_MASTER_PASSWORD", "sovereign-workplace")
USER = os.environ.get("OD_USER", "od-user")
GL_USER = os.environ.get("OD_GL_USER", "od-gl-user")
GL_PROJECT = os.environ.get("OD_GL_PROJECT", "1317")
LOG_FILE = os.environ.get("OD_PIPELINE_LOG", "./__pipelines.log")


class ExternalProviders(StrEnum):
    """Values accepted for the ``--external-provider`` option."""

    kubernetes = "kubernetes"
    stackit = "stackit"


class Clusters(StrEnum):
    """Values accepted for the ``--cluster`` option."""

    qa = "qa"
    run = "run"
    b1_stackit_butterfly = "b1-stackit-butterfly"
    one = "one"


class Apps(StrEnum):
    """Values accepted for the ``--deploy`` option.

    ``all``, ``none``, ``dev_nextcloud`` and ``dev_bawu`` are meta-selections
    handled specially by ``new_pipeline``; the rest map 1:1 to a
    ``DEPLOY_<NAME>`` pipeline variable.
    """

    all = "all"
    none = "none"
    migrations = "migrations"
    services = "services"
    ums = "ums"
    collabora = "collabora"
    cryptpad = "cryptpad"
    element = "element"
    ox = "ox"
    xwiki = "xwiki"
    nextcloud = "nextcloud"
    openproject = "openproject"
    jitsi = "jitsi"
    notes = "notes"
    dev_nextcloud = "dev_nextcloud"
    dev_bawu = "dev_bawu"


# Pipeline status -> short symbol used by the `pipelines` listing.
_STATUS_SYMBOLS = {
    "success": "✔",
    "failed": "❌",
    "running": "🕑",
}

# Component sets expanded when one of the "dev_*" presets is selected.
_DEV_STACKS = {
    Apps.dev_nextcloud: (
        "nextcloud",
        "collabora",
        "cryptpad",
        "ums",
        "services",
        "migrations",
    ),
    Apps.dev_bawu: ("nextcloud", "collabora", "ums", "ox", "services", "migrations"),
}

# Selections that must be the only `--deploy` value, in the order they are checked.
_EXCLUSIVE_APPS = (Apps.all, Apps.dev_nextcloud, Apps.dev_bawu)


def _format_timestamp(created_at: str) -> str:
    """Drop the last five characters of *created_at* and replace 'T' by a space.

    NOTE(review): presumably the input looks like ``2024-01-01T12:00:00.000Z``
    so the trailing ``.000Z`` is removed -- confirm against the GitLab API.
    """
    return created_at[:-5].replace("T", " ")


@app.command()
def pipelines(n: int = 15, username: str = GL_USER):
    """Print up to N pipelines of the configured project started by USERNAME."""
    # gl.enable_debug()
    opendesk = gl.projects.get(GL_PROJECT)
    listing = opendesk.pipelines.list(iterator=True, username=username)

    # islice stops after exactly n entries (the old `i > n` check printed n + 1).
    for p in itertools.islice(listing, max(int(n), 0)):
        # Unknown states are printed verbatim.
        status = _STATUS_SYMBOLS.get(p.status, p.status)
        print(f"[{_format_timestamp(p.created_at)}]-({p.ref}) {status}: {p.web_url}")


@app.command()
def pipeline(pid: str):
    """Print the variables of the pipeline with id PID."""
    # The project id is required here; calling get() without it fails.
    opendesk = gl.projects.get(GL_PROJECT)
    pipeline = opendesk.pipelines.get(pid)
    variables = pipeline.variables.list(get_all=True)
    print(variables)


def _new_pipeline(ref: str, variables: str):
    """Create a pipeline on *ref* and record it in the pipeline log.

    Args:
        ref: Git ref passed to the pipeline creation call.
        variables: Comma-separated ``KEY:value`` pairs (see ``_parse_variables``).
    """
    parsed_variables = _parse_variables(variables)
    opendesk = gl.projects.get(GL_PROJECT)
    np = opendesk.pipelines.create({"ref": ref, "variables": parsed_variables})
    log = f"[{_format_timestamp(np.created_at)}] {np.web_url} ({ref}): {variables}\n"
    print(log)
    _write_to_pipeline_log(log)


def _deploy_variables(deploy: List[Apps]) -> list[str]:
    """Translate the ``--deploy`` selection into ``DEPLOY_*`` variable strings.

    Raises:
        typer.Exit: with code 1 when an exclusive selection (``all`` or a
            ``dev_*`` preset) is combined with any other value.
    """
    # 'none' wins over everything else: nothing is deployed.
    if Apps.none in deploy:
        return []

    for exclusive in _EXCLUSIVE_APPS:
        if exclusive not in deploy:
            continue
        if len(deploy) > 1:
            typer.echo(
                f"You cannot deploy '{exclusive.value}' but also specify specific apps at the same time",
                err=True,
            )
            raise typer.Exit(code=1)
        if exclusive == Apps.all:
            return ["DEPLOY_ALL_COMPONENTS:yes"]
        return [f"DEPLOY_{name.upper()}:yes" for name in _DEV_STACKS[exclusive]]

    return [f"DEPLOY_{selected.value.upper()}:yes" for selected in deploy]


@app.command()
def new_pipeline(
    ref: str,
    cluster: Annotated[Clusters, typer.Option(case_sensitive=False)],
    namespace: str = f"{USER}-py-ce",
    test: bool = False,
    test_branch: str = "develop",
    ee: bool = False,
    env_stop: bool = False,
    flush_external: bool = False,
    external_provider: Annotated[
        ExternalProviders, typer.Option(case_sensitive=False)
    ] = ExternalProviders.stackit,
    debug: bool = True,
    default_accounts: bool = True,
    deploy: Annotated[List[Apps], typer.Option(case_sensitive=False)] = [Apps.none],
):
    """Trigger a new pipeline on REF with variables built from the options."""
    # Test runs always go out with debugging disabled.
    if test:
        debug = False

    variables = [
        f"CLUSTER:{cluster}",
        f"NAMESPACE:{namespace}",
        f"MASTER_PASSWORD_WEB_VAR:{MASTER_PASSWORD}",
        f"ENV_STOP_BEFORE:{_tf_to_yn(env_stop)}",
        f"FLUSH_EXTERNAL_SERVICES_BEFORE:{_tf_to_yn(flush_external)}",
        f"FLUSH_EXTERNAL_SERVICES_TYPE:{external_provider.upper()}",
        f"RUN_TESTS:{_tf_to_yn(test)}",
        f"TESTS_BRANCH:{test_branch}",
        f"DEBUG_ENABLED:{_tf_to_yn(debug)}",
        f"CREATE_DEFAULT_ACCOUNTS:{_tf_to_yn(default_accounts)}",
        f"OPENDESK_ENTERPRISE:{'true' if ee else 'false'}",
    ]

    if not env_stop and flush_external:
        print(
            f"WARNING: env_stop is {env_stop} AND flush_external is {flush_external}",
            "but flush_external only works if both are set to True",
        )

    variables += _deploy_variables(deploy)

    # NOTE: values containing a comma do not survive this join/split round trip.
    _new_pipeline(ref, ",".join(variables))


def _parse_variables(var_str: str) -> list[dict[str, str]]:
    """Parse ``"KEY:value,KEY2:value2"`` into ``{"key": ..., "value": ...}`` dicts.

    Only the first colon of each entry separates key and value, so values may
    contain colons themselves.  Empty entries are skipped.

    Raises:
        ValueError: if a non-empty entry contains no colon.
    """
    parsed = []
    for raw in var_str.split(","):
        entry = raw.strip()
        if not entry:
            continue
        key, sep, value = entry.partition(":")
        if not sep:
            raise ValueError(f"expected 'KEY:value' but got {entry!r}")
        parsed.append({"key": key, "value": value})
    return parsed


def _yn_to_tf(yn: str | bool) -> bool:
    """Return booleans unchanged; for anything else, True only for ``"yes"``."""
    if isinstance(yn, bool):
        return yn
    return yn == "yes"


def _tf_to_yn(tf: bool | str) -> str:
    """Map True/False to ``"yes"``/``"no"``; non-bool input is returned as is."""
    if not isinstance(tf, bool):
        return tf
    return "yes" if tf else "no"


@app.command()
def self_test():
    """Run the built-in checks of the yes/no conversion helpers."""
    _test_yn()
    _test_tf()


def _test_yn():
    """Check ``_yn_to_tf`` against a fixed table of inputs."""
    should = [False, False, False, False, False, True, True]
    for yn, tf in zip([False, "no", "nope", "y", "n", "yes", True], should):
        assert _yn_to_tf(yn) == tf, f"{yn} != {tf} but is {_yn_to_tf(yn)}"


def _test_tf():
    """Check ``_tf_to_yn`` for both boolean values."""
    should = ["no", "yes"]
    for tf, yn in zip([False, True], should):
        assert _tf_to_yn(tf) == yn, f"{tf} != {yn} but is {_tf_to_yn(tf)}"


def _write_to_pipeline_log(log: str):
    """Append *log* to ``LOG_FILE``."""
    with open(LOG_FILE, "a", encoding="utf-8") as pipeline_log:
        pipeline_log.write(log)


if __name__ == "__main__":
    gl.auth()
    app()