# Openroutertestpipeline/02_oms_sql_pipeline.py


from typing import List, Union, Generator, Iterator
from pydantic import BaseModel, Field
from sqlalchemy import create_engine
from sqlalchemy import text
from urllib.parse import urlencode
from openpyxl import load_workbook
from openpyxl.styles import Alignment, Font
from openpyxl.drawing.image import Image
from copy import copy
import logging
import os
import requests
import json
import time
import ast
logging.basicConfig(level=logging.DEBUG)
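
# Open WebUI pipeline that turns a natural-language chat question into a MySQL query via
# OpenRouter, runs it against the PolarPress database, exports the result to an Excel
# sheet uploaded to a Seafile cloud, and answers in the chat with a markdown table plus
# a share link to the full export.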
class Pipeline:
    class Valves(BaseModel):
        OR_MODEL: str
        OR_URL: str
        OR_KEY: str
        DB_HOST: str
        DB_PORT: str
        DB_USER: str
        DB_PASSWORD: str
        DB_DATABASE: str
        DB_TABLES: List[str]
        KEYSTORE_URL: str
        KEYSTORE_API: str
        CLOUD_HOST: str
        CLOUD_API_VERSION: str
        CLOUD_EXCEL_LIBRARY: str
        CLOUD_USER: str
        CLOUD_PASS: str
        CLOUD_EXCEL_TEMPLATE_FILENAME: str
        TMP_STORAGE_DIR: str

    def __init__(self):
        self.name = "01 PolarPress Database RAG"
        self.engine = None
        self.nlsql_response = ""
        self.last_emit_time = 0
        self.valves = self.Valves(
            **{
                "pipelines": ["*"],
                "OR_MODEL": os.getenv("OR_MODEL", "anthropic/claude-3.5-haiku:beta"),
                "OR_URL": os.getenv("OR_URL", "https://openrouter.ai/api/v1/chat/completions"),
                "OR_KEY": os.getenv("OR_KEY", "OPENROUTER_API_KEY"),
                "DB_HOST": os.getenv("DB_HOST", "elrond.outlands.lan"),
                "DB_PORT": os.getenv("DB_PORT", "3306"),
                "DB_USER": os.getenv("DB_USER", "polarpress_demo_dba"),
                "DB_PASSWORD": os.getenv("DB_PASSWORD", "YOUR_PASSWORD"),
                "DB_DATABASE": os.getenv("DB_DATABASE", "pp_polarpress_demo_prod"),
                "DB_TABLES": ["users", "club_memberships", "stripe_transactions", "vipps_transactions"],
                "KEYSTORE_URL": os.getenv("KEYSTORE_URL", "https://keystore.outlands.no"),
                "KEYSTORE_API": os.getenv("KEYSTORE_API", "123"),
                "CLOUD_HOST": os.getenv("CLOUD_HOST", "outlands.no"),
                "CLOUD_API_VERSION": os.getenv("CLOUD_API_VERSION", "api2"),
                "CLOUD_EXCEL_LIBRARY": os.getenv("CLOUD_EXCEL_LIBRARY", "exports"),
                "CLOUD_USER": os.getenv("CLOUD_USER", "username"),
                "CLOUD_PASS": os.getenv("CLOUD_PASS", "password"),
                "CLOUD_EXCEL_TEMPLATE_FILENAME": os.getenv("CLOUD_EXCEL_TEMPLATE_FILENAME", "oms_eksportmal.xls"),
                "TMP_STORAGE_DIR": os.getenv("TMP_STORAGE_DIR", "/home/heno/tmp"),
            }
        )

    def init_db(self):
        try:
            self.engine = create_engine(
                f"mysql+mysqldb://{self.valves.DB_USER}:{self.valves.DB_PASSWORD}"
                f"@{self.valves.DB_HOST}:{self.valves.DB_PORT}/{self.valves.DB_DATABASE}"
            )
            print(f"Connection to MariaDB database {self.valves.DB_DATABASE} on host {self.valves.DB_HOST} established")
        except Exception as e:
            print(f"Error connecting to MariaDB: {e}")
        return self.engine
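
    # Lifecycle hooks invoked by the pipelines framework: on startup we set up the
    # database engine and prepare the Seafile cloud (login, library check, upload link).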
    async def on_startup(self):
        self.init_db()
        await self.sea_login()
        await self.sea_check_library()
        await self.sea_link()

    async def on_shutdown(self):
        pass
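
    # Authenticate against the Seafile cloud and store the auth token for later API calls.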
    async def sea_login(self):
        try:
            headers = {
                'Content-Type': 'application/x-www-form-urlencoded'
            }
            data = urlencode({'username': self.valves.CLOUD_USER, 'password': self.valves.CLOUD_PASS})
            response = requests.post(
                f"https://{self.valves.CLOUD_HOST}/{self.valves.CLOUD_API_VERSION}/auth-token/",
                data=data,
                headers=headers
            )
            try:
                data = response.json()
            except json.JSONDecodeError:
                logging.error("Failed to decode JSON response connecting to the cloud")
                return False
            if 'non_field_errors' in data:
                if data['non_field_errors'][0] == 'Unable to login with provided credentials.':
                    logging.error(f"Invalid username or password for user {self.valves.CLOUD_USER} on seafile host {self.valves.CLOUD_HOST} provided. Could not get auth token")
                return False
            elif 'token' in data:
                self.token = data['token']
                print(f"Login to seafile cloud host {self.valves.CLOUD_HOST} successful")
                return True
            else:
                logging.error("Unexpected response from seafile server")
                return False
        except requests.RequestException as e:
            logging.error(f"Unable to connect to seafile cloud {self.valves.CLOUD_HOST}. Error: {e}")
            return False
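
    # Ensure the export library exists on the Seafile host and is writable, creating it
    # if needed; stores the library id on self.library_id.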
    async def sea_check_library(self) -> bool:
        try:
            headers = {
                'Authorization': f"Token {self.token}",
                'Accept': 'application/json; indent=4'
            }
            params = {
                'type': 'mine'
            }
            response = requests.get(
                f"https://{self.valves.CLOUD_HOST}/{self.valves.CLOUD_API_VERSION}/repos/",
                headers=headers,
                params=params
            )
            res = response.json()
            library_exists = False
            library_id = None
            writable = True
            for library in res:
                if library['name'] == self.valves.CLOUD_EXCEL_LIBRARY:
                    library_exists = True
                    library_id = library['id']
                    if library.get('encrypted') or library.get('permission') == 'r':
                        writable = False
            if not writable:
                logging.error(f"The library {self.valves.CLOUD_EXCEL_LIBRARY} exists but is not writable. Log in to {self.valves.CLOUD_HOST} from your browser and check its permission settings")
                return False
            if not library_exists:
                make_library_response = requests.post(
                    f"https://{self.valves.CLOUD_HOST}/{self.valves.CLOUD_API_VERSION}/repos/",
                    headers=headers,
                    json={
                        'name': self.valves.CLOUD_EXCEL_LIBRARY
                    }
                )
                if make_library_response.status_code in (400, 520):
                    logging.error(f"Could not create the necessary library {self.valves.CLOUD_EXCEL_LIBRARY} on your {self.valves.CLOUD_HOST} account. Contact a system administrator")
                    return False
                self.library_id = make_library_response.json()['repo_id']
                print(f"The library {self.valves.CLOUD_EXCEL_LIBRARY} was created")
                return True
            self.library_id = library_id
            print(f"The library {self.valves.CLOUD_EXCEL_LIBRARY} exists, no need to create")
            return True
        except requests.RequestException as e:
            logging.error(f"Unable to connect to cloud {self.valves.CLOUD_HOST}. Error: {e}")
            return False
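
    # Request an upload link for the library root and store it on self.upload_link.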
    async def sea_link(self) -> bool:
        try:
            response = requests.get(
                f"https://{self.valves.CLOUD_HOST}/{self.valves.CLOUD_API_VERSION}/repos/{self.library_id}/upload-link/?p=/",
                headers={'Authorization': f"Token {self.token}"}
            )
            if response.status_code == 403:
                logging.error(f"Lacking permissions to upload to library '{self.library_id}'. Double-check permissions by logging in to https://{self.valves.CLOUD_HOST} and inspecting the library's settings")
                return False
            elif response.status_code == 500:
                logging.error(f"Could not get permission to upload. Check that you have enough storage quota left on your {self.valves.CLOUD_HOST} account; if not, try deleting some old files first.")
                return False
            self.upload_link = response.json()
            print(f"Received go-ahead for upload on {self.valves.CLOUD_HOST}")
            return True
        except requests.RequestException as e:
            logging.error(f"Unable to connect to cloud {self.valves.CLOUD_HOST}. Error: {e}")
            return False
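
    # Upload a local file to the library root via the previously fetched upload link.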
    def sea_upload(self, path: str, file_name: str) -> bool:
        if not os.path.isfile(path):
            print(f"Cannot read and upload '{path}'")
            return False
        try:
            # Open the file in a context manager so the handle is closed after the upload
            with open(path, 'rb') as fh:
                requests.post(
                    self.upload_link,
                    data={'filename': file_name, 'parent_dir': '/'},
                    files={'file': fh},
                    headers={'Authorization': f"Token {self.token}"}
                )
            print(f"Excel file '{file_name}' successfully uploaded and stored to your {self.valves.CLOUD_HOST}")
            return True
        except requests.RequestException as e:
            logging.error(f"Unable to upload file to {self.valves.CLOUD_HOST}. Error: {e}")
            return False
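
    # Create a download-only share link for an uploaded file, expiring after one day.
    # Returns the link URL on success, False otherwise.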
    def sea_share(self, file: str):
        try:
            headers = {
                'Authorization': f'Token {self.token}',
                'Content-Type': 'application/json',
                'Accept': 'application/json; indent=4'
            }
            json_data = {
                'repo_id': self.library_id,
                'path': f'/{file}',
                'permissions': {
                    'can_edit': False,
                    'can_download': True
                },
                'expire_days': 1
            }
            response = requests.post(
                f"https://{self.valves.CLOUD_HOST}/api/v2.1/share-links/",
                json=json_data,
                headers=headers
            )
            res = response.json()
            if response.status_code < 300:
                print(f"Excel file '{file}' successfully shared: {res['link']}")
                return res["link"]
            else:
                return False
        except requests.RequestException as e:
            logging.error(f"Unable to share {file}. Error: {e}")
            return False
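
    # Send a single-turn chat completion request to OpenRouter and return a
    # {"success": bool, "data": str} dict with the model's reply (expected to be SQL
    # in the first inference step, prose/markdown in later steps).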
    def run_llm_query(self, message: str):
        try:
            response = requests.post(
                url=self.valves.OR_URL,
                headers={
                    "Authorization": f"Bearer {self.valves.OR_KEY}"
                },
                data=json.dumps({
                    "model": self.valves.OR_MODEL,
                    "messages": [
                        {
                            "role": "user",
                            "content": message
                        }
                    ]
                })
            )
            if response.status_code == 200:
                response_data = response.json()
                sql_statement = response_data.get("choices", [{}])[0].get("message", {}).get("content")
                if sql_statement:
                    return {"success": True, "data": sql_statement}
                else:
                    logging.error("Response did not contain SQL statement.")
                    return {"success": False, "data": "No SQL statement in response."}
            else:
                logging.error(f"Error response {response.status_code}: {response.text}")
                return {"success": False, "data": f"Error: {response.status_code}"}
        except requests.HTTPError as e:
            logging.error(f"Client response error: {e}")
            return {"success": False, "data": "HTTP backend error"}
        except Exception as e:
            logging.error(f"Unexpected error: {e}")
            return {"success": False, "data": f"Unexpected error: {e}"}
    def run_mysql_query(self, query: str):
        try:
            with self.engine.connect() as connection:
                result = connection.execute(text(query))
                rows = result.fetchall()
                return rows
        except Exception as e:
            return {"error": str(e)}
    def give_excel_book_description(self, message: str, query: str, result: str):
        llm_instructions = f"""
        Based on the input question, MySQL query and query result, give a nice text summary in NORWEGIAN
        of the data you have extracted from the database. The text will be used in an Excel spreadsheet export.
        Input question: {message}
        MySQLQuery: {query}
        Query result: {result}
        """
        return llm_instructions
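
    # Prompt builder: asks the model for Norwegian column headers as a Python-list-
    # formatted string, later parsed with ast.literal_eval().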
    def generate_table_headers(self, message: str, query: str, result: str):
        llm_instructions = f"""
        Based on the input question, MySQL query and query result, give the columns in Query result the appropriate
        header names translated to NORWEGIAN.
        IMPORTANT: Return a Python list formatted as a string with ONLY the column header names, not the rows of the query result.
        DO NOT encapsulate your reply in markdown or any other formatting besides the Python list formatted as a string.
        Example of a valid string to generate: ["ID", "Navn", "Epostadresse", "Dato opprettet"]
        Input question: {message}
        MySQLQuery: {query}
        Query result: {result}
        Python-list-formatted string of table header names for columns as they appear in "Query result":
        """
        return llm_instructions
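
    # Prompt builder: repeats the schema description and asks the model to reformat
    # the raw query result as a markdown table with Norwegian column names.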
    def reformat_data(self, message: str, query: str, result: str):
        llm_reformat_instructions = f"""
        Given an input question, create a syntactically correct MySQL query to run. You have 4 tables to work with:
        1. users - the users table holds the user records and contains the columns id, name, email, created_at, updated_at
        2. club_memberships - the membership table containing the users' club memberships, with the columns id,
        user_id, valid_from, valid_to, renew, cancelled, credited, price, payment_method, created_at, updated_at. The
        valid_from and valid_to columns in the club_memberships table are varchars with a date in the format 'dd.mm.YYYY'
        and contain the timespan (from and to) in which the membership is valid. The columns cancelled and credited are
        boolean columns, and if either of them is true, the membership is not valid. Cancelled means that the membership
        is cancelled and credited means that the membership is refunded (and cancelled).
        3. stripe_transactions - all the user transactions that have been processed by Stripe. The table contains
        the columns id, user_id, event_signup_id, processed, amount, phone_number, created_at and updated_at. The column
        processed is a boolean value, and if it is set to false, the stripe_transaction is not valid (the user has not
        been charged). Amount is a double (8,2) containing the amount the user has been charged; phone_number contains
        the user's phone number. If the column event_signup_id is not null, the transaction does not pertain to a
        membership payment.
        4. vipps_transactions - all the user transactions that have been processed by Vipps. The table contains
        the columns id, user_id, event_signup_id, processed, amount, phone_number, created_at and updated_at. The columns
        are identical to the stripe_transactions columns.
        Always run queries with INNER JOIN on users.id and the other tables' user_id in order to determine which user
        the rows in the other tables belong to. You should use DISTINCT statements to avoid returning duplicates wherever
        possible. Pay attention to use only the column names that I have provided. Currency for the amount columns will
        always be NORWEGIAN (NOK). The input question can be in English or Norwegian. Always reply in Norwegian if the
        input question is in Norwegian, or in English if the input question is in English.
        Input question: {message}
        MySQLQuery: {query}
        Query result: {result}
        Excellent. We now have the result from the query. As you can see, the "Query result:" has been formatted as a
        Python list of tuples. Take this string and reformat it in markdown. Please translate all the column names to
        Norwegian.
        """
        return llm_reformat_instructions
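
    # Main entry point called by Open WebUI for every chat message.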
    def pipe(self, user_message: str, model_id: str, messages: List[dict], body: dict) -> Union[str, Generator, Iterator]:
        llm_initial_instructions = f"""
        Given an input question, create a syntactically correct MySQL query to run. You have 4 tables to work with:
        1. users - the users table holds the user records and contains the columns id, name, email, created_at, updated_at
        2. club_memberships - the membership table containing the users' club memberships, with the columns id,
        user_id, valid_from, valid_to, renew, cancelled, credited, price, payment_method, created_at, updated_at. The
        valid_from and valid_to columns in the club_memberships table are varchars with a date in the format 'dd.mm.YYYY'
        and contain the timespan (from and to) in which the membership is valid. The columns cancelled and credited are
        boolean columns, and if either of them is true, the membership is not valid. Cancelled means that the membership
        is cancelled and credited means that the membership is refunded (and cancelled).
        3. stripe_transactions - all the user transactions that have been processed by Stripe. The table contains
        the columns id, user_id, event_signup_id, processed, amount, phone_number, created_at and updated_at. The column
        processed is a boolean value, and if it is set to false, the stripe_transaction is not valid (the user has not
        been charged). Amount is a double (8,2) containing the amount the user has been charged; phone_number contains
        the user's phone number. If the column event_signup_id is not null, the transaction does not pertain to a
        membership payment.
        4. vipps_transactions - all the user transactions that have been processed by Vipps. The table contains
        the columns id, user_id, event_signup_id, processed, amount, phone_number, created_at and updated_at. The columns
        are identical to the stripe_transactions columns.
        Always run queries with INNER JOIN on users.id and the other tables' user_id in order to determine which user
        the rows in the other tables belong to. You should use DISTINCT statements to avoid returning duplicates wherever
        possible. Pay attention to use only the column names that I have provided. Currency for the amount columns will
        always be NORWEGIAN (NOK). The input question can be in English or Norwegian. Always reply in Norwegian if the
        input question is in Norwegian, or in English if the input question is in English.
        IMPORTANT: ONLY respond with a syntactically correct MySQL query. DO NOT respond with a description of what
        you have done. ONLY reply with the SQL query. DO NOT format the query with markup. ONLY provide a string
        containing the SQL query you assemble from the Input question. DO NOT translate any of the column names
        in the table for the query.
        Input question: {user_message}
        MySQLQuery:
        """
        # Check whether body['stream'] is true, which indicates that the pipeline is handling the user's request
        if body['stream']:
            # Run the first inference with the instructions in llm_initial_instructions, telling
            # the AI model that we want a SQL query as the answer, based on user_message (what the
            # user wrote/asked in the chat)
            initial = self.run_llm_query(llm_initial_instructions)
            # If the inference succeeded, continue to the next step
            if initial["success"]:
                # query holds the SQL query generated by the AI model
                query = initial["data"]
                # Run the SQL query; the raw result comes back as a Python list of tuples
                query_result = self.run_mysql_query(query)
                # If a syntax error or any other error occurred with the query, return the error
                # in the chat before attempting to build the Excel export
                if isinstance(query_result, dict) and "error" in query_result:
                    return f"Error occurred: {query_result['error']}. Initial data: {initial['data']}"
                # Instruct the AI to generate a description for the spreadsheet, plus column header names
                instruction_result = self.run_llm_query(
                    self.give_excel_book_description(user_message, query, str(query_result))
                )
                reply_description = instruction_result["data"]
                instruction_result = self.run_llm_query(
                    self.generate_table_headers(user_message, query, str(query_result))
                )
                header_names = instruction_result["data"]
                # Add the data to the Excel sheet
                original_file_path = f"{self.valves.TMP_STORAGE_DIR}/{self.valves.CLOUD_EXCEL_TEMPLATE_FILENAME}"
                excel_start_row = 4
                wb = load_workbook(original_file_path)
                ws = wb.active
                # Copy the ØMS logo into the new spreadsheet
                img = Image(f"{self.valves.TMP_STORAGE_DIR}/oms-glow.png")
                img.height = 145
                img.width = 139
                img.anchor = 'A1'
                ws.add_image(img)
                # Set the headline
                headline = "Østfold Milsim KI dataeksport"
                ws.merge_cells('B1:E1')
                cell = ws.cell(row=1, column=2, value=headline)
                cell.alignment = Alignment(horizontal='left', vertical='top', wrap_text=True)
                cell.font = Font(size=22, bold=True)
                # Since reply_description is a fairly large volume of text, adjust the cell's style and properties
                ws.merge_cells('A2:E2')
                cell = ws.cell(row=2, column=1, value=reply_description)
                cell.alignment = Alignment(horizontal='left', vertical='top', wrap_text=True)
                cell.font = Font(size=10)
                ws.row_dimensions[2].height = 300
                ws.row_dimensions[1].height = 145
                # The header names arrive as a Python-list-formatted string; parse it into a list
                header_list = ast.literal_eval(header_names)
                for i, header in enumerate(header_list, start=1):
                    cell = ws.cell(row=3, column=i, value=header)
                    cell.font = Font(bold=True)
                for i, row_data in enumerate(query_result, start=excel_start_row):
                    for j, value in enumerate(row_data, start=1):  # Column A corresponds to index 1
                        ws.cell(row=i, column=j, value=value)
                # Save the Excel sheet and upload it to the cloud
                timestamp = int(time.time() * 1000)  # UNIX timestamp with milliseconds to give the file a unique name
                new_file_name = f"ny_eksport_{timestamp}.xlsx"
                new_file_path = f'{self.valves.TMP_STORAGE_DIR}/{new_file_name}'
                wb.save(new_file_path)
                self.sea_upload(new_file_path, new_file_name)
                # Instruct the cloud to create a share link for the file
                share_file = self.sea_share(new_file_name)
                # formatted holds the second-stage instructions for the AI model, including:
                # 1. user_message - the user's initial request to the AI model
                # 2. query - the SQL query the AI model generated in the first inference step
                # 3. query_result - the raw data from the database
                formatted = self.reformat_data(user_message, query, str(query_result))
                # Run the second inference with the second-stage instructions
                formatted_result = self.run_llm_query(formatted)
                # data holds the AI model's answer from the second inference step
                data = formatted_result["data"]
                # Attach the link to the Excel sheet in the chat
                if share_file:
                    data += f"\n\n## Eksport:\nI tilfelle resultatet fra spørringen overgår begrensningene i antall tokens i svaret, har jeg eksportert dataene du spurte om i et eget excel ark hvor du kan se hele datasettet. Du kan laste ned Excel arket her:\n1. [Regneark delelink på outlands.no]({share_file})"
                # If the result is successful, return the second inference step's answer in the chat
                if formatted_result["success"]:
                    return data
                return f"Error occurred: {data}"
            # If the first inference step was not successful, return an error message in the chat
            else:
                data = initial["data"]
                return f"Error occurred: {data}"
        # If body['stream'] is not true, Open WebUI is running inference to generate a title for
        # the chat, or autocomplete. In that case we only run inference on the user's message and
        # not the actual database request
        else:
            response = self.run_llm_query(user_message)
            return response["data"]
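

# A minimal local smoke test, not part of the Open WebUI pipeline contract (which
# instantiates the class and calls pipe() itself). This sketch assumes the environment
# variables above point at reachable database and cloud endpoints; the sample question
# below is hypothetical.
if __name__ == "__main__":
    import asyncio

    pipeline = Pipeline()
    # Run the startup hook to initialise the DB engine and the Seafile session
    asyncio.run(pipeline.on_startup())
    # With body["stream"] set, the full SQL -> Excel -> share-link flow is exercised
    print(pipeline.pipe(
        "Hvor mange brukere har et gyldig medlemskap?",
        pipeline.valves.OR_MODEL,
        [],
        {"stream": True},
    ))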