208 lines
6.1 KiB
Python
Executable File
208 lines
6.1 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# TODOs:
|
|
# - Log for past runs (id/link to pipeline)
|
|
# - Stop / Restart Pipeline
|
|
# - Use toml config instead of StrEnums for easy extensibility
|
|
import gitlab
|
|
import typer
|
|
import os
|
|
import tomllib
|
|
|
|
from enum import StrEnum
|
|
from typing_extensions import Annotated
|
|
from typing import List
|
|
|
|
# Typer application object; the @app.command() functions below register on it.
app = typer.Typer()

# GitLab client. URL and token come from the environment and fall back to
# empty strings when the variables are not set.
gl = gitlab.Gitlab(
    url=os.getenv("OD_GL_URL", ""),
    private_token=os.getenv("OD_GL_TOKEN", ""),
)

# Value sent as the MASTER_PASSWORD_WEB_VAR pipeline variable.
MASTER_PASSWORD = os.getenv("OD_MASTER_PASSWORD", "sovereign-workplace")
# Prefix of the default namespace ("<USER>-py-ce") used for new pipelines.
USER = os.getenv("OD_USER", "od-user")
# Default username filter when listing pipelines.
GL_USER = os.getenv("OD_GL_USER", "od-gl-user")
# GitLab project id, read from OD_GL_PROJECT.
GL_PROJECT = os.getenv("OD_GL_PROJECT", "1317")
# File that receives one appended line per created pipeline.
LOG_FILE = os.getenv("OD_PIPELINE_LOG", "./__pipelines.log")
|
|
|
|
|
|
class ExternalProviders(StrEnum):
|
|
kubernetes = "kubernetes"
|
|
stackit = "stackit"
|
|
|
|
|
|
class Clusters(StrEnum):
|
|
qa = "qa"
|
|
run = "run"
|
|
b1_stackit_butterfly = "b1-stackit-butterfly"
|
|
one = "one"
|
|
|
|
|
|
class Apps(StrEnum):
|
|
all = "all"
|
|
none = "none"
|
|
migrations = "migrations"
|
|
services = "services"
|
|
ums = "ums"
|
|
collabora = "collabora"
|
|
cryptpad = "cryptpad"
|
|
element = "element"
|
|
ox = "ox"
|
|
xwiki = "xwiki"
|
|
nextcloud = "nextcloud"
|
|
openproject = "openproject"
|
|
jitsi = "jitsi"
|
|
notes = "notes"
|
|
dev_nextcloud = "dev_nextcloud"
|
|
dev_bawu = "dev_bawu"
|
|
|
|
|
|
@app.command()
def pipelines(n: int = 15, username: str = GL_USER):
    """List the most recent pipelines started by a user.

    Prints one line per pipeline with creation time, ref, a status marker and
    the pipeline URL. At most ``n`` pipelines are printed.
    """
    # Call gl.enable_debug() here to trace the HTTP traffic.
    opendesk = gl.projects.get(GL_PROJECT)
    recent = opendesk.pipelines.list(iterator=True, username=username)

    # Statuses without an entry are printed verbatim.
    status_icons = {"success": "✔", "failed": "❌", "running": "🕑"}

    # int() keeps direct callers that still pass a string working.
    limit = int(n)
    for index, p in enumerate(recent):
        # `>=` so that exactly `limit` rows are shown; the previous `>`
        # comparison printed one row too many.
        if index >= limit:
            break
        status = status_icons.get(p.status, p.status)
        print(
            f"[{p.created_at[:-5].replace('T', ' ')}]-({p.ref}) {status}: {p.web_url}"
        )
|
|
|
|
|
|
@app.command()
def pipeline(pid: str):
    """Print the variables of the pipeline with id ``pid``."""
    # The project id is required by projects.get(); the previous call passed
    # none and therefore failed before any request was made.
    opendesk = gl.projects.get(GL_PROJECT)
    selected = opendesk.pipelines.get(pid)
    variables = selected.variables.list(get_all=True)
    print(variables)
|
|
|
|
|
|
def _new_pipeline(ref: str, variables: str):
    """Create a pipeline on ``ref`` and record it in the pipeline log.

    Args:
        ref: Git ref (branch or tag) the pipeline is created for.
        variables: Comma-separated ``KEY:value`` pairs, as accepted by
            ``_parse_variables``.
    """
    # Parse first so malformed input fails before any API call is made.
    parsed_variables = _parse_variables(variables)

    # Use the configurable project id instead of a hard-coded number.
    opendesk = gl.projects.get(GL_PROJECT)
    created = opendesk.pipelines.create({"ref": ref, "variables": parsed_variables})

    # NOTE(review): `variables` is logged verbatim, which includes
    # MASTER_PASSWORD_WEB_VAR in clear text when the caller passes it.
    # Confirm this is acceptable.
    log = f"[{created.created_at[:-5].replace('T', ' ')}] {created.web_url} ({ref}): {variables}\n"
    print(log)
    _write_to_pipeline_log(log)
|
|
|
|
|
|
def _deploy_variables(deploy: List[Apps]) -> list[str]:
    """Translate the ``--deploy`` selection into ``DEPLOY_*`` variables.

    ``none`` yields no variables, ``all`` yields ``DEPLOY_ALL_COMPONENTS``,
    ``dev_nextcloud`` / ``dev_bawu`` expand to a fixed set of components, and
    any other selection yields one variable per selected app.

    Raises:
        typer.Exit: with exit code 1 when ``all`` or one of the ``dev_*``
            shortcuts is combined with further entries.
    """
    stacks = {
        Apps.dev_nextcloud: ["nextcloud", "collabora", "cryptpad", "ums", "services", "migrations"],
        Apps.dev_bawu: ["nextcloud", "collabora", "ums", "ox", "services", "migrations"],
    }

    if Apps.none in deploy:
        return []

    # These selectors must be used on their own; they are checked in priority
    # order so the error names the first one found.
    for exclusive in (Apps.all, Apps.dev_nextcloud, Apps.dev_bawu):
        if exclusive not in deploy:
            continue
        if len(deploy) > 1:
            print(f"You cannot deploy '{exclusive.value}' but also specify specific apps at the same time")
            # typer.Exit is Typer's way to stop with an exit code; the
            # builtin exit() is only injected by the `site` module.
            raise typer.Exit(code=1)
        if exclusive is Apps.all:
            return ["DEPLOY_ALL_COMPONENTS:yes"]
        return [f"DEPLOY_{component.upper()}:yes" for component in stacks[exclusive]]

    return [f"DEPLOY_{selected.value.upper()}:yes" for selected in deploy]


@app.command()
def new_pipeline(
    ref: str,  # git ref the pipeline is created for
    cluster: Annotated[Clusters, typer.Option(case_sensitive=False)],  # -> CLUSTER
    namespace: str = f"{USER}-py-ce",  # -> NAMESPACE
    test: bool = False,  # -> RUN_TESTS; also forces debug off
    test_branch: str = "develop",  # -> TESTS_BRANCH
    ee: bool = False,  # -> OPENDESK_ENTERPRISE ("true"/"false")
    env_stop: bool = False,  # -> ENV_STOP_BEFORE
    flush_external: bool = False,  # -> FLUSH_EXTERNAL_SERVICES_BEFORE
    external_provider: Annotated[
        ExternalProviders, typer.Option(case_sensitive=False)
    ] = ExternalProviders.stackit,  # -> FLUSH_EXTERNAL_SERVICES_TYPE (upper-cased)
    debug: bool = True,  # -> DEBUG_ENABLED
    default_accounts: bool = True,  # -> CREATE_DEFAULT_ACCOUNTS
    deploy: Annotated[List[Apps], typer.Option(case_sensitive=False)] = [Apps.none],
):
    """Create a new pipeline on REF with the selected deployment options.

    The options are converted into pipeline variables and the created
    pipeline is printed and appended to the pipeline log.
    """
    # Test runs are always started with debugging disabled.
    if test:
        debug = False

    variables = [
        f"CLUSTER:{cluster}",
        f"NAMESPACE:{namespace}",
        f"MASTER_PASSWORD_WEB_VAR:{MASTER_PASSWORD}",
        f"ENV_STOP_BEFORE:{_tf_to_yn(env_stop)}",
        f"FLUSH_EXTERNAL_SERVICES_BEFORE:{_tf_to_yn(flush_external)}",
        f"FLUSH_EXTERNAL_SERVICES_TYPE:{external_provider.upper()}",
        f"RUN_TESTS:{_tf_to_yn(test)}",
        f"TESTS_BRANCH:{test_branch}",
        f"DEBUG_ENABLED:{_tf_to_yn(debug)}",
        f"CREATE_DEFAULT_ACCOUNTS:{_tf_to_yn(default_accounts)}",
        f"OPENDESK_ENTERPRISE:{'true' if ee else 'false'}",
    ]

    if flush_external and not env_stop:
        print(
            f"WARNING: env_stop is {env_stop} AND flush_external is {flush_external}",
            "but flush_external only works if both are set to True",
        )

    variables += _deploy_variables(deploy)

    _new_pipeline(ref, ",".join(variables))
|
|
|
|
|
|
def _parse_variables(var_str: str) -> list[dict[str, str]]:
|
|
parts = var_str.split(",")
|
|
return [{"key": k, "value": v} for k, v in (p.strip().split(":") for p in parts)]
|
|
|
|
|
|
def _yn_to_tf(yn: str | bool) -> bool:
|
|
if type(yn) is bool:
|
|
return yn
|
|
else:
|
|
return True if yn == "yes" else False
|
|
|
|
|
|
def _tf_to_yn(tf: bool) -> str:
|
|
if type(tf) is not bool:
|
|
return tf
|
|
else:
|
|
return "yes" if tf else "no"
|
|
|
|
|
|
@app.command()
def self_test():
    """Run the built-in checks for the yes/no conversion helpers."""
    for check in (_test_yn, _test_tf):
        check()
|
|
|
|
|
|
def _test_yn():
    """Assert that ``_yn_to_tf`` maps each sample input to the expected bool."""
    # (input, expected) pairs; only "yes" and True are expected to be truthy.
    cases = [
        (False, False),
        ("no", False),
        ("nope", False),
        ("y", False),
        ("n", False),
        ("yes", True),
        (True, True),
    ]
    for yn, tf in cases:
        assert _yn_to_tf(yn) == tf, f"{yn} != {tf} but is {_yn_to_tf(yn)}"
|
|
|
|
|
|
def _test_tf():
    """Assert that ``_tf_to_yn`` maps ``False``/``True`` to ``"no"``/``"yes"``."""
    # (input, expected) pairs.
    cases = [(False, "no"), (True, "yes")]
    for tf, yn in cases:
        assert _tf_to_yn(tf) == yn, f"{tf} != {yn} but is {_tf_to_yn(tf)}"
|
|
|
|
|
|
def _write_to_pipeline_log(log: str):
    """Append ``log`` to the file named by ``LOG_FILE``.

    The text is written as given; no newline is added here.
    """
    # Explicit encoding so the log file does not depend on the locale's
    # default encoding.
    with open(LOG_FILE, "a", encoding="utf-8") as pipeline_log:
        pipeline_log.write(log)
|
|
|
|
|
|
if __name__ == "__main__":
    # Authenticate the GitLab client once, before the CLI dispatches to a command.
    gl.auth()
    # Hand control to Typer, which parses the command line and runs the command.
    app()
|