from typing import List, Union, Generator, Iterator
from pydantic import BaseModel, Field
from sqlalchemy import create_engine
from sqlalchemy import text
from urllib.parse import urlencode
from openpyxl import load_workbook
from openpyxl.styles import Alignment, Font
from openpyxl.drawing.image import Image
from copy import copy
import pytz
import logging
import os
import requests
import json
import time
import datetime
import ast

logging.basicConfig(level=logging.DEBUG)


class Pipeline:
    """Open WebUI pipeline for the Østfold Milsim member database.

    Turns a chat question into a MySQL query via an OpenRouter-hosted LLM,
    runs the query against MariaDB, renders the result both as markdown in
    the chat and as a styled Excel export uploaded to a Seafile cloud, and
    shares a time-limited download link back into the chat. Seafile auth
    tokens are cached per user in an external keystore service.
    """

    class Valves(BaseModel):
        # All configuration knobs; each is seeded from an environment
        # variable in __init__ below.
        OR_MODEL: str
        OR_URL: str
        OR_KEY: str
        DB_HOST: str
        DB_PORT: str
        DB_USER: str
        DB_PASSWORD: str
        DB_DATABASE: str
        DB_TABLES: List[str]
        KEYSTORE_URL: str
        KEYSTORE_API: str
        CLOUD_HOST: str
        CLOUD_API_VERSION: str
        CLOUD_EXCEL_LIBRARY: str
        CLOUD_USER: str
        CLOUD_PASS: str
        CLOUD_EXCEL_TEMPLATE_FILENAME: str
        TMP_STORAGE_DIR: str

    def __init__(self):
        self.name = "Østfold Milsim Medlemsdatabase"
        self.engine = None
        self.nlsql_response = ""
        self.last_emit_time = 0
        # BUGFIX: initialise the Seafile auth token so attribute access is
        # safe even when sea_login() has not (yet) succeeded; previously
        # pipe() would raise AttributeError if login failed at startup.
        self.token = None
        self.valves = self.Valves(
            **{
                "pipelines": ["*"],
                "OR_MODEL": os.getenv("OR_MODEL", "anthropic/claude-3.5-haiku:beta"),
                "OR_URL": os.getenv("OR_URL", "https://openrouter.ai/api/v1/chat/completions"),
                "OR_KEY": os.getenv("OR_KEY", "OPENROUTER_API_KEY"),
                "DB_HOST": os.getenv("DB_HOST", "elrond.outlands.lan"),
                "DB_PORT": os.getenv("DB_PORT", "3306"),
                "DB_USER": os.getenv("DB_USER", "polarpress_demo_dba"),
                "DB_PASSWORD": os.getenv("DB_PASSWORD", "YOUR_PASSWORD"),
                "DB_DATABASE": os.getenv("DB_DATABASE", "pp_polarpress_demo_prod"),
                "DB_TABLES": ["users", "club_memberships", "stripe_transactions", "vipps_transactions"],
                "KEYSTORE_URL": os.getenv("KEYSTORE_URL", "https://keystore.outlands.no"),
                "KEYSTORE_API": os.getenv("KEYSTORE_API", "123"),
                "CLOUD_HOST": os.getenv("CLOUD_HOST", "outlands.no"),
                "CLOUD_API_VERSION": os.getenv("CLOUD_API_VERSION", "api2"),
                "CLOUD_EXCEL_LIBRARY": os.getenv("CLOUD_EXCEL_LIBRARY", "exports"),
                "CLOUD_USER": os.getenv("CLOUD_USER", "username"),
                "CLOUD_PASS": os.getenv("CLOUD_PASS", "password"),
                "CLOUD_EXCEL_TEMPLATE_FILENAME": os.getenv("CLOUD_EXCEL_TEMPLATE_FILENAME", "oms_eksportmal.xls"),
                "TMP_STORAGE_DIR": os.getenv("TMP_STORAGE_DIR", "/home/heno/tmp"),
            }
        )

    def init_db(self):
        """Create (and keep) the SQLAlchemy engine for the MariaDB member
        database. Returns the engine, or None if creation failed."""
        try:
            self.engine = create_engine(
                f"mysql+mysqldb://{self.valves.DB_USER}:{self.valves.DB_PASSWORD}"
                f"@{self.valves.DB_HOST}:{self.valves.DB_PORT}/{self.valves.DB_DATABASE}"
            )
            print(f"Connection to MariaDB database {self.valves.DB_DATABASE} on host {self.valves.DB_HOST} established")
        except Exception as e:
            print(f"Error connecting to MariaDB: {e}")
        return self.engine

    async def on_startup(self):
        # Establish DB engine, log in to the cloud, make sure the export
        # library exists, and fetch an upload link — in that order.
        self.init_db()
        await self.sea_login()
        await self.sea_check_library()
        await self.sea_link()

    async def on_shutdown(self):
        pass

    async def sea_login(self) -> bool:
        """Authenticate against the Seafile cloud and store the auth token
        on self.token. Returns True on success."""
        try:
            headers = {
                'Content-Type': 'application/x-www-form-urlencoded'
            }
            data = urlencode({'username': self.valves.CLOUD_USER, 'password': self.valves.CLOUD_PASS})
            response = requests.post(
                f"https://{self.valves.CLOUD_HOST}/{self.valves.CLOUD_API_VERSION}/auth-token/",
                data=data,
                headers=headers
            )
            try:
                data = response.json()
            # BUGFIX: requests may raise its own JSONDecodeError (a
            # ValueError subclass) rather than json.JSONDecodeError
            # depending on the installed version; catch the superset.
            except ValueError:
                logging.error("Failed to decode json response connecting to the cloud")
                return False
            if 'non_field_errors' in data:
                if data['non_field_errors'][0] == 'Unable to login with provided credentials.':
                    logging.error(f"Invalid username or password for user {self.valves.CLOUD_USER} on seafile host {self.valves.CLOUD_HOST} provided. Could not get auth token")
                    return False
            elif 'token' in data:
                self.token = data['token']
                print(f"Login to seafile cloud host {self.valves.CLOUD_HOST} successful")
                return True
            else:
                logging.error("Unexpected response from seafile server")
                return False
        except requests.RequestException as e:
            logging.error(f"Unable to connect to seafile cloud {self.valves.CLOUD_HOST}. Error: {e}")
            return False

    async def sea_check_library(self) -> bool:
        """Ensure the configured Excel export library exists on the cloud
        and is writable; create it if missing. Stores its id on
        self.library_id. Returns True when a writable library is ready."""
        try:
            headers = {
                'Authorization': f"Token {self.token}",
                'Accept': 'application/json; indent=4'
            }
            params = {
                'type': 'mine'
            }
            response = requests.get(
                f"https://{self.valves.CLOUD_HOST}/{self.valves.CLOUD_API_VERSION}/repos/",
                headers=headers,
                params=params
            )
            res = response.json()
            library_exists = False
            library_id = None
            writable = True
            for library in res:
                if library['name'] == self.valves.CLOUD_EXCEL_LIBRARY:
                    library_exists = True
                    library_id = library['id']
                    # Encrypted or read-only libraries cannot receive uploads.
                    if library.get('encrypted') or library.get('permission') == 'r':
                        writable = False
            if not writable:
                logging.error(f"The library {self.valves.CLOUD_EXCEL_LIBRARY} exists but is is not writable. Log in to {self.valves.CLOUD_HOST} from your browser and check it's permission settings")
                return False
            if not library_exists:
                make_library_response = requests.post(
                    f"https://{self.valves.CLOUD_HOST}/{self.valves.CLOUD_API_VERSION}/repos/",
                    headers=headers,
                    json={
                        'name': self.valves.CLOUD_EXCEL_LIBRARY
                    }
                )
                if make_library_response.status_code == 400:
                    logging.error(f"Could not create neccessary library {self.valves.CLOUD_EXCEL_LIBRARY} on your {self.valves.CLOUD_HOST} account. Contact a system administrator")
                    return False
                elif make_library_response.status_code == 520:
                    logging.error(f"Could not create neccessary library {self.valves.CLOUD_EXCEL_LIBRARY} on your {self.valves.CLOUD_HOST} account. Contact a system administrator")
                    return False
                self.library_id = make_library_response.json()['repo_id']
                # BUGFIX: print() does not accept a fg= keyword (leftover
                # from click.secho) and raised TypeError here.
                print(f"The library {self.valves.CLOUD_EXCEL_LIBRARY} was created")
                return True
            self.library_id = library_id
            print(f"The library {self.valves.CLOUD_EXCEL_LIBRARY} exists, no need to create")
            return True
        except requests.RequestException as e:
            logging.error(f"Unable to connect to cloud {self.valves.CLOUD_HOST}. Error: {e}")
            return False

    async def sea_link(self) -> bool:
        """Fetch an upload link for the export library root and store it on
        self.upload_link. Returns True on success."""
        try:
            response = requests.get(
                f"https://{self.valves.CLOUD_HOST}/{self.valves.CLOUD_API_VERSION}/repos/{self.library_id}/upload-link/?p=/",
                headers={'Authorization': 'Token {token}'.format(token=self.token)}
            )
            if response.status_code == 403:
                logging.error(f"Lacking permissions to upload to library '{self.library_id}'. Doublecheck permissions by logging into https://{self.valves.CLOUD_HOST} and go to the library to check it's settings")
                return False
            elif response.status_code == 500:
                logging.error(f"Could not get permission to upload. Check that you have enough storage permissions left on your {self.valves.CLOUD_HOST} account, and if not try to delete some old files first.")
                return False
            self.upload_link = response.json()
            print(f"Recieved go ahead for upload on {self.valves.CLOUD_HOST}")
            return True
        except requests.RequestException as e:
            logging.error(f"Unable to connect to cloud {self.valves.CLOUD_HOST}. Error: {e}")
            return False

    def sea_upload(self, path: str, file_name: str) -> bool:
        """Upload the local file at `path` to the cloud library root as
        `file_name` using the previously fetched upload link."""
        if not os.path.isfile(path):
            print(f"Cannot read and upload '{path}'")
            return False
        try:
            # BUGFIX: close the file handle after the upload; the original
            # passed a bare open() and leaked the descriptor.
            with open(path, 'rb') as fh:
                requests.post(
                    self.upload_link,
                    data={'filename': file_name, 'parent_dir': '/'},
                    files={'file': fh},
                    headers={'Authorization': 'Token {token}'.format(token=self.token)}
                )
            print(f"Excel file '{file_name}' successfully uploaded and stored to your {self.valves.CLOUD_HOST}")
            return True
        except requests.RequestException as e:
            logging.error(f"Unable to upload file to {self.valves.CLOUD_HOST}. Error: {e}")
            return False

    def sea_share(self, file: str):
        """Create a 1-day, download-only share link for `file` in the export
        library. Returns the link URL, or False on failure."""
        try:
            headers = {
                'Authorization': f'Token {self.token}',
                'Content-Type': 'application/json',
                'Accept': 'application/json; indent=4'
            }
            json_data = {
                'repo_id': self.library_id,
                'path': f'/{file}',
                'permissions': {
                    'can_edit': False,
                    'can_download': True
                },
                'expire_days': 1
            }
            response = requests.post(
                f"https://{self.valves.CLOUD_HOST}/api/v2.1/share-links/",
                json=json_data,
                headers=headers
            )
            res = response.json()
            if response.status_code < 300:
                print(f"Excel file '{file}' successfully shared: {res['link']}")
                return res["link"]
            else:
                return False
        except requests.RequestException as e:
            logging.error(f"Unable to share {file}. Error: {e}")
            return False

    def check_cloud_token(self, userid: str) -> bool:
        """Check whether the user has a cached Outlands access token for
        Excel spreadsheet upload and file sharing.

        userid (str): the user's UUID.

        Returns True if the user id was found on the keystore AND a token is
        stored AND the token's updated timestamp is no older than 45 minutes.
        """
        try:
            headers = {
                'Content-Type': 'application/json',
                'Accept': 'application/json; indent=4',
                'X-API-KEY': self.valves.KEYSTORE_API
            }
            # BUGFIX: the token is stored under the key 'sea_token' (see
            # update_cloud_token / get_cloud_token); the original queried
            # 'seatoken', which can never match what was stored.
            response = requests.get(
                f"{self.valves.KEYSTORE_URL}/api/post/{userid}/store/sea_token",
                headers=headers
            )
            if response.status_code == 404:
                return False
            res = response.json()
            # BUGFIX: the original tested for 'updated_at' but then read
            # res['updated']. Accept either key name — assumes the keystore
            # returns one of these; TODO confirm against the keystore API.
            updated_str = res.get('updated') or res.get('updated_at')
            if updated_str:
                updated_datetime = datetime.datetime.strptime(updated_str, '%Y-%m-%d %H:%M:%S')
                timezone = pytz.timezone('Europe/Oslo')
                updated_datetime = timezone.localize(updated_datetime)
                now = datetime.datetime.now(timezone)
                time_difference = now - updated_datetime
                # BUGFIX: comparison was inverted — per the documented
                # contract the token is usable only when it is *newer* than
                # 45 minutes, not older.
                return time_difference < datetime.timedelta(minutes=45)
            return False
        except requests.RequestException as e:
            logging.error(f"Error checking keystore for user auth token: {e}")
            return False

    def update_cloud_token(self, userid: str, value_data: dict) -> bool:
        """Create/update the user's Seafile auth token record on the
        keystore server. Returns True on HTTP 200."""
        try:
            headers = {
                'Content-Type': 'application/json',
                'Accept': 'application/json; indent=4',
                'X-API-KEY': self.valves.KEYSTORE_API
            }
            json_data = {
                'body': userid,
                'key': 'sea_token',
                'value': json.dumps(value_data)
            }
            response = requests.post(
                f"{self.valves.KEYSTORE_URL}/api/post/store",
                json=json_data,
                headers=headers
            )
            if response.status_code == 200:
                return True
            return False
        except requests.RequestException as e:
            logging.error(f"Error updating keystore user auth token: {e}")
            return False

    def check_cloud_user_profile(self, token: str) -> bool:
        """Validate `token` against the Seafile cloud by fetching the user
        profile with it. Returns True if the token is accepted."""
        try:
            headers = {
                'Authorization': f"Token {token}",
                'Accept': 'application/json; indent=4'
            }
            response = requests.get(
                f"https://{self.valves.CLOUD_HOST}/{self.valves.CLOUD_API_VERSION}/user/",
                headers=headers
            )
            if response.status_code == 200:
                return True
            return False
        except requests.RequestException as e:
            logging.error(f"Error trying user token on seafile server: {e}")
            return False

    def get_cloud_token(self, userid: str) -> Union[str, bool]:
        """Fetch the user's Seafile auth token from the keystore server.

        Returns the token string, or False if not found / on error.
        """
        try:
            headers = {
                'Content-Type': 'application/json',
                'Accept': 'application/json; indent=4',
                'X-API-KEY': self.valves.KEYSTORE_API
            }
            response = requests.get(
                f"{self.valves.KEYSTORE_URL}/api/post/{userid}/store/sea_token",
                headers=headers
            )
            if response.status_code == 404:
                return False
            res = response.json()
            if 'value' in res:
                # The keystore stores the whole keyserver_data dict as a
                # JSON string; only the 'token' field is needed here.
                value = json.loads(res['value'])
                return value['token']
            return False
        except requests.RequestException as e:
            logging.error(f"Error checking keystore for user auth token: {e}")
            return False

    def run_llm_query(self, message: str):
        """Send `message` as a single user turn to the OpenRouter chat API.

        Returns {"success": bool, "data": str} where data is either the
        model's reply content or an error description.
        """
        try:
            response = requests.post(
                url=self.valves.OR_URL,
                headers={
                    "Authorization": f"Bearer {self.valves.OR_KEY}"
                },
                data=json.dumps({
                    "model": self.valves.OR_MODEL,
                    "messages": [
                        {
                            "role": "user",
                            "content": message
                        }
                    ]
                })
            )
            if response.status_code == 200:
                response_data = response.json()
                sql_statement = response_data.get("choices", [{}])[0].get("message", {}).get("content")
                if sql_statement:
                    return {"success": True, "data": sql_statement}
                else:
                    logging.error("Response did not contain SQL statement.")
                    return {"success": False, "data": "No SQL statement in response."}
            else:
                logging.error(f"Error response {response.status_code}: {response.text}")
                return {"success": False, "data": f"Error: {response.status_code}"}
        except requests.HTTPError as e:
            logging.error(f"Clientresponse error: {e}")
            return {"success": False, "data": "HTTP backend error"}
        except Exception as e:
            logging.error(f"Unexpected error: {e}")
            return {"success": False, "data": f"Unexpected error: {e}"}

    def run_mysql_query(self, query: str):
        """Execute `query` against the member database.

        Returns the fetched rows (list of tuples) on success, or a dict
        {"error": str} on failure — callers must check for the dict.
        """
        try:
            with self.engine.connect() as connection:
                result = connection.execute(text(query))
                rows = result.fetchall()
                return rows
        except Exception as e:
            return {"error": str(e)}

    def give_excel_book_description(self, message: str, query: str, result: str):
        """Build the LLM prompt that asks for a Norwegian text summary of the
        query result, used as the Excel export's description cell."""
        llm_instructions = f"""
Based on the input question, MySQL query and query result, give a nice text summary in NORWEGIAN of the data you have extracted from the database.
The text will be used in a Excel spreadsheet export

Input question {message}
MySQLQuery: {query}
Query result: {result}
"""
        return llm_instructions

    def generate_table_headers(self, message: str, query: str, result: str):
        """Build the LLM prompt that asks for Norwegian column header names,
        returned as a Python-list-formatted string (parsed with
        ast.literal_eval by the caller)."""
        llm_instructions = f"""
Based on the input question, MySQL query and query result, give the columns in Query result the appropriate header names translated to NORWEGIAN.
IMPORTANT: Return a python list formatted as string with ONLY the column header names, not the resut of the query result.
DO NOT encapsulate your reply in markdown or any other formatting beside the python list formatted as a string.
Example of valid string to generate: ["ID", "Navn", "Epostadresse", "Dato opprettet"]

Input question {message}
MySQLQuery: {query}
Query result: {result}

Python list formatted string of table header names for columns as they appear in "Query result":
"""
        return llm_instructions

    def reformat_data(self, message: str, query: str, result: str):
        """Build the second-pass LLM prompt that asks the model to reformat
        the raw query result as markdown with Norwegian column names."""
        llm_reformat_instructions = f"""
Given an input question, create a syntactically correct mysql query to run.
You have 4 tables to work with:
1. users - the users table holds the user records and contain the columns id, name, email, created_at, updated_at
2. club_memberships - the user membership table containing the users club memberships and contain the columns id, user_id, valid_from, valid_to, renew, cancelled, credited, price, payment_method, created_at, updated_at.
The valid_from and valid_to columns in the club_memberships table is a varchar with a date in the format 'dd.mm.YYYY' and contains a timespan (from and to) when the membership is valid.
The columns cancelled and credited are boolean columns and if any of these are true, the membership is not valid. Cancelled means that their membership is cancelled and credited means that the membership is refunded (and cancelled).
3. stripe_transactions - all the transactions of the users that has been processed by stripe.
The table contains the columns id, user_id, event_signup_id, processed, amount, phone_number, created_at and updated_at.
The columns processed is a boolean value and if this is set to false, the stripe_transaction is not valid (the user has not been charged.)
Amount is a double (8,2) containing the amount the user has been charged, phone_number contains the users phone number.
If the column event_signup_id is not null, the transaction does not pertain to a membership payment
4. vipps_transactions - all the transactions of the users that has been processed by vipps.
The table contains the columns id, user_id, event_signup_id, processed, amount, phone_number, created_at and updated_at.
The columns are identical to stripe_transaction columns.
Always run queries with INNER JOIN on users.id and the other tables user_id in order to get which user that belongs to the other tables.
You should use DISTINCT statements to avoid returning duplicates wherever possible.
Pay attention to use only the column names that I have provided.
Currency for the amount columns will always be in NORWEGIAN (NOK).
The input question can be in english or norwegian. Always reply in norwegian if the input question is in norwegian or english if the input question is in english.

Input question: {message}
MySQLQuery: {query}
Query result: {result}

Excellent. We now have the result from the query. As you can see, the "Query result:" has been formatted in a Python list of tuples.
Take this string and reformat it in markdown. Please translate all the column names to Norwegian.
"""
        return llm_reformat_instructions

    def pipe(self, user_message: str, model_id: str, messages: List[dict], body: dict) -> Union[str, Generator, Iterator]:
        """Main pipeline entry point.

        On a streaming request: refresh/acquire the user's cloud token,
        generate SQL from user_message, run it, build and upload the Excel
        export, share it, and return a markdown-formatted answer with the
        share link. On non-streaming (title/autocomplete) calls, just run
        plain inference on user_message.
        """
        llm_initial_instructions = f"""
Given an input question, create a syntactically correct mysql query to run.
You have 4 tables to work with:
1. users - the users table holds the user records and contain the columns id, name, email, created_at, updated_at
2. club_memberships - the user membership table containing the users club memberships and contain the columns id, user_id, valid_from, valid_to, renew, cancelled, credited, price, payment_method, created_at, updated_at.
The valid_from and valid_to columns in the club_memberships table is a varchar with a date in the format 'dd.mm.YYYY' and contains a timespan (from and to) when the membership is valid.
The columns cancelled and credited are boolean columns and if any of these are true, the membership is not valid. Cancelled means that their membership is cancelled and credited means that the membership is refunded (and cancelled).
3. stripe_transactions - all the transactions of the users that has been processed by stripe.
The table contains the columns id, user_id, event_signup_id, processed, amount, phone_number, created_at and updated_at.
The columns processed is a boolean value and if this is set to false, the stripe_transaction is not valid (the user has not been charged.)
Amount is a double (8,2) containing the amount the user has been charged, phone_number contains the users phone number.
If the column event_signup_id is not null, the transaction does not pertain to a membership payment
4. vipps_transactions - all the transactions of the users that has been processed by vipps.
The table contains the columns id, user_id, event_signup_id, processed, amount, phone_number, created_at and updated_at.
The columns are identical to stripe_transaction columns.
Always run queries with INNER JOIN on users.id and the other tables user_id in order to get which user that belongs to the other tables.
You should use DISTINCT statements to avoid returning duplicates wherever possible.
Pay attention to use only the column names that I have provided.
Currency for the amount columns will always be in NORWEGIAN (NOK).
The input question can be in english or norwegian. Always reply in norwegian if the input question is in norwegian or english if the input question is in english.
IMPORTANT: ONLY respond with a syntactically correct mysql query. DO NOT respond with a description of what you have done. ONLY reply with the SQL query.
DO NOT format the query with markup. ONLY provide a string containing the SQL query you assemble from the Input question.
DO NOT translate any of the column names in the table for the query.

Input question: {user_message}
MySQLQuery:
"""
        keyserver_data = {
            'user_name': body['user']['name'],
            'user_email': body['user']['email'],
            'user_role': body['user']['role'],
            'token': self.token
        }
        # body['stream'] is True when the pipeline is handling the user's
        # actual chat request (as opposed to title/autocomplete inference).
        if body['stream']:
            # First make sure we hold a valid cloud auth token to store the
            # Excel spreadsheet with.
            if self.check_cloud_token(body['user']['id']):
                # Fetch the currently cached auth token.
                current_token = self.get_cloud_token(body['user']['id'])
                if self.check_cloud_user_profile(current_token):
                    # The cached token is valid — use it for upload/share.
                    self.token = current_token
                else:
                    # Cached token rejected by the cloud: log in again and
                    # refresh the keystore record.
                    self.sea_login()
                    keyserver_data['token'] = self.token
                    self.update_cloud_token(body['user']['id'], keyserver_data)
            else:
                # No (fresh) cached token: log in and store a new one.
                self.sea_login()
                keyserver_data['token'] = self.token
                self.update_cloud_token(body['user']['id'], keyserver_data)
            # First inference pass: llm_initial_instructions tells the model
            # to answer with a SQL query derived from user_message.
            initial = self.run_llm_query(llm_initial_instructions)
            if initial["success"]:
                # query holds the SQL statement generated by the model.
                query = initial["data"]
                # Run the SQL query; result is a list of row tuples, or an
                # {"error": ...} dict on failure.
                query_result = self.run_mysql_query(query)
                # BUGFIX: check for SQL errors *before* building the Excel
                # export — the original checked after, so a failed query
                # crashed the export code and the error path was unreachable.
                if isinstance(query_result, dict) and "error" in query_result:
                    return f"Error occurred: {query_result['error']}. Initial data: {initial['data']}"
                # Ask the model for a spreadsheet description and for the
                # (Norwegian) column header names.
                instruction_result = self.run_llm_query(
                    self.give_excel_book_description(user_message, query, str(query_result))
                )
                reply_description = instruction_result["data"]
                instruction_result = self.run_llm_query(
                    self.generate_table_headers(user_message, query, str(query_result))
                )
                header_names = instruction_result["data"]
                # Populate the Excel workbook from the template.
                original_file_path = f"{self.valves.TMP_STORAGE_DIR}/{self.valves.CLOUD_EXCEL_TEMPLATE_FILENAME}"
                excel_start_row = 4
                wb = load_workbook(original_file_path)
                ws = wb.active
                # Copy the ØMS logo into the new spreadsheet.
                img = Image(f"{self.valves.TMP_STORAGE_DIR}/oms-glow.png")
                img.height = 145
                img.width = 139
                img.anchor = 'A1'
                ws.add_image(img)
                # Set the headline.
                headline = "Østfold Milsim KI dataeksport"
                ws.merge_cells('B1:E1')
                cell = ws.cell(row=1, column=2, value=headline)
                cell.alignment = Alignment(horizontal='left', vertical='top', wrap_text=True)
                cell.font = Font(size=22, bold=True)
                # reply_description can be a fairly large block of text, so
                # adjust the cell's style and dimensions accordingly.
                ws.merge_cells('A2:E2')
                cell = ws.cell(row=2, column=1, value=reply_description)
                cell.alignment = Alignment(horizontal='left', vertical='top', wrap_text=True)
                cell.font = Font(size=10)
                ws.row_dimensions[2].height = 300
                ws.row_dimensions[1].height = 145
                # Header row: parse the model's list-formatted string.
                header_list = ast.literal_eval(header_names)
                for i, row_data in enumerate(header_list, start=1):
                    cell = ws.cell(row=3, column=i, value=row_data)
                    cell.font = Font(bold=True)
                # Data rows; column A corresponds to index 1.
                for i, row_data in enumerate(query_result, start=excel_start_row):
                    for j, value in enumerate(row_data, start=1):
                        ws.cell(row=i, column=j, value=value)
                # Save and upload the spreadsheet to the cloud. The UNIX
                # millisecond timestamp gives the file a unique name.
                timestamp = int(time.time() * 1000)
                new_file_name = f"ny_eksport_{timestamp}.xlsx"
                new_file_path = f'{self.valves.TMP_STORAGE_DIR}/{new_file_name}'
                wb.save(new_file_path)
                self.sea_upload(new_file_path, new_file_name)
                # Ask the cloud to create a share link for the file.
                share_file = self.sea_share(new_file_name)
                # Second-pass instructions for the model, including:
                # 1. user_message - the user's original request
                # 2. query - the SQL generated in the first inference pass
                # 3. query_result - raw data from the database
                formatted = self.reformat_data(user_message, query, str(query_result))
                # Run the second inference pass.
                formatted_result = self.run_llm_query(formatted)
                # data holds the model's second-pass reply.
                data = formatted_result["data"]
                # Attach the spreadsheet link to the chat reply.
                if share_file:
                    data += f"\n\n## Eksport:\nI tilfelle resultatet fra spørringen overgår begrensningene i antall tokens i svaret, har jeg eksportert dataene du spurte om i et eget excel ark hvor du kan se hele datasettet. Du kan laste ned Excel arket her:\n1. [Regneark delelink på outlands.no]({share_file})"
                # Debug data
                # data += f"\n\n\n```{body}```"
                print(str(body))
                # On success return the second-pass reply in the chat.
                if formatted_result["success"]:
                    return data
                return f"Error occured: {data}"
            # First inference pass failed: surface the error in the chat.
            else:
                data = initial["data"]
                return f"Error occured: {data}"
        # When body['stream'] is false, open webui is generating a chat
        # title or autocomplete — just run plain inference on the user's
        # message instead of the database flow.
        else:
            response = self.run_llm_query(user_message)
            return response["data"]