From f5c34d9b0f9936d68fc1c8910bc85c1cbbcc9ea5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Helge-Mikael=20Nordg=C3=A5rd?=
Date: Sat, 8 Feb 2025 13:35:59 +0100
Subject: [PATCH] Added Export to Excel, uploadable to Outlands cloud with share link

---
 02_oms_sql_pipeline.py | 368 ++++++++++++++++++++++++++++++++++++++---
 1 file changed, 345 insertions(+), 23 deletions(-)

diff --git a/02_oms_sql_pipeline.py b/02_oms_sql_pipeline.py
index a80e386..55e7798 100644
--- a/02_oms_sql_pipeline.py
+++ b/02_oms_sql_pipeline.py
@@ -1,12 +1,18 @@
-from typing import List, Union, Generator, Iterator, Dict
-from pydantic import BaseModel
+from typing import List, Union, Generator, Iterator
+from pydantic import BaseModel, Field
 from sqlalchemy import create_engine
 from sqlalchemy import text
-import pylightxl as xl
+from urllib.parse import urlencode
+from openpyxl import load_workbook
+from openpyxl.styles import Alignment, Font
+from openpyxl.drawing.image import Image
+from copy import copy
 import logging
 import os
 import requests
 import json
+import time
+import ast

 logging.basicConfig(level=logging.DEBUG)

@@ -23,12 +29,23 @@ class Pipeline:
         DB_DATABASE: str
         DB_TABLES: List[str]

-        XL_TEMPLATE_PATH: str
+        KEYSTORE_URL: str
+        KEYSTORE_API: str
+
+        CLOUD_HOST: str
+        CLOUD_API_VERSION: str
+        CLOUD_EXCEL_LIBRARY: str
+        CLOUD_USER: str
+        CLOUD_PASS: str
+        CLOUD_EXCEL_TEMPLATE_FILENAME: str
+
+        TMP_STORAGE_DIR: str

     def __init__(self):
-        self.name = "ØMS Membership Database"
+        self.name = "01 PolarPress Database RAG"
         self.engine = None
         self.nlsql_response = ""
+        self.last_emit_time = 0
         self.valves = self.Valves(
             **{
                 "pipelines": ["*"],
@@ -41,7 +58,15 @@ class Pipeline:
                 "DB_PASSWORD": os.getenv("DB_PASSWORD", "YOUR_PASSWORD"),
                 "DB_DATABASE": os.getenv("DB_DATABASE", "pp_polarpress_demo_prod"),
                 "DB_TABLES": ["users", "club_memberships", "stripe_transactions", "vipps_transactions"],
-                "XL_TEMPLATE_PATH": os.getenv("XL_TEMPLATE_PATH", "/var/support/openwebui/xl_templates/oms_dataexport.xlsx")
+                "KEYSTORE_URL": os.getenv("KEYSTORE_URL", "https://keystore.outlands.no"),
+                "KEYSTORE_API": os.getenv("KEYSTORE_API", "123"),
+                "CLOUD_HOST": os.getenv("CLOUD_HOST", "outlands.no"),
+                "CLOUD_API_VERSION": os.getenv("CLOUD_API_VERSION", "api2"),
+                "CLOUD_EXCEL_LIBRARY": os.getenv("CLOUD_EXCEL_LIBRARY", "exports"),
+                "CLOUD_USER": os.getenv("CLOUD_USER", "username"),
+                "CLOUD_PASS": os.getenv("CLOUD_PASS", "password"),
+                "CLOUD_EXCEL_TEMPLATE_FILENAME": os.getenv("CLOUD_EXCEL_TEMPLATE_FILENAME", "oms_eksportmal.xls"),
+                "TMP_STORAGE_DIR": os.getenv("TMP_STORAGE_DIR", "/home/heno/tmp"),
             }
         )

@@ -56,10 +81,181 @@ class Pipeline:

     async def on_startup(self):
         self.init_db()
+        await self.sea_login()
+        await self.sea_check_library()
+        await self.sea_link()

     async def on_shutdown(self):
         pass

+    async def sea_login(self):
+        """Authenticate against the Seafile cloud host and store the API token on self.token."""
+        try:
+            headers = {
+                'Content-Type': 'application/x-www-form-urlencoded'
+            }
+            data = urlencode({'username': self.valves.CLOUD_USER, 'password': self.valves.CLOUD_PASS})
+
+            response = requests.post(
+                f"https://{self.valves.CLOUD_HOST}/{self.valves.CLOUD_API_VERSION}/auth-token/",
+                data=data,
+                headers=headers
+            )
+
+            try:
+                data = response.json()
+            except json.JSONDecodeError:
+                logging.error("Failed to decode the JSON response while connecting to the cloud")
+                return False
+
+            if 'non_field_errors' in data:
+                if data['non_field_errors'][0] == 'Unable to login with provided credentials.':
+                    logging.error(f"Invalid username or password provided for user {self.valves.CLOUD_USER} on seafile host {self.valves.CLOUD_HOST}. Could not get auth token")
+                return False
+            elif 'token' in data:
+                self.token = data['token']
+                print(f"Login to seafile cloud host {self.valves.CLOUD_HOST} successful")
+                return True
+            else:
+                logging.error("Unexpected response from seafile server")
+                return False
+        except requests.RequestException as e:
+            logging.error(f"Unable to connect to seafile cloud {self.valves.CLOUD_HOST}. Error: {e}")
+            return False
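+
+    def _auth_headers(self) -> dict:
+        # Illustrative sketch, not referenced by the methods below: every authenticated
+        # Seafile Web API call in this commit sends the token obtained by sea_login()
+        # in an Authorization header, and a shared helper like this is one way to
+        # avoid rebuilding that header in each method.
+        return {
+            'Authorization': f"Token {self.token}",
+            'Accept': 'application/json; indent=4'
+        }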
Could not get auth token") + return False + elif 'token' in data: + self.token = data['token'] + print(f"Login to seafile cloud host {self.valves.CLOUD_HOST} successful") + return True + else: + logging.error("Unexpected response from seafile server") + return False + except requests.RequestException as e: + logging.error(f"Unable to connect to seafile cloud {self.valves.CLOUD_HOST}. Error: {e}") + return False + + async def sea_check_library(self) -> bool: + try: + headers = { + 'Authorization': f"Token {self.token}", + 'Accept': 'application/json; indent=4' + } + + params = { + 'type': 'mine' + } + + response = requests.get( + f"https://{self.valves.CLOUD_HOST}/{self.valves.CLOUD_API_VERSION}/repos/", + headers=headers, + params=params + ) + + res = response.json() + library_exists = False + library_id = None + writable = True + + for library in res: + if library['name'] == self.valves.CLOUD_EXCEL_LIBRARY: + library_exists = True + library_id = library['id'] + if library.get('encrypted') or library.get('permission') == 'r': + writable = False + + if not writable: + logging.error(f"The library {self.valves.CLOUD_EXCEL_LIBRARY} exists but is is not writable. Log in to {self.valves.CLOUD_HOST} from your browser and check it's permission settings") + return False + + if not library_exists: + make_library_response = requests.post( + f"https://{self.valves.CLOUD_HOST}/{self.valves.CLOUD_API_VERSION}/repos/", + headers=headers, + json={ + 'name': self.valves.CLOUD_EXCEL_LIBRARY + } + ) + + if make_library_response.status_code == 400: + logging.error(f"Could not create neccessary library {self.valves.CLOUD_EXCEL_LIBRARY} on your {self.valves.CLOUD_HOST} account. Contact a system administrator") + return False + elif make_library_response.status_code == 520: + logging.error(f"Could not create neccessary library {self.valves.CLOUD_EXCEL_LIBRARY} on your {self.valves.CLOUD_HOST} account. Contact a system administrator") + return False + + self.library_id = make_library_response.json()['repo_id'] + print(f"The library {self.valves.CLOUD_EXCEL_LIBRARY} was created", fg='yellow') + return True + + self.library_id = library_id + print(f"The library {self.valves.CLOUD_EXCEL_LIBRARY} exists, no need to create") + return True + except requests.RequestException as e: + logging.error(f"Unable to connect to cloud {self.valves.CLOUD_HOST}. Error: {e}") + return False + + async def sea_link(self) -> bool: + try: + response = requests.get( + f"https://{self.valves.CLOUD_HOST}/{self.valves.CLOUD_API_VERSION}/repos/{self.library_id}/upload-link/?p=/", + headers={'Authorization': 'Token {token}'.format(token=self.token)} + ) + + if response.status_code == 403: + logging.error(f"Lacking permissions to upload to library '{self.library_id}'. Doublecheck permissions by logging into https://{self.valves.CLOUD_HOST} and go to the library to check it's settings") + return False + elif response.status_code == 500: + logging.error(f"Could not get permission to upload. Check that you have enough storage permissions left on your {self.valves.CLOUD_HOST} account, and if not try to delete some old files first.") + return False + + self.upload_link = response.json() + print(f"Recieved go ahead for upload on {self.valves.CLOUD_HOST}") + return True + except requests.RequestException as e: + logging.error(f"Unable to connect to cloud {self.valves.CLOUD_HOST}. 
Error: {e}") + return False + + def sea_upload(self, path: str, file_name: str) -> bool: + if not os.path.isfile(path): + print(f"Cannot read and upload '{path}'") + return False + + try: + requests.post( + self.upload_link, data={'filename': file_name, 'parent_dir': '/'}, + files={'file': open(path, 'rb')}, + headers={'Authorization': 'Token {token}'.format(token=self.token)} + ) + + print(f"Excel file '{file_name}' successfully uploaded and stored to your {self.valves.CLOUD_HOST}") + return True + except requests.RequestException as e: + logging.error(f"Unable to upload file to {self.valves.CLOUD_HOST}. Error: {e}") + return False + + def sea_share(self, file: str): + try: + headers = { + 'Authorization': f'Token {self.token}', + 'Content-Type': 'application/json', + 'Accept': 'application/json; indent=4' + } + + json_data = { + 'repo_id': self.library_id, + 'path': f'/{file}', + 'permissions': { + 'can_edit': False, + 'can_download': True + }, + 'expire_days': 1 + } + + response = requests.post( + f"https://{self.valves.CLOUD_HOST}/api/v2.1/share-links/", + json=json_data, + headers=headers + ) + + res = response.json() + + if response.status_code < 300: + print(f"Excel file '{file}' successfully shared: {res['link']}") + return res["link"] + else: + return False + except requests.RequestException as e: + logging.error(f"Unable to share {file}. Error: {e}") + return False + def run_llm_query(self, message: str): try: response = requests.post( @@ -103,10 +299,40 @@ class Pipeline: with self.engine.connect() as connection: result = connection.execute(text(query)) rows = result.fetchall() - return str(rows) + return rows except Exception as e: return {"error": str(e)} + def give_excel_book_description(self, message: str, query: str, result: str): + llm_instructions = f""" + Based on the input question, MySQL query and query result, give a nice text summary in NORWEGIAN + of the data you have extracted from the database. The text will be used in a Excel spreadsheet export + + Input question {message} + MySQLQuery: {query} + Query result: {result} + """ + + return llm_instructions + + def generate_table_headers(self, message: str, query: str, result: str): + llm_instructions = f""" + Based on the input question, MySQL query and query result, give the columns in Query result the appropriate + header names translated to NORWEGIAN. + + IMPORTANT: Return a python list formatted as string with ONLY the column header names, not the resut of the query result. + DO NOT encapsulate your reply in markdown or any other formatting beside the python list formatted as a string. + Example of valid string to generate: ["ID", "Navn", "Epostadresse", "Dato opprettet"] + + Input question {message} + MySQLQuery: {query} + Query result: {result} + + Python list formatted string of table header names for columns as they appear in "Query result": + """ + + return llm_instructions + def reformat_data(self, message: str, query: str, result: str): llm_reformat_instructions = f""" Given an input question, create a syntactically correct mysql query to run. You have 4 tables to work with: @@ -178,22 +404,118 @@ class Pipeline: MySQLQuery: """ - initial = self.run_llm_query(llm_initial_instructions) - if initial["success"]: - query = initial["data"] - query_result = self.run_mysql_query(query) - - if isinstance(query_result, dict) and "error" in query_result: - return f"Error occurred: {query_result['error']}. 
Initial data: {initial['data']}" - - formatted = self.reformat_data(user_message, query, query_result) - formatted_result = self.run_llm_query(formatted) + # Vi sjekker om body['stream'] er sann som indikerer at pipelinen kjører brukerens forespørsel + if body['stream']: - data = formatted_result["data"] - if formatted_result["success"]: - return data + # Vi kjører første inferens med instruksjonene i llm_initial_instructions for å fortelle + # KI modellen at vi ønsker en SQL spørring til svar basert på user_message (det brukeren + # skrev/spurte i chatten) + initial = self.run_llm_query(llm_initial_instructions) + + # Hvis inferensen er vellykket, fortsett til neste trinn + if initial["success"]: + + # Query inneholder SQL spørringen som er generert av KI modellen + query = initial["data"] + + # Vi kjører SQL spørrringen og lagrer resultatet og rådataene i en python toupple datasett + query_result = self.run_mysql_query(query) + + # Instruer KI om å generere en beskrivelse for regnearket, samt kolonnenavn + instruction_result = self.run_llm_query( + self.give_excel_book_description(user_message, query, str(query_result)) + ) + + reply_description = instruction_result["data"] + + instruction_result = self.run_llm_query( + self.generate_table_headers(user_message, query, str(query_result)) + ) + + header_names = instruction_result["data"] + + # Legg til dataene i excel arket + original_file_path = f"{self.valves.TMP_STORAGE_DIR}/{self.valves.CLOUD_EXCEL_TEMPLATE_FILENAME}" + excel_start_row = 4 + wb = load_workbook(original_file_path) + ws = wb.active + + # Kopier logoen til ØMS til det nye regnearket + img = Image(f"{self.valves.TMP_STORAGE_DIR}/oms-glow.png") + img.height = 145 + img.width = 139 + img.anchor = 'A1' + ws.add_image(img) + + + # Sett overskrift + headline = "Østfold Milsim KI dataeksport" + ws.merge_cells('B1:E1') + cell = ws.cell(row=1, column=2, value=headline) + cell.alignment = Alignment(horizontal='left', vertical='top', wrap_text=True) + cell.font = Font(size=22, bold=True) + + # Siden reply_description er en forhåndsvis stort volum med tekst, juster stilen og egenskapene til cellen + ws.merge_cells('A2:E2') + cell = ws.cell(row=2,column=1, value=reply_description) + cell.alignment = Alignment(horizontal='left', vertical='top', wrap_text=True) + cell.font = Font(size=10) + ws.row_dimensions[2].height = 300 + ws.row_dimensions[1].height = 145 + + header_list = ast.literal_eval(header_names) + for i, row_data in enumerate(header_list, start=1): + cell = ws.cell(row=3, column=i, value=row_data) + cell.font = Font(bold=True) + + for i, row_data in enumerate(query_result, start=excel_start_row): + for j, value in enumerate(row_data, start=1): # Column A corresponds to index 1 + ws.cell(row=i, column=j, value=value) + + # Lagre og last opp excel arket til skyen + timestamp = int(time.time() * 1000) # Datotidsstempel i UNIX tid med millisekunder for å gi filenavnet et unikt navn + new_file_name = f"ny_eksport_{timestamp}.xlsx" + new_file_path = f'{self.valves.TMP_STORAGE_DIR}/{new_file_name}' + wb.save(new_file_path) + self.sea_upload(new_file_path, new_file_name) + + # Instruer skyen om å opprette en deling av filen + share_file = self.sea_share(new_file_name) + + # Hvis det oppstod en syntaksfeil eller annen feil med spørringen, returner feilen i chatten + if isinstance(query_result, dict) and "error" in query_result: + return f"Error occurred: {query_result['error']}. 
Initial data: {initial['data']}" + + # formatted vil inneholde andre trinns instruksjoner for KI modellen, inkludert: + # 1. user_message - brukerens innledende forespørsel til KI modellen + # 2. query - SQL spørringen KI modellen genererte i første inferens trinn + # 3. query_result - Rådata fra databasen + formatted = self.reformat_data(user_message, query, str(query_result)) + + # Vi kjører andre inferens med andre trinns instruksjoner for KI modellen + formatted_result = self.run_llm_query(formatted) + + # Data vil inneholde KI modellens svar fra andre inferens trinn + data = formatted_result["data"] + + # Legg ved linken til excel arket i chatten + if share_file: + data += f"\n\n## Eksport:\nI tilfelle resultatet fra spørringen overgår begrensningene i antall tokens i svaret, har jeg eksportert dataene du spurte om i et eget excel ark hvor du kan se hele datasettet. Du kan laste ned Excel arket her:\n1. [Regneark delelink på outlands.no]({share_file})" + + # Hvis resultatet er vellykket returner andre inferens trinn svar i chatten + if formatted_result["success"]: + return data + + return f"Error occured: {data}" - return f"Error occured: {data}" + # Hvis første trinn av inferensen ikke var velykket, returner feilmelding i chatten + else: + data = initial["data"] + return f"Error occured: {data}" + + # Hvis body['stream'] ikke er sann, indikerer det at open webui kjører inferens for å generere + # tittel for chatten, eller autocomplete. I så tilfelle vil vi bare kjøre inferens på + # brukerens melding og ikke selve databaseforespørselen else: - data = initial["data"] - return f"Error occured: {data}" \ No newline at end of file + response = self.run_llm_query(user_message) + return response["data"]