# Python source, 119 lines, 4.8 KiB
from html import escape
import json
import time
from typing import Awaitable, Type, Optional, Tuple

from mautrix.client import Client, InternalEventType, MembershipEventDispatcher, SyncStream
from mautrix.errors import MatrixRequestError
from mautrix.types import (Event, StateEvent, EventID, UserID, EventType,
                           RoomID, RoomAlias, ReactionEvent, RedactionEvent)
from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper
from maubot import Plugin, MessageEvent
from maubot.handlers import command, event
|
|
|
|
class Config(BaseProxyConfig):
    """Configuration schema for the welcome plugin."""

    def do_update(self, helper: ConfigUpdateHelper) -> None:
        """Call ``helper.copy`` for every option the plugin reads.

        The keys are passed one at a time, in the order listed below.
        """
        option_names = (
            "watch_rooms",
            "announcement_room",
            "notification_message",
            "seen_users",
            "pm_message",
            "en_pm",
            "no_pm",
            "invite_codes",
        )
        for option_name in option_names:
            helper.copy(option_name)
|
|
|
|
class WelcomeBot(Plugin):
    """Welcome users who join a watched room and answer their direct messages.

    When a user joins one of the watched rooms, the bot posts a notice in the
    announcement room, opens a direct chat with the user, sends the configured
    PM text there and records the user in ``seen_users``. Messages later
    received in such a direct chat are answered by :meth:`process_pm`.
    """

    async def start(self) -> None:
        """Load the configuration and register the membership dispatcher."""
        await super().start()
        self.config.load_and_update()
        self.client.add_dispatcher(MembershipEventDispatcher)
        # Direct chats opened by this instance. Held in memory only, so the
        # set is empty again every time start() runs.
        self.pm_rooms = set()

    def get_watched_rooms(self) -> list:
        """Return the ``watch_rooms`` config value."""
        return self.config["watch_rooms"]

    def get_notification_message(self) -> str:
        """Return the ``notification_message`` config value."""
        return self.config["notification_message"]

    def get_notification_room(self) -> str:
        """Return the ``announcement_room`` config value."""
        return self.config["announcement_room"]

    def get_seen_users(self) -> list:
        """Return the ``seen_users`` config value."""
        return self.config["seen_users"]

    def get_pm_message(self) -> str:
        """Return the ``pm_message`` config value."""
        return self.config["pm_message"]

    def get_en_pm(self) -> str:
        """Return the ``en_pm`` config value."""
        return self.config["en_pm"]

    def get_no_pm(self) -> str:
        """Return the ``no_pm`` config value."""
        return self.config["no_pm"]

    def get_invite_codes(self) -> list:
        """Return the ``invite_codes`` config value."""
        return self.config["invite_codes"]

    async def process_pm(self, evt: MessageEvent) -> None:
        """Answer a message received in one of the bot's direct chats.

        A body containing "english" or "norsk" (case-insensitive) is answered
        with the matching configured text, formatted with the sender's
        localpart as ``user``. Any other body is checked against the
        configured invite codes. The reply is sent as HTML to the room the
        message came from.
        """
        message = evt.content.body or ""
        lowered = message.lower()

        if "english" in lowered:
            response = self.get_en_pm().format(user=self._html_nick(evt))
        elif "norsk" in lowered:
            response = self.get_no_pm().format(user=self._html_nick(evt))
        else:
            # Invite codes are only considered when no language keyword matched.
            response = await self._redeem_invite_code(evt, message)

        await self.client.send_text(evt.room_id, html=response)

    def _html_nick(self, evt: MessageEvent) -> str:
        """Return the sender's localpart, escaped for use inside HTML."""
        return escape(self.client.parse_user_id(evt.sender)[0])

    async def _redeem_invite_code(self, evt: MessageEvent, message: str) -> str:
        """Invite the sender if *message* contains an invite code; return the reply."""
        # ``or []`` tolerates an ``invite_codes`` option that is present but empty.
        invites = {item["key"]: item["value"] for item in self.get_invite_codes() or []}
        # Codes are matched as substrings of the message. Empty codes are
        # skipped because "" is a substring of every message.
        matched_rooms = [room for code, room in invites.items() if code and code in message]
        if not matched_rooms:
            return "I'm sorry Dave, I'm afraid I can't do that"

        # When several codes match, the last one in the mapping wins.
        invite_to = matched_rooms[-1]
        shown_room = escape(str(invite_to))

        # Invite before claiming success, so the reply reflects what happened.
        try:
            await self.client.invite_user(
                invite_to, evt.sender, "Invited through invitation code"
            )
        except MatrixRequestError:
            self.log.exception("Failed to invite %s to %s", evt.sender, invite_to)
            return (
                "Invite found for channel or room <code>{room}</code>, "
                "but I could not send you the invite."
            ).format(room=shown_room)

        return (
            "Invite found for channel or room <code>{room}</code>. "
            "I have sent you the invite, please accept 🙌"
        ).format(room=shown_room)

    @event.on(EventType.ROOM_MESSAGE)
    async def handle_pm(self, evt: MessageEvent) -> None:
        """Forward messages from tracked direct chats to :meth:`process_pm`.

        Messages sent by the bot itself and messages in rooms that are not in
        ``self.pm_rooms`` are ignored.
        """
        if evt.room_id in self.pm_rooms and evt.sender != self.client.mxid:
            await self.process_pm(evt)

    @event.on(InternalEventType.JOIN)
    async def greet(self, evt: StateEvent) -> None:
        """Announce and privately welcome a user joining a watched room.

        Nothing is sent for rooms outside ``watch_rooms`` (a warning is
        logged), for events flagged ``SyncStream.STATE``, for the bot's own
        joins, or for users already listed in ``seen_users``.
        """
        watched_rooms = self.get_watched_rooms() or []
        if evt.room_id not in watched_rooms:
            self.log.warning(f"No {evt.room_id} in {watched_rooms}")
            return

        # Skip joins whose source carries the STATE flag.
        if evt.source & SyncStream.STATE:
            return

        # The bot must not welcome itself or open a direct chat with itself.
        if evt.sender == self.client.mxid:
            return

        seen_users = self.get_seen_users()
        if seen_users is None:
            # Option present but empty in the config: start a fresh list.
            seen_users = self.config["seen_users"] = []

        # A user who has already received a welcome is not greeted again.
        if evt.sender in seen_users:
            return

        nick = self.client.parse_user_id(evt.sender)[0]
        msg = self.get_notification_message().format(userName=nick, userId=evt.sender)
        await self.client.send_notice(self.get_notification_room(), msg)

        # Open a direct chat with the user and remember it so handle_pm
        # reacts to replies in it.
        pm_room = await self.client.create_room(is_direct=True, invitees=[evt.sender])
        self.pm_rooms.add(pm_room)
        await self.client.send_text(pm_room, self.get_pm_message())

        # Record the user last, so the welcome is not duplicated on a later join.
        seen_users.append(evt.sender)
        self.config.save()

    @classmethod
    def get_config_class(cls) -> Type[BaseProxyConfig]:
        """Return the configuration class used by this plugin."""
        return Config