172 lines
6.4 KiB
Python
172 lines
6.4 KiB
Python
from typing import Awaitable, Type, Optional, Tuple
|
|
from urllib.parse import urlencode
|
|
import json
|
|
import time
|
|
import requests
|
|
import asyncio
|
|
import aiohttp
|
|
|
|
from mautrix.client import Client, InternalEventType, MembershipEventDispatcher, SyncStream
|
|
from mautrix.types import (Event, StateEvent, EventID, UserID, EventType,
|
|
RoomID, RoomAlias, ReactionEvent, RedactionEvent,
|
|
Membership)
|
|
from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper
|
|
from maubot import Plugin, MessageEvent
|
|
from maubot.handlers import command, event
|
|
|
|
class Config(BaseProxyConfig):
    """Configuration proxy for the plugin.

    Declares which configuration keys are carried over when the
    configuration is updated.
    """

    def do_update(self, helper: ConfigUpdateHelper) -> None:
        """Copy every known configuration key through ``helper``.

        The keys are copied one at a time, Gitea connection settings
        first, followed by the user-facing message texts.
        """
        copied_keys = (
            "gitea_instance",
            "gitea_token",
            "gitea_owner",
            "gitea_repo",
            "about_text",
            "command_pm_error",
            "command_pm_help",
            "command_pm_success",
        )
        for key in copied_keys:
            helper.copy(key)
|
|
|
|
class DevopsBot(Plugin):
    """Maubot plugin that lists Gitea issues and answers in a private room.

    Commands are expected to be issued in a shared room; their output is
    delivered to the sender through a direct-message room that the bot
    creates on demand and then reuses.
    """

    # Sentinel returned by has_pm_room() when no PM room exists for a user.
    NO_PM_ROOM = 'NONE'

    # Maps the (Norwegian) status argument of !saker to the state value
    # accepted by get_issues().
    _STATUS_TO_STATE = {
        'åpen': 'open',
        'lukket': 'closed',
        'alle': 'all',
    }

    # Header of the issue table sent by !saker.
    _ISSUE_TABLE_HEADER = (
        "\n"
        "|Saksnr | Tittel | Type(r) |\n"
        "| :--- | :----: | ---: |\n"
    )

    async def start(self) -> None:
        """Start the plugin: load the config and reset the PM-room registry."""
        await super().start()
        self.config.load_and_update()
        self.client.add_dispatcher(MembershipEventDispatcher)
        # Room IDs of the direct-message rooms created by this instance.
        self.pm_rooms = set()

    def g_instance(self) -> str:
        """Return the configured ``gitea_instance`` value."""
        return self.config["gitea_instance"]

    def g_token(self) -> str:
        """Return the configured ``gitea_token`` value."""
        return self.config["gitea_token"]

    def g_owner(self) -> str:
        """Return the configured ``gitea_owner`` value."""
        return self.config["gitea_owner"]

    def g_repo(self) -> str:
        """Return the configured ``gitea_repo`` value."""
        return self.config["gitea_repo"]

    def g_about(self) -> str:
        """Return the configured ``about_text`` value."""
        return self.config["about_text"]

    def g_c_self_error(self) -> str:
        """Return the configured ``command_pm_error`` message."""
        return self.config["command_pm_error"]

    def g_c_self_help(self) -> str:
        """Return the configured ``command_pm_help`` message."""
        return self.config["command_pm_help"]

    def g_c_self_success(self) -> str:
        """Return the configured ``command_pm_success`` message."""
        return self.config["command_pm_success"]

    async def has_pm_room(self, user) -> str:
        """Find a registered PM room in which ``user`` is a joined member.

        Each room in ``self.pm_rooms`` is checked in turn by fetching its
        joined members from the homeserver.

        Returns:
            The ID of the first matching room, or the string ``'NONE'``
            (``NO_PM_ROOM``) when the user is in none of them.
        """
        for room in self.pm_rooms:
            member_events = await self.client.get_members(room, membership=Membership.JOIN)
            if any(member.state_key == user for member in member_events):
                return room

        return self.NO_PM_ROOM

    async def get_issues(self, state='all') -> list:
        """Fetch issues from the configured Gitea repository.

        Args:
            state: One of ``'open'``, ``'closed'`` or ``'all'``.

        Returns:
            The decoded JSON body of the response.

        Raises:
            ValueError: If ``state`` is not one of the accepted values.
            aiohttp.ClientError: If the request fails or the server
                answers with an error status.
        """
        if state not in ('open', 'closed', 'all'):
            raise ValueError("State må være enten 'open', 'closed', eller 'all")

        query_string = urlencode({'state': state})
        url = f"https://{self.g_instance()}/api/v1/repos/{self.g_owner()}/{self.g_repo()}/issues?{query_string}"
        headers = {
            "Authorization": f"token {self.g_token()}",
            "Accept": "application/json"
        }

        async with aiohttp.ClientSession() as session:
            async with session.get(url, headers=headers) as response:
                response.raise_for_status()
                return await response.json()

    async def _send_pm(self, user, text: Optional[str] = None, html: Optional[str] = None) -> None:
        """Send a message to ``user`` in a PM room, creating the room if needed.

        An existing PM room is reused so that repeated commands do not
        pile up invites to new rooms. When ``html`` is given it is sent as
        the HTML body; otherwise ``text`` is sent as plain text.
        """
        pm_room = await self.has_pm_room(user)
        if pm_room == self.NO_PM_ROOM:
            pm_room = await self.client.create_room(is_direct=True, invitees=[user])
            self.pm_rooms.add(pm_room)

        if html is not None:
            await self.client.send_text(pm_room, html=html)
        else:
            await self.client.send_text(pm_room, text)

    @staticmethod
    def _format_issue_table(issues) -> str:
        """Render ``issues`` as the text table sent by the ``saker`` command.

        Raises:
            KeyError: If an issue lacks ``number``, ``title`` or ``labels``,
                or a label lacks ``name``.
        """
        rows = []
        for issue in issues:
            labels = "".join(f"{label['name']} " for label in issue['labels'])
            rows.append(f"\n| {issue['number']} | {issue['title']} | {labels}|")
        return DevopsBot._ISSUE_TABLE_HEADER + "".join(rows)

    @command.new(name="saker", aliases=["cases"])
    @command.argument("status", pass_raw=True, required=False)
    async def saker(self, evt: MessageEvent, status: str) -> None:
        """Command ``!saker [status]``: send the issue list to the sender by PM.

        ``status`` may be ``åpen``, ``lukket`` or ``alle``; when omitted,
        open issues are listed. Failures are reported as a reply in the
        room where the command was issued.
        """
        # Commands must be run in a shared room so that their usage is
        # visible; refuse to run them from inside a PM room.
        if evt.room_id in self.pm_rooms:
            await evt.reply(self.g_c_self_error())
            return

        if not status:
            state = 'open'
        else:
            state = self._STATUS_TO_STATE.get(status)
            if state is None:
                await evt.reply(f"Ukjent status {status}. Status kan enten være 'åpen', 'lukket', eller 'alle'")
                return

        try:
            issues = await self.get_issues(state=state)
            await self._send_pm(evt.sender, text=self._format_issue_table(issues))
            await evt.reply(self.g_c_self_success())
        # ContentTypeError derives from ClientError, so it has to be
        # handled first or its handler would never run.
        except aiohttp.ContentTypeError as e:
            await evt.reply(f"Feil ved syntaktisk analyse av JSON forespørsel: {e}")
        except aiohttp.ClientError as e:
            await evt.reply(f"Feil ved forespørsel: {e}")
        except KeyError as e:
            await evt.reply(f"En feil oppstod ved forsøk på å hente saksdata: {e}")
        except Exception as e:
            # Last-resort boundary: report the failure to the user.
            await evt.reply(f"En uventet feil oppstod: {e}")

    @command.new(name="om", aliases=["about", "help", "hjelp"])
    async def om(self, evt: MessageEvent) -> None:
        """Command ``!om``: send the introduction and help text to the sender by PM."""
        # Commands must be run in a shared room so that their usage is
        # visible; refuse to run them from inside a PM room.
        if evt.room_id in self.pm_rooms:
            await evt.reply(self.g_c_self_error())
            return

        await self._send_pm(evt.sender, html=self.g_about())
        await evt.reply(self.g_c_self_help())

    @classmethod
    def get_config_class(cls) -> Type[BaseProxyConfig]:
        """Return the configuration class used by this plugin."""
        return Config
|
|
|