Introduction
You possibly can simply create a easy utility that may chat with SQL Database. However right here’s the issue with that. You possibly can’t make it work seamlessly on the subject of dealing with and dealing with massive databases. If the database is big, it’s impractical to incorporate the whole record of columns and tables within the immediate context. This text explains the way to bypass this impediment by creating an AI utility that may chat with large SQL databases.
Creating an AI Utility to Chat with Huge SQL Databases
The next code initiates a easy Streamlit utility that permits customers to connect with an SQL database and chat with it.
import streamlit as st
import requests
import os
import pandas as pd
from uuid import uuid4
import psycopg2
from langchain.prompts import ChatPromptTemplate
from langchain.prompts.chat import SystemMessage, HumanMessagePromptTemplate
from langchain.llms import OpenAI, AzureOpenAI
from langchain.chat_models import ChatOpenAI, AzureChatOpenAI
from langchain.embeddings import OpenAIEmbeddings
from dotenv import load_dotenv
# Create crucial folders
folders_to_create = ['csvs']
for folder_name in folders_to_create:
if not os.path.exists(folder_name):
os.makedirs(folder_name)
print(f"Folder '{folder_name}' created.")
else:
print(f"Folder '{folder_name}' already exists.")
# Load the OpenAI API key
load_dotenv()
openai_api_key = os.getenv("OPENAI_API_KEY")
llm = OpenAI(openai_api_key=openai_api_key)
chat_llm = ChatOpenAI(openai_api_key=openai_api_key, temperature=0.4)
embeddings = OpenAIEmbeddings(openai_api_key=openai_api_key)
def get_basic_table_details(cursor):
question = """
SELECT table_name, column_name, data_type
FROM information_schema.columns
WHERE table_name IN (
SELECT tablename FROM pg_tables WHERE schemaname="public"
);"""
cursor.execute(question)
return cursor.fetchall()
def save_db_details(db_uri):
unique_id = str(uuid4()).change("-", "_")
connection = psycopg2.join(db_uri)
cursor = connection.cursor()
tables_and_columns = get_basic_table_details(cursor)
df = pd.DataFrame(tables_and_columns, columns=['table_name', 'column_name', 'data_type'])
filename_t = f'csvs/tables_{unique_id}.csv'
df.to_csv(filename_t, index=False)
cursor.shut()
connection.shut()
return unique_id
def generate_template_for_sql(question, table_info, db_uri):
template = ChatPromptTemplate.from_messages([
SystemMessage(content=f"You are an assistant that can write SQL Queries. Given the text below, write a SQL query that answers the user's question. DB connection string is {db_uri} Here is a detailed description of the table(s): {table_info} Prepend and append the SQL query with three backticks '```'"),
HumanMessagePromptTemplate.from_template("{text}")
])
reply = chat_llm(template.format_messages(textual content=question))
return reply.content material
def get_the_output_from_llm(question, unique_id, db_uri):
filename_t = f'csvs/tables_{unique_id}.csv'
df = pd.read_csv(filename_t)
table_info = ''
for desk in df['table_name'].distinctive():
table_info += f'Details about desk {desk}:n'
table_info += df[df['table_name'] == desk].to_string(index=False) + 'nn'
return generate_template_for_sql(question, table_info, db_uri)
def execute_the_solution(answer, db_uri):
connection = psycopg2.join(db_uri)
cursor = connection.cursor()
_, final_query, _ = answer.cut up("```")
cursor.execute(final_query.strip())
outcome = cursor.fetchall()
return str(outcome)
def connect_with_db(uri):
st.session_state.db_uri = uri
st.session_state.unique_id = save_db_details(uri)
return {"message": "Database connection established!"}
def send_message(message):
answer = get_the_output_from_llm(message, st.session_state.unique_id, st.session_state.db_uri)
outcome = execute_the_solution(answer, st.session_state.db_uri)
return {"message": answer + "nn" + "Outcome:n" + outcome}
# Streamlit interface setup
st.subheader("Directions")
st.markdown("1. Enter your RDS Database URI beneath.n2. ")
The elemental technique for simplifying the immediate entails sending solely the related tables and column names that pertain to the person’s question. To attain this, we are able to generate embeddings for the desk and column names, dynamically retrieve essentially the most pertinent ones primarily based on the person’s enter, and embody these within the immediate. On this article, we’ll make the most of ChromaDB as our vector database. Nevertheless, options like Pinecone, Milvus, or any appropriate vector database can be utilized.
How you can Simplify the Immediate
Let’s start by putting in ChromaDB.
pip set up chromadb
First, we’ll arrange an extra folder named ‘vectors’ alongside the ‘csvs’ folder to retailer embeddings of desk and column names. This folder also can include different pertinent database particulars, such because the overseas keys that hyperlink totally different tables, and potential values for the WHERE clause.
def generate_embeddings(filename, storage_folder):
csv_loader = CSVLoader(file_path=filename, encoding="utf8")
dataset = csv_loader.load()
vector_database = Chroma.from_documents(dataset, embedding=embeddings, persist_directory=storage_folder)
vector_database.persist()
We may even first verify whether or not the person’s question wants any details about tables or if the person is as an alternative asking about simply the overall schema of the database.
def check_user_intent_for_database_info_or_sql(question):
# Outline a template for the dialog
prompt_template = ChatPromptTemplate.from_messages([
SystemMessage(
content=(
"Based on the provided text, the user is asking a question about databases. "
"Determine if the user seeks information about the database schema or if they want to write a SQL query. "
"Respond with 'yes' if the user is seeking information about the database schema and 'no' if they intend to write a SQL query."
)
),
HumanMessagePromptTemplate.from_template("{text}"),
])
# Generate a response utilizing a language mannequin
response = chat_llm(prompt_template.format_messages(textual content=question))
print(response.content material)
return response.content material
The person responds with both ‘sure’ or ‘no’. If the reply is ‘sure’, a immediate is generated.
def generate_sql_query_prompt(question, db_uri):
# Configure the chat template with system and human messages
prompt_template = ChatPromptTemplate.from_messages([
SystemMessage(
content=(
"As an assistant tasked with writing SQL queries, create a SQL query based on the text below. "
"Enclose the SQL query within three backticks '```' for clarity. "
"Aim to use 'SELECT' queries as much as possible. "
f"The connection string for the database is {db_uri}."
)
),
HumanMessagePromptTemplate.from_template("{text}"),
])
# Generate and print the reply utilizing a language mannequin
response = chat_llm.prompt_template.format_messages(textual content=question))
print(response.content material)
return response.content material
If the reply is ‘no’, it signifies that the person’s question particularly requires the names of tables and columns inside these tables. We are going to then establish essentially the most related tables & columns, and assemble a string from these to incorporate in our immediate.
Subsequent, we’ll confirm that our vectors have been efficiently created and that each one different elements are functioning appropriately. Under is the whole code up thus far.
import streamlit as st
import requests
import os
import pandas as pd
from uuid import uuid4
import psycopg2
from langchain.prompts import ChatPromptTemplate
from langchain.prompts.chat import SystemMessage, HumanMessagePromptTemplate
from langchain.llms import OpenAI, AzureOpenAI
from langchain.chat_models import ChatOpenAI, AzureChatOpenAI
from langchain.embeddings import OpenAIEmbeddings
from dotenv import load_dotenv
from langchain.vectorstores import Chroma
from langchain.document_loaders.csv_loader import CSVLoader
# Create crucial folders for information storage
folders_to_create = ['csvs', 'vectors']
for folder_name in folders_to_create:
if not os.path.exists(folder_name):
os.makedirs(folder_name)
print(f"Folder '{folder_name}' created.")
else:
print(f"Folder '{folder_name}' already exists.")
# Load API key from surroundings variable
load_dotenv()
openai_api_key = os.getenv("OPENAI_API_KEY")
# Initialize language fashions and embeddings
llm = OpenAI(openai_api_key=openai_api_key)
chat_llm = ChatOpenAI(openai_api_key=openai_api_key, temperature=0.4)
embeddings = OpenAIEmbeddings(openai_api_key=openai_api_key)
# Perform to retrieve fundamental desk particulars from the database
def get_basic_table_details(cursor):
cursor.execute("""
SELECT c.table_name, c.column_name, c.data_type
FROM information_schema.columns c
WHERE c.table_name IN (
SELECT tablename
FROM pg_tables
WHERE schemaname="public"
);""")
return cursor.fetchall()
# Perform to create vector databases from CSV recordsdata
def create_vectors(filename, persist_directory):
loader = CSVLoader(file_path=filename, encoding="utf8")
information = loader.load()
vectordb = Chroma.from_documents(information, embedding=embeddings, persist_directory=persist_directory)
vectordb.persist()
# Perform to save lots of database particulars and generate vectors
def save_db_details(db_uri):
unique_id = str(uuid4()).change("-", "_")
connection = psycopg2.join(db_uri)
cursor = connection.cursor()
tables_and_columns = get_basic_table_details(cursor)
df = pd.DataFrame(tables_and_columns, columns=['table_name', 'column_name', 'data_type'])
filename_t="csvs/tables_" + unique_id + '.csv'
df.to_csv(filename_t, index=False)
create_vectors(filename_t, "./vectors/tables_"+ unique_id)
cursor.shut()
connection.shut()
return unique_id
# Perform to generate SQL question templates
def generate_template_for_sql(question, table_info, db_uri):
template = ChatPromptTemplate.from_messages([
SystemMessage(
content=(
f"You are an assistant that can write SQL Queries. Given the text below, write a SQL query that answers the user's question. DB connection string is {db_uri}. Here is a detailed description of the table(s): {table_info} Prepend and append the SQL query with three backticks '```'"
)
),
HumanMessagePromptTemplate.from_template("{text}"),
])
return chat_llm(template.format_messages(textual content=question)).content material
# Perform to find out if person's question is about basic schema info or SQL
def check_if_users_query_want_general_schema_information_or_sql(question):
template = ChatPromptTemplate.from_messages([
SystemMessage(
content=(
f"In the given text, the user is asking a question about the database. Determine whether the user wants information about the database schema or wants to write a SQL query. Answer 'yes' for schema information and 'no' for SQL query."
)
),
HumanMessagePromptTemplate.from_template("{text}"),
])
reply = chat_llm(template.format_messages(textual content=question))
print(reply.content material)
return reply.content material
# Perform to immediate when person desires basic database info
def prompt_when_user_want_general_db_information(question, db_uri):
template = ChatPromptTemplate.from_messages([
SystemMessage(
content=(
"You are an assistant who writes SQL queries. Given the text below, write a SQL query that answers the user's question. Prepend and append the SQL query with three backticks '```' Write select query whenever possible Connection string to this database is {db_uri}"
)
),
HumanMessagePromptTemplate.from_template("{text}"),
])
reply = chat_llm(template.format_messages(textual content=question))
print(reply.content material)
return reply.content material
# Perform to course of person queries and generate outputs primarily based on whether or not it is about basic schema or particular SQL question
def get_the_output_from_llm(question, unique_id, db_uri):
filename_t="csvs/tables_" + unique_id + '.csv'
df = pd.read_csv(filename_t)
table_info = ''
for desk in df['table_name'].distinctive():
table_info += 'Details about desk ' + desk + ':n'
table_info += df[df['table_name'] == desk].to_string(index=False) + 'nnn'
answer_to_question_general_schema = check_if_users_query_want_general_schema_information_or_sql(question)
if answer_to_question_general_schema == "sure":
return prompt_when_user_want_general_db_information(question, db_uri)
return generate_template_for_sql(question, table_info, db_uri)
# Perform to execute SQL options
def execute_the_solution(answer, db_uri):
connection = psycopg2.join(db_uri)
cursor = connection.cursor()
_, final_query, _ = answer.cut up("```")
final_query = final_query.strip('sql')
cursor.execute(final_query)
outcome = cursor.fetchall()
return str(outcome)
# Streamlit app setup and interplay dealing with
if __name__ == "__main__":
st.subheader("Directions")
st.markdown("""
1. Enter the URI of your RDS Database within the textual content field beneath.
2. Click on the **Begin Chat** button to begin the chat.
3. Enter your message within the textual content field beneath and press **Enter** to ship the message to the API.
""")
chat_history = []
uri = st.text_input("Enter the RDS Database URI")
if st.button("Begin Chat"):
if not uri:
st.warning("Please enter a sound database URI.")
else:
st.information("Connecting to the API and beginning the chat...")
chat_response = connect_with_db(uri)
if "error" in chat_response:
st.error("Error: Failed to begin the chat. Please verify the URI and check out once more.")
else:
st.success("Chat began efficiently!")
st.subheader("Chat with the API")
if "messages" not in st.session_state:
st.session_state.messages = []
for message in st.session_state.messages:
with st.chat_message(message["role"]):
st.markdown(message["content"])
if immediate := st.chat_input("What's up?"):
st.chat_message("person").markdown(immediate)
st.session_state.messages.append({"position": "person", "content material": immediate})
response = send_message(immediate)["message"]
with st.chat_message("assistant"):
st.markdown(response)
st.session_state.messages.append({"position": "assistant", "content material": response})
st.write("It is a easy Streamlit app for beginning a chat with an RDS Database.")
Within the subsequent step, we’ll carry out vector retrieval to establish essentially the most related tables. As soon as these tables are chosen, we’ll collect all of the column particulars to supply further context for our immediate. We are going to then compile this info right into a string to incorporate within the immediate.
# Initialize the vector database for storing desk embeddings
vectordb = Chroma(embedding_function=embeddings, persist_directory=f"./vectors/tables_{unique_id}")
retriever = vectordb.as_retriever()
docs = retriever.get_relevant_documents(question)
print(docs)
# Accumulating the related tables and their columns
relevant_tables = []
relevant_table_details = []
for doc in docs:
table_info = doc.page_content.cut up("n")
table_name = table_info[0].cut up(":")[1].strip()
column_name = table_info[1].cut up(":")[1].strip()
data_type = table_info[2].cut up(":")[1].strip()
relevant_tables.append(table_name)
relevant_table_details.append((table_name, column_name, data_type))
# Load information about all tables from a CSV file
filename_t = f'csvs/tables_{unique_id}.csv'
df = pd.read_csv(filename_t)
# Assemble a descriptive string for every related desk, together with all columns and their information sorts
table_info = ''
for desk in relevant_tables:
table_info += f'Details about desk {desk}:n'
table_info += df[df['table_name'] == desk].to_string(index=False) + 'nnn'
def create_sql_query_template(question, relevant_tables, table_info):
tables_list = ",".be a part of(relevant_tables)
chat_template = ChatPromptTemplate.from_messages([
SystemMessage(
content=(
f"As an assistant capable of composing SQL queries, please write a query that resolves the user's inquiry based on the text provided. "
f"Consider SQL tables named '{tables_list}'. "
f"Below is a detailed description of these table(s): "
f"{table_info}"
"Enclose the SQL query within three backticks '```' for proper formatting."
)
),
HumanMessagePromptTemplate.from_template("{text}"),
])
response = chat_llm(chat_template.format_messages(textual content=question))
print(response.content material)
return response.content material
One remaining factor we are able to do is to offer details about overseas keys to the immediate.
import streamlit as st
import os
import pandas as pd
from uuid import uuid4
import psycopg2
from langchain.prompts import ChatPromptTemplate
from langchain.prompts.chat import SystemMessage, HumanMessagePromptTemplate
from langchain.llms import OpenAI
from langchain.chat_models import ChatOpenAI
from langchain.embeddings import OpenAIEmbeddings
from dotenv import load_dotenv
from langchain.vectorstores import Chroma
from langchain.document_loaders.csv_loader import CSVLoader
# Guarantee crucial directories exist
folders_to_create = ['csvs', 'vectors']
for folder in folders_to_create:
os.makedirs(folder, exist_ok=True)
print(f"Listing '{folder}' checked or created.")
# Load surroundings and API keys
load_dotenv()
openai_api_key = os.getenv("OPENAI_API_KEY")
# Initialize language fashions and embeddings
language_model = OpenAI(openai_api_key=openai_api_key)
chat_language_model = ChatOpenAI(openai_api_key=openai_api_key, temperature=0.4)
embeddings = OpenAIEmbeddings(openai_api_key=openai_api_key)
def fetch_table_details(cursor):
sql = """
SELECT table_name, column_name, data_type
FROM information_schema.columns
WHERE table_schema="public";
"""
cursor.execute(sql)
return cursor.fetchall()
def fetch_foreign_key_details(cursor):
sql = """
SELECT conrelid::regclass AS table_name, conname AS foreign_key,
pg_get_constraintdef(oid) AS constraint_definition
FROM pg_constraint
WHERE contype="f" AND connamespace="public"::regnamespace;
"""
cursor.execute(sql)
return cursor.fetchall()
def create_vector_database(information, listing):
loader = CSVLoader(information=information, encoding="utf8")
document_data = loader.load()
vector_db = Chroma(embeddings, persist_directory=listing)
vector_db.from_documents(document_data)
vector_db.persist()
def save_database_details(uri):
unique_id = str(uuid4()).change("-", "_")
conn = psycopg2.join(uri)
cur = conn.cursor()
particulars = fetch_table_details(cur)
df = pd.DataFrame(particulars, columns=['table_name', 'column_name', 'data_type'])
csv_path = f'csvs/tables_{unique_id}.csv'
df.to_csv(csv_path, index=False)
create_vector_database(df, f"./vectors/tables_{unique_id}")
foreign_keys = fetch_foreign_key_details(cur)
fk_df = pd.DataFrame(foreign_keys, columns=['table_name', 'foreign_key', 'constraint_definition'])
fk_csv_path = f'csvs/foreign_keys_{unique_id}.csv'
fk_df.to_csv(fk_csv_path, index=False)
cur.shut()
conn.shut()
return unique_id
def generate_sql_query_template(question, db_uri):
template = ChatPromptTemplate.from_messages([
SystemMessage(
content=(
f"You are an assistant capable of composing SQL queries. Use the details provided to write a relevant SQL query for the question below. DB connection string is {db_uri}."
"Enclose the SQL query with three backticks '```'."
)
),
HumanMessagePromptTemplate.from_template("{text}"),
])
response = chat_language_model(template.format_messages(textual content=question))
return response.content material
# Streamlit utility setup
st.title("Database Interplay Instrument")
uri = st.text_input("Enter the RDS Database URI")
if st.button("Hook up with Database"):
if uri:
attempt:
unique_id = save_database_details(uri)
st.success(f"Related to database and information saved with ID: {unique_id}")
besides Exception as e:
st.error(f"Failed to attach: {str(e)}")
else:
st.warning("Please enter a sound database URI.")
In comparable methods, we are able to hold enhancing this utility by including fallbacks. In every fallback, we are able to hold including further info.
Conclusion
This text presents a novel strategy to creating an AI utility able to seamlessly interacting with large SQL databases by way of chat. We’ve got addressed the problem of dealing with massive databases the place it’s impractical to incorporate the whole record of columns and tables within the immediate. Our proposed answer dynamically retrieves related desk and column names primarily based on person queries. We make sure the immediate consists of solely pertinent info, enhancing person expertise and effectivity. That is carried out by leveraging vector databases like ChromaDB for embedding technology and retrieval.
We’ve got demonstrated the way to streamline the interplay course of by way of step-by-step implementation and code examples. In the meantime, we additionally labored on constantly enhancing the appliance’s performance. With additional enhancements comparable to incorporating overseas keys and extra fallbacks, this utility holds promise for numerous database interplay eventualities.