from typing import Awaitable, Type, Optional, Tuple from urllib.parse import urlencode import json import time import requests import asyncio import shlex import aiohttp from mautrix.client import Client, InternalEventType, MembershipEventDispatcher, SyncStream from mautrix.types import (Event, StateEvent, EventID, UserID, EventType, RoomID, RoomAlias, ReactionEvent, RedactionEvent, Membership) from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper from maubot import Plugin, MessageEvent from maubot.handlers import command, event class Config(BaseProxyConfig): def do_update(self, helper: ConfigUpdateHelper) -> None: helper.copy("gitea_instance") helper.copy("gitea_token") helper.copy("gitea_owner") helper.copy("gitea_repo") helper.copy("gitea_bug_label") helper.copy("gitea_feature_label") helper.copy("feature_issue_room") helper.copy("bug_issue_room") helper.copy("about_text") helper.copy("issue_bug_text") helper.copy("issue_feature_text") helper.copy("issue_disclaimer_text") helper.copy("command_pm_error") helper.copy("command_pm_help") helper.copy("command_pm_success") helper.copy("command_new_help") helper.copy("command_success") class DevopsBot(Plugin): async def start(self) -> None: await super().start() self.config.load_and_update() self.client.add_dispatcher(MembershipEventDispatcher) self.pm_rooms = set() def g_instance(self) -> str: return self.config["gitea_instance"] def g_token(self) -> str: return self.config["gitea_token"] def g_owner(self) -> str: return self.config["gitea_owner"] def g_repo(self) -> str: return self.config["gitea_repo"] def g_bug_label(self) -> int: return self.config["gitea_bug_label"] def g_feature_label(self) -> int: return self.config["gitea_feature_label"] def g_bug_room(self) -> str: return self.config["bug_issue_room"] def g_feature_room(self) -> str: return self.config["feature_issue_room"] def g_about(self) -> str: return self.config["about_text"] def g_bug_text(self) -> str: return self.config["issue_bug_text"] def g_feature_text(self) -> str: return self.config["issue_feature_text"] def g_disclaimer(self) -> str: return self.config["issue_disclaimer_text"] def g_c_self_error(self) -> str: return self.config["command_pm_error"] def g_c_self_help(self) -> str: return self.config["command_pm_help"] def g_c_self_success(self) -> str: return self.config["command_pm_success"] def g_c_new_help(self) -> str: return self.config["command_new_help"] def g_c_command_success(self) -> str: return self.config["command_success"] # Match all the registered pm rooms in pm_rooms with # the username and return the room id async def has_pm_room(self, user) -> str: for room in self.pm_rooms: member_events = await self.client.get_members(room, membership=Membership.JOIN) participants = [event.state_key for event in member_events] if user in participants: return room return 'NONE' # Fetch issues from the configured repository async def get_issues(self, state='all') -> list: if state not in ['open', 'closed', 'all']: raise ValueError("State må være enten 'open', 'closed', eller 'all") params = {'state': state, 'limit': 500} query_string = urlencode(params) url = f"https://{self.g_instance()}/api/v1/repos/{self.g_owner()}/{self.g_repo()}/issues?{query_string}" headers = { "Authorization": f"token {self.g_token()}", "Accept": "application/json" } async with aiohttp.ClientSession() as session: async with session.get(url, headers=headers) as response: response.raise_for_status() return await response.json() # Post an issue to the configured repository async def post_issue(self, label: int, title: str, body: str) -> list: url = f"https://{self.g_instance()}/api/v1/repos/{self.g_owner()}/{self.g_repo()}/issues" headers = { "Authorization": f"token {self.g_token()}", "Accept": "application/json", "Content-Type": "application/json" } issue_data = { "title": title, "body": body, "labels": [ label ] } async with aiohttp.ClientSession() as session: async with session.post(url, headers=headers, json=issue_data) as response: response.raise_for_status() return await response.json() @command.new(name="ny", aliases=["new"]) @command.argument("args", pass_raw=True, required=False) async def ny(self, evt: MessageEvent, args: str = None) -> None: # We want to see the commands ran in a open room, so # we can get a sense of it's usage if evt.room_id in self.pm_rooms: await evt.reply(self.g_c_self_error()) return # Check correct formatting of the arguments first try: arg_list = shlex.split(args) except ValueError: await evt.reply(self.g_c_new_help().format(error="Feil i formatering: Forsikre deg om at du bruker gåseøyne separat for tittel og beskrivelse")) return if len(arg_list) < 3: await evt.reply(self.g_c_new_help().format(error="Feil i formatering. Ikke nok argumenter. Bruk !ny [type] \"[Tittel]\" \"[Beskrivelse]\"")) return if len(arg_list) > 3: await evt.reply(self.g_c_new_help().format(error="Feil i formatering. For mange argumenter. Bruk !ny [type] \"[Tittel]\" \"[Beskrivelse]\"")) return issue_type, title, description = arg_list if not title.strip() or not description.strip(): await evt.reply(self.g_c_new_help().format(error="Feil i formatering. Tittel og/eller beskrivelse kan ikke være tomme verdier")) return if len(title) > 250: await evt.reply(self.g_c_new_help().format(error="Feil i formatering. Tittelen kan ikke overskride 250 tegn.")) return if issue_type == 'bug': label = self.g_bug_label() elif issue_type == 'forbedring': label = self.g_feature_label() else: await evt.reply(f"Noe gikk feil med parsingen av argumentet 'type' (Mottok '{issue_type}' forventet 'bug' eller 'forbedring')") return # Append information about the sender to the description text before posting it description_without_signature = description # store description in a new variable for use on matrix description += f"\n\n--\n" description += self.g_disclaimer().format( nick=self.client.parse_user_id(evt.sender)[0], userId=evt.sender ) # Create issue on the git repository, and make a thread on this matrix server try: new_issue = await self.post_issue(label=label, title=title, body=description) if issue_type == 'bug': await self.client.send_notice(self.g_bug_room(), html=self.g_bug_text().format( user=self.client.parse_user_id(evt.sender)[0], userId=evt.sender, title=title, description=description_without_signature, link=new_issue["html_url"], casenr=new_issue["number"] )) else: await self.client.send_notice(self.g_feature_room(), html=self.g_feature_text().format( user=self.client.parse_user_id(evt.sender)[0], userId=evt.sender, title=title, description=description_without_signature, link=new_issue["html_url"], casenr=new_issue["number"] )) await evt.reply(self.g_c_command_success().format( result=f"Sak med saksnr {new_issue['number']} ble opprettet. Bruk !sak for å se detaljer" )) except aiohttp.ClientError as e: await evt.reply(f"Feil ved forespørsel: {e}") except aiohttp.ContentTypeError as e: await evt.reply(f"Feil ved syntaktisk analyse av JSON forespørsel: {e}") except KeyError as e: await evt.reply(f"En feil oppstod ved forsøk på å hente saksdata fra ny sak: {e}") except Exception as e: await evt.reply(f"En uventet feil oppstod: {e}") @command.new(name="saker", aliases=["cases"]) @command.argument("status", required=False) async def saker(self, evt: MessageEvent, status: str) -> None: # We want to see the commands ran in a open room, so # we can get a sense of it's usage if evt.room_id in self.pm_rooms: await evt.reply(self.g_c_self_error()) return # Send network http GET request with aiohttp try: if not status: issues = await self.get_issues(state='open') elif status == 'åpen': issues = await self.get_issues(state='open') elif status == 'lukket': issues = await self.get_issues(state='closed') elif status == 'alle': issues = await self.get_issues(state='all') else: await evt.reply(f"Ukjent status {status}. Status kan enten være 'åpen', 'lukket', eller 'alle'") return output = "
| Saksnr | Tittel | Type(r) |
|---|---|---|
| {issue['number']} | {issue['title']} | " labels = " ".join(label['name'] for label in issue['labels']) output += f"{labels} |