483 lines
19 KiB
Python
483 lines
19 KiB
Python
import asyncio
import hashlib
import hmac
import html
import json
import shlex
import time
from typing import Awaitable, Type, Optional, Tuple
from urllib.parse import urlencode

import aiohttp
import requests
from aiohttp.web import Request, Response, json_response

from mautrix.client import Client, InternalEventType, MembershipEventDispatcher, SyncStream
from mautrix.types import (Event, StateEvent, EventID, UserID, EventType,
                           RoomID, RoomAlias, ReactionEvent, RedactionEvent,
                           Membership)
from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper
from maubot import Plugin, MessageEvent
from maubot.handlers import command, event, web
|
|
|
|
class Config(BaseProxyConfig):
    """Configuration schema of the plugin."""

    def do_update(self, helper: ConfigUpdateHelper) -> None:
        """Pass every setting the plugin knows about to ``helper.copy``.

        The keys are copied one by one, in the order listed below.
        """
        setting_names = (
            # Gitea connection
            "gitea_instance",
            "gitea_token",
            "webhook_token",
            "gitea_owner",
            "gitea_repo",
            "gitea_bug_label",
            "gitea_feature_label",
            # Matrix rooms
            "feature_issue_room",
            "bug_issue_room",
            "webhook_announcement_room",
            # Message templates
            "about_text",
            "issue_bug_text",
            "issue_feature_text",
            "issue_disclaimer_text",
            "issue_details_text",
            "command_pm_error",
            "command_pm_help",
            "command_pm_success",
            "command_new_help",
            "command_case_error",
            "command_success",
            "webhook_announce_push",
        )
        for setting in setting_names:
            helper.copy(setting)
|
|
|
|
class DevopsBot(Plugin):
    """Maubot plugin that connects Matrix chat commands to a Gitea issue tracker.

    The chat commands (``sak``, ``ny``, ``saker``, ``om``) read and create
    issues through the Gitea REST API, and the ``/webhook`` endpoint announces
    signed Gitea push events in the configured announcement room.
    """

    async def start(self) -> None:
        """Load the configuration and initialise the registry of PM rooms."""
        await super().start()
        self.config.load_and_update()
        self.client.add_dispatcher(MembershipEventDispatcher)
        # Rooms created by the bot for private replies. The set lives only in
        # memory and starts out empty every time the plugin is started.
        self.pm_rooms = set()

    # ------------------------------------------------------------------
    # Configuration accessors
    # ------------------------------------------------------------------

    def g_instance(self) -> str:
        """Return the ``gitea_instance`` setting (used as the API host name)."""
        return self.config["gitea_instance"]

    def g_token(self) -> str:
        """Return the ``gitea_token`` setting (sent in the Authorization header)."""
        return self.config["gitea_token"]

    def g_wh_token(self) -> str:
        """Return the ``webhook_token`` setting (HMAC key for webhook signatures)."""
        return self.config["webhook_token"]

    def g_owner(self) -> str:
        """Return the ``gitea_owner`` setting."""
        return self.config["gitea_owner"]

    def g_repo(self) -> str:
        """Return the ``gitea_repo`` setting."""
        return self.config["gitea_repo"]

    def g_bug_label(self) -> int:
        """Return the ``gitea_bug_label`` setting (label id marking bug issues)."""
        return self.config["gitea_bug_label"]

    def g_feature_label(self) -> int:
        """Return the ``gitea_feature_label`` setting (label id marking feature issues)."""
        return self.config["gitea_feature_label"]

    def g_bug_room(self) -> str:
        """Return the ``bug_issue_room`` setting."""
        return self.config["bug_issue_room"]

    def g_webhook_room(self) -> str:
        """Return the ``webhook_announcement_room`` setting."""
        return self.config["webhook_announcement_room"]

    def g_feature_room(self) -> str:
        """Return the ``feature_issue_room`` setting."""
        return self.config["feature_issue_room"]

    def g_about(self) -> str:
        """Return the ``about_text`` setting."""
        return self.config["about_text"]

    def g_bug_text(self) -> str:
        """Return the ``issue_bug_text`` template."""
        return self.config["issue_bug_text"]

    def g_feature_text(self) -> str:
        """Return the ``issue_feature_text`` template."""
        return self.config["issue_feature_text"]

    def g_disclaimer(self) -> str:
        """Return the ``issue_disclaimer_text`` template."""
        return self.config["issue_disclaimer_text"]

    def g_issue_details(self) -> str:
        """Return the ``issue_details_text`` template."""
        return self.config["issue_details_text"]

    def g_c_self_error(self) -> str:
        """Return the ``command_pm_error`` text."""
        return self.config["command_pm_error"]

    def g_c_self_help(self) -> str:
        """Return the ``command_pm_help`` text."""
        return self.config["command_pm_help"]

    def g_c_self_success(self) -> str:
        """Return the ``command_pm_success`` text."""
        return self.config["command_pm_success"]

    def g_c_new_help(self) -> str:
        """Return the ``command_new_help`` template."""
        return self.config["command_new_help"]

    def g_c_case_error(self) -> str:
        """Return the ``command_case_error`` template."""
        return self.config["command_case_error"]

    def g_c_command_success(self) -> str:
        """Return the ``command_success`` template."""
        return self.config["command_success"]

    def g_w_announce_push(self) -> str:
        """Return the ``webhook_announce_push`` template."""
        return self.config["webhook_announce_push"]

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _escape(value) -> str:
        """HTML-escape *value* for safe interpolation; ``None`` becomes ``""``."""
        return html.escape("" if value is None else str(value))

    @staticmethod
    def _ref_to_branch(ref: str) -> str:
        """Turn a git ref into a short name without truncating nested names.

        ``refs/heads/feature/x`` becomes ``feature/x``. Any ref without a
        known prefix falls back to its last path segment.
        """
        for prefix in ("refs/heads/", "refs/tags/"):
            if ref.startswith(prefix):
                return ref[len(prefix):]
        return ref.split('/')[-1]

    def _issues_url(self) -> str:
        """Build the issues endpoint URL of the configured repository."""
        return f"https://{self.g_instance()}/api/v1/repos/{self.g_owner()}/{self.g_repo()}/issues"

    async def _gitea_request(self, method: str, url: str, payload: Optional[dict] = None):
        """Send an authenticated request to Gitea and return the decoded JSON body.

        Args:
            method: HTTP method name, e.g. ``"GET"`` or ``"POST"``.
            url: Absolute URL of the API endpoint.
            payload: Optional JSON-serialisable request body.

        Raises:
            aiohttp.ClientResponseError: when the response has an error status.
            aiohttp.ContentTypeError: when the response body is not JSON.
        """
        headers = {
            "Authorization": f"token {self.g_token()}",
            "Accept": "application/json",
        }
        if payload is not None:
            headers["Content-Type"] = "application/json"

        async with aiohttp.ClientSession() as session:
            async with session.request(method, url, headers=headers, json=payload) as response:
                response.raise_for_status()
                return await response.json()

    async def _send_pm(self, user: UserID, content: str) -> None:
        """Send *content* (HTML) to *user* in a private room.

        An already registered PM room is reused so that the user does not
        receive a new invite for every command; otherwise a direct room is
        created and registered in ``self.pm_rooms``.
        """
        pm_room = await self.has_pm_room(user)
        if pm_room == 'NONE':
            pm_room = await self.client.create_room(is_direct=True, invitees=[user])
            self.pm_rooms.add(pm_room)
        await self.client.send_text(pm_room, html=content)

    # ------------------------------------------------------------------
    # Room and Gitea API helpers
    # ------------------------------------------------------------------

    async def has_pm_room(self, user) -> str:
        """Find a registered PM room in which *user* is a joined member.

        Returns:
            The room id of the first matching room, or the string ``'NONE'``
            when no registered room contains the user.
        """
        for room in self.pm_rooms:
            member_events = await self.client.get_members(room, membership=Membership.JOIN)
            if any(member.state_key == user for member in member_events):
                return room
        return 'NONE'

    async def get_issues(self, state='all') -> list:
        """Fetch the issues of the configured repository.

        Args:
            state: One of ``'open'``, ``'closed'`` or ``'all'``.

        Raises:
            ValueError: if *state* is not one of the accepted values.
            aiohttp.ClientError: if the request fails.
        """
        if state not in ['open', 'closed', 'all']:
            raise ValueError("State må være enten 'open', 'closed', eller 'all")

        query_string = urlencode({'state': state, 'limit': 500})
        return await self._gitea_request("GET", f"{self._issues_url()}?{query_string}")

    async def post_issue(self, label: int, title: str, body: str) -> dict:
        """Create an issue in the configured repository.

        Args:
            label: Id of the label to attach to the new issue.
            title: Issue title.
            body: Issue body text.

        Returns:
            The decoded JSON response describing the created issue.

        Raises:
            aiohttp.ClientError: if the request fails.
        """
        issue_data = {
            "title": title,
            "body": body,
            "labels": [label],
        }
        return await self._gitea_request("POST", self._issues_url(), payload=issue_data)

    async def get_issue(self, case) -> dict:
        """Fetch a single issue, identified by its number, from the repository.

        Raises:
            aiohttp.ClientError: if the request fails.
        """
        return await self._gitea_request("GET", f"{self._issues_url()}/{case}")

    # ------------------------------------------------------------------
    # Chat commands
    # ------------------------------------------------------------------

    @command.new(name="sak", aliases=["case"])
    @command.argument("saksnr", pass_raw=False, required=False)
    async def sak(self, evt: MessageEvent, saksnr: str = None) -> None:
        """Send the details of issue number *saksnr* to the sender in a PM."""
        # Commands must be issued in an open room so that their usage is visible.
        if evt.room_id in self.pm_rooms:
            await evt.reply(self.g_c_self_error())
            return

        # Validate the argument before contacting the API.
        error = None
        if not saksnr:
            error = "Feil: saksnr er påkrevd"
        elif not saksnr.isdecimal():
            # isdecimal() (unlike isdigit()) only accepts characters that
            # int() can actually convert, so the int() call below is safe.
            error = "Feil: saksnr må være et heltall"
        elif int(saksnr) <= 0:
            error = "Feil: saksnr må ha et tall høyere enn 0"

        if error is not None:
            await evt.reply(self.g_c_case_error().format(error=error))
            return

        try:
            # Pass the normalised integer so the URL only contains ASCII digits.
            issue = await self.get_issue(int(saksnr))

            # The first label that matches one of the configured ids decides
            # which type is shown.
            bug_label = self.g_bug_label()
            feature_label = self.g_feature_label()
            type_text = "❓ Ukjent"
            for label in issue["labels"]:
                if label["id"] == bug_label:
                    type_text = "🪲 Bug"
                    break
                if label["id"] == feature_label:
                    type_text = "🎫 Forbedring"
                    break

            # Title and body are written by arbitrary users, so they are
            # escaped before being placed into the HTML template.
            issue_content = self.g_issue_details().format(
                saksnr=issue["number"],
                type=type_text,
                title=self._escape(issue["title"]),
                description=self._escape(issue["body"]),
                link=issue["html_url"],
            )

            await self._send_pm(evt.sender, issue_content)
            await evt.reply(self.g_c_self_success())
        # ContentTypeError is a subclass of ClientError and therefore has to
        # be handled first to be reachable.
        except aiohttp.ContentTypeError as e:
            await evt.reply(f"Feil ved syntaktisk analyse av JSON forespørsel: {e}")
        except aiohttp.ClientError as e:
            await evt.reply(f"Feil ved forespørsel: {e}")
        except KeyError as e:
            await evt.reply(f"En feil oppstod ved forsøk på å hente saksdata fra ny sak: {e}")
        except Exception as e:
            await evt.reply(f"En uventet feil oppstod: {e}")

    @command.new(name="ny", aliases=["new"])
    @command.argument("args", pass_raw=True, required=False)
    async def ny(self, evt: MessageEvent, args: str = None) -> None:
        """Create a new issue from ``[type] "[title]" "[description]"``.

        The issue is posted to Gitea and announced in the bug or feature room
        depending on the given type.
        """
        # Commands must be issued in an open room so that their usage is visible.
        if evt.room_id in self.pm_rooms:
            await evt.reply(self.g_c_self_error())
            return

        # Split the raw argument string shell-style. A missing argument is
        # treated as an empty string because shlex.split(None) would try to
        # read from standard input.
        try:
            arg_list = shlex.split(args or "")
        except ValueError:
            await evt.reply(self.g_c_new_help().format(error="Feil i formatering: Forsikre deg om at du bruker gåseøyne separat for tittel og beskrivelse"))
            return

        if len(arg_list) < 3:
            await evt.reply(self.g_c_new_help().format(error="Feil i formatering. Ikke nok argumenter. Bruk !ny [type] \"[Tittel]\" \"[Beskrivelse]\""))
            return
        if len(arg_list) > 3:
            await evt.reply(self.g_c_new_help().format(error="Feil i formatering. For mange argumenter. Bruk !ny [type] \"[Tittel]\" \"[Beskrivelse]\""))
            return

        issue_type, title, description = arg_list

        if not title.strip() or not description.strip():
            await evt.reply(self.g_c_new_help().format(error="Feil i formatering. Tittel og/eller beskrivelse kan ikke være tomme verdier"))
            return

        if len(title) > 250:
            await evt.reply(self.g_c_new_help().format(error="Feil i formatering. Tittelen kan ikke overskride 250 tegn."))
            return

        if issue_type == 'bug':
            label = self.g_bug_label()
        elif issue_type == 'forbedring':
            label = self.g_feature_label()
        else:
            await evt.reply(f"Noe gikk feil med parsingen av argumentet 'type' (Mottok '{issue_type}' forventet 'bug' eller 'forbedring')")
            return

        # The Matrix announcement shows the description as written, while the
        # Gitea issue body additionally carries a signature naming the sender.
        nick = self.client.parse_user_id(evt.sender)[0]
        description_without_signature = description
        description += "\n\n--\n"
        description += self.g_disclaimer().format(nick=nick, userId=evt.sender)

        try:
            new_issue = await self.post_issue(label=label, title=title, body=description)

            if issue_type == 'bug':
                room, template = self.g_bug_room(), self.g_bug_text()
            else:
                room, template = self.g_feature_room(), self.g_feature_text()

            # Title and description come straight from the chat message, so
            # they are escaped before being placed into the HTML template.
            await self.client.send_notice(room, html=template.format(
                user=nick,
                userId=evt.sender,
                title=self._escape(title),
                description=self._escape(description_without_signature),
                link=new_issue["html_url"],
                casenr=new_issue["number"],
            ))

            await evt.reply(self.g_c_command_success().format(
                result=f"Sak med saksnr {new_issue['number']} ble opprettet. Bruk !sak for å se detaljer"
            ))
        # ContentTypeError is a subclass of ClientError and therefore has to
        # be handled first to be reachable.
        except aiohttp.ContentTypeError as e:
            await evt.reply(f"Feil ved syntaktisk analyse av JSON forespørsel: {e}")
        except aiohttp.ClientError as e:
            await evt.reply(f"Feil ved forespørsel: {e}")
        except KeyError as e:
            await evt.reply(f"En feil oppstod ved forsøk på å hente saksdata fra ny sak: {e}")
        except Exception as e:
            await evt.reply(f"En uventet feil oppstod: {e}")

    @command.new(name="saker", aliases=["cases"])
    @command.argument("status", required=False)
    async def saker(self, evt: MessageEvent, status: str = None) -> None:
        """Send a table of issues to the sender in a PM.

        *status* may be ``'åpen'``, ``'lukket'`` or ``'alle'``; when omitted,
        open issues are listed.
        """
        # Commands must be issued in an open room so that their usage is visible.
        if evt.room_id in self.pm_rooms:
            await evt.reply(self.g_c_self_error())
            return

        # Translate the user-facing status word into the API state.
        states = {'åpen': 'open', 'lukket': 'closed', 'alle': 'all'}
        if not status:
            state = 'open'
        elif status in states:
            state = states[status]
        else:
            await evt.reply(f"Ukjent status {status}. Status kan enten være 'åpen', 'lukket', eller 'alle'")
            return

        try:
            issues = await self.get_issues(state=state)

            # Titles and label names are user-controlled and are escaped
            # before being embedded into the HTML table.
            rows = []
            for issue in issues:
                labels = " ".join(label['name'] for label in issue['labels'])
                rows.append(
                    f"<tr><td>{issue['number']}</td>"
                    f"<td>{self._escape(issue['title'])}</td>"
                    f"<td>{self._escape(labels)}</td></tr>"
                )
            output = (
                "<table><tr><th>Saksnr</th><th>Tittel</th><th>Type(r)</th></tr>"
                + "".join(rows)
                + "</table>"
            )

            await self._send_pm(evt.sender, output)
            await evt.reply(self.g_c_self_success())
        # ContentTypeError is a subclass of ClientError and therefore has to
        # be handled first to be reachable.
        except aiohttp.ContentTypeError as e:
            await evt.reply(f"Feil ved syntaktisk analyse av JSON forespørsel: {e}")
        except aiohttp.ClientError as e:
            await evt.reply(f"Feil ved forespørsel: {e}")
        except KeyError as e:
            await evt.reply(f"En feil oppstod ved forsøk på å hente saksdata: {e}")
        except Exception as e:
            await evt.reply(f"En uventet feil oppstod: {e}")

    @command.new(name="om", aliases=["about", "help", "hjelp"])
    async def om(self, evt: MessageEvent) -> None:
        """Send the introduction and help text to the sender in a PM."""
        # Commands must be issued in an open room so that their usage is visible.
        if evt.room_id in self.pm_rooms:
            await evt.reply(self.g_c_self_error())
            return

        await self._send_pm(evt.sender, self.g_about())
        await evt.reply(self.g_c_self_help())

    # ------------------------------------------------------------------
    # Webhook endpoint
    # ------------------------------------------------------------------

    @web.post("/webhook")
    async def post_webhook(self, request: Request) -> Response:
        """Receive a Gitea webhook call and announce push events.

        The request must be JSON and carry an ``X-Gitea-Signature`` header
        holding the hex HMAC-SHA256 of the body, keyed with the configured
        webhook token.

        Returns:
            415 for a non-JSON content type, 400 for an unreadable, empty,
            unsigned or malformed payload, 403 for a wrong signature and 200
            otherwise.
        """
        content_type = request.headers.get('Content-Type', '').lower().strip()
        if 'application/json' not in content_type:
            return Response(status=415, text="Unsupported or no media type in header")

        # Read the raw bytes: the signature covers exactly what was sent, so
        # no decode/re-encode round trip may happen before verifying it.
        try:
            payload = await request.read()
        except Exception as e:
            self.log.warning(msg=f"Devbot recieved invalid payload {str(e)}")
            return Response(status=400, text="Bad request")

        if not payload:
            return Response(status=400, text="Bad request")

        header_signature = request.headers.get('X-Gitea-Signature', '')
        if not header_signature:
            self.log.warning(msg=f"Devbot got a signature without gitea header signature")
            return Response(status=400, text="Bad request")

        payload_signature = hmac.new(self.g_wh_token().encode(), payload, hashlib.sha256).hexdigest()

        # Constant-time comparison so the response time does not reveal how
        # many leading characters of a forged signature were correct.
        signatures_match = hmac.compare_digest(
            header_signature.encode("utf-8", "surrogateescape"),
            payload_signature.encode(),
        )
        if not signatures_match:
            self.log.warning(msg=f"Devbot got a webhook signal with an invalid signature")
            return Response(status=403, text="Forbidden")

        try:
            decoded = json.loads(payload)
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            self.log.warning(msg=f"Devbot cannot parse the payload from json - {str(e)}")
            return Response(status=400, text="Bad request")

        event_type = request.headers.get("X-Gitea-Event")

        if event_type == 'push':
            try:
                # A push may carry no commits at all; announce it with an
                # empty message instead of failing on commits[0].
                commits = decoded.get('commits') or []
                message = commits[0]['message'] if commits else ''
                announcement = self.g_w_announce_push().format(
                    repo=self._escape(decoded['repository']['full_name']),
                    user=self._escape(decoded['pusher']['full_name']),
                    branch=self._escape(self._ref_to_branch(decoded['ref'])),
                    message=self._escape(message),
                )
            except (KeyError, IndexError, TypeError, AttributeError) as e:
                # The payload was valid JSON but lacks the expected fields.
                self.log.warning(msg=f"Devbot got a push payload with missing fields - {str(e)}")
                return Response(status=400, text="Bad request")

            await self.client.send_notice(self.g_webhook_room(), html=announcement)

        return Response(status=200, text="Request OK")

    @classmethod
    def get_config_class(cls) -> Type[BaseProxyConfig]:
        """Return the configuration class used by this plugin."""
        return Config
|
|
|