Hey folks! 🚀
By now you understand how RAG works, what vector embeddings are, and how to build a RAG pipeline. Everything works, but there’s one fundamental problem: the product doesn’t really feel “production-ready.”
Both phases are working well:
✅ Indexing phase
✅ Retrieval phase
But as you saw in the last video, handling a request against our PDFs takes a lot of time. If another user sends a request in the meantime, they get blocked and wait times grow. Right now our system is fully synchronous, so we need a couple of changes to make it ready for production.
Please take a look at the image below to understand how we can solve this problem 👇.
If you’re not sure what the issue is, I’ll explain it right after the image.
The Problem
Right now when you ask a question:
You send a query
The system processes everything (PDF search + AI)
You wait 2-3 minutes
You get the answer
No one else can ask while you're waiting!
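To put numbers on that wait, here is a back-of-the-envelope sketch. It assumes every request takes the same fixed time (150 seconds, the middle of "2-3 minutes"), that all requests arrive together, and that they are served first come, first served. With one synchronous process, each user also waits for everyone ahead of them; with several workers sharing a queue, that many requests run at once. `finish_times` is a hypothetical helper for illustration only.

```python
def finish_times(num_requests: int, seconds_per_request: float, workers: int = 1) -> list[float]:
    """Return when each request finishes, assuming all arrive at t=0,
    are served in FIFO order, and each takes the same fixed time."""
    times = []
    for position in range(num_requests):
        # Workers handle requests in "rounds": round 1 serves the first `workers` requests, etc.
        round_number = position // workers + 1
        times.append(round_number * seconds_per_request)
    return times


# One synchronous process: the third user waits for the two users ahead of them.
print(finish_times(3, 150))             # [150, 300, 450]
# Three workers pulling from one queue: everyone is done after a single request's time.
print(finish_times(3, 150, workers=3))  # [150, 150, 150]
```

The gap grows linearly with the number of waiting users, which is exactly what the queue-plus-workers design below attacks.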
The Solution: Queue System
3 Simple Parts:
1. FastAPI Server (The Receptionist)
Takes your question
Immediately gives you a Job ID (like a ticket number)
Says: "We're processing! Check back with your Job ID"
Does NOT make you wait
2. Redis Queue (The Waiting List)
Stores all questions in order
First come, first served
Holds jobs until workers are ready
3. Worker (The Chef in the Kitchen)
Constantly checks the queue
Takes one job at a time
Does the actual work:
Searches PDFs
Calls OpenAI
Gets the answer
Stores result with your Job ID
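To see how the three parts fit together before bringing in Redis, here is an in-memory analogue in plain Python. It is a sketch, not how RQ is implemented: `queue.Queue` stands in for the Redis queue, a dict stands in for the result store, and two threads stand in for workers. `answer`, `submit`, `get_status` and `worker_loop` are hypothetical names, and `answer` is a placeholder for the slow PDF search + OpenAI call.

```python
import queue
import threading
import uuid

jobs = queue.Queue()            # "Redis Queue" stand-in: thread-safe FIFO of (job_id, query)
results: dict[str, str] = {}    # result store keyed by job ID
results_lock = threading.Lock()


def answer(query: str) -> str:
    # Hypothetical placeholder for the real work (vector search + LLM call).
    return f"answer to: {query}"


def submit(query: str) -> str:
    """The receptionist: enqueue the query and hand back a job ID right away."""
    job_id = str(uuid.uuid4())
    jobs.put((job_id, query))
    return job_id


def get_status(job_id: str) -> dict:
    """Check back later with the job ID."""
    with results_lock:
        if job_id in results:
            return {"status": "finished", "result": results[job_id]}
    return {"status": "queued"}


def worker_loop() -> None:
    """The chef: take one job at a time, do the work, store the result under the job ID."""
    while True:
        job_id, query = jobs.get()
        try:
            result = answer(query)
            with results_lock:
                results[job_id] = result
        finally:
            jobs.task_done()


# Two workers as daemon threads, so they don't keep the program alive on exit.
for _ in range(2):
    threading.Thread(target=worker_loop, daemon=True).start()

ids = [submit(q) for q in ["What is RAG?", "What are embeddings?"]]
jobs.join()  # block until every queued job has been processed
for job_id in ids:
    print(get_status(job_id))
```

The real system swaps each piece for something that survives across processes and machines: FastAPI for `submit`/`get_status`, Valkey for the queue and result store, and `rq worker` processes for the threads.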
Folder Structure:
Install FastAPI:

```bash
pip install "fastapi[standard]"
```

Install RQ:

```bash
pip install rq
```
Create a compose file (e.g. docker-compose.yml) for Valkey, an open-source fork of Redis that supports the same commands, so Redis clients work with it unchanged:

```yaml
services:
  valkey:
    image: valkey/valkey
    ports:
      - "6379:6379"
```
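Because Valkey speaks the same wire protocol as Redis (RESP), any Redis client, including the `redis` package RQ uses, can talk to it. As a quick sanity check once the container is up, here is a stdlib-only sketch that sends `PING` over a plain socket. `encode_command` and `ping` are hypothetical helpers, and the host/port assume the `6379:6379` mapping above.

```python
import socket


def encode_command(*args: str) -> bytes:
    """Encode a command as a RESP array of bulk strings (the format Redis and Valkey share)."""
    parts = [f"*{len(args)}\r\n".encode()]
    for arg in args:
        data = arg.encode()
        parts.append(f"${len(data)}\r\n".encode() + data + b"\r\n")
    return b"".join(parts)


def ping(host: str = "localhost", port: int = 6379, timeout: float = 2.0) -> bool:
    """Send PING over TCP and check for the simple-string reply +PONG."""
    with socket.create_connection((host, port), timeout=timeout) as sock:
        sock.sendall(encode_command("PING"))
        return sock.recv(64).startswith(b"+PONG")


print(encode_command("PING"))  # b'*1\r\n$4\r\nPING\r\n'
# With the container running (docker compose up -d), ping() should return True.
```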
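Set up the Redis Queue:
create rq_client.py

```python
from redis import Redis
from rq import Queue

# Connect to the Valkey container (it speaks the Redis protocol, so the redis client works).
queue = Queue(connection=Redis(
    host="localhost",
    port=6379,
))
```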
Set up the worker:
create worker.py

```python
from dotenv import load_dotenv

# Load OPENAI_API_KEY and other settings from .env before the clients are created.
load_dotenv()

from langchain_openai import OpenAIEmbeddings
from langchain_qdrant import QdrantVectorStore
from openai import OpenAI

openai_client = OpenAI()

# Must match the embedding model used in the indexing phase.
embeddings = OpenAIEmbeddings(
    model="text-embedding-3-large",
)

# Reuse the Qdrant collection created during indexing.
vector_store = QdrantVectorStore.from_existing_collection(
    collection_name="demo_collection",
    embedding=embeddings,
    url="http://localhost:6333",
)


def process_query(query: str) -> str:
    """Runs inside an RQ worker: retrieve relevant chunks, then ask the LLM."""
    print(f"Processing query in worker: {query}")

    search_result = vector_store.similarity_search(query=query)
    context = "\n\n\n".join(
        f"Page Content:{result.page_content}\n"
        f"Page Number:{result.metadata['page_label']}\n"
        f"File Location:{result.metadata['source']}"
        for result in search_result
    )

    SYSTEM_PROMPT = f"""
You are a helpful AI assistant who answers the user's query based on the context retrieved from a PDF file, including page contents and page numbers.
You should only answer based on the following context, and point the user to the right page number to learn more.

Context:
{context}
"""

    response = openai_client.chat.completions.create(
        model="gpt-5",
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": query},
        ],
    )
    return response.choices[0].message.content
```
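The context string is the only thing the LLM sees from the PDF, so it helps to look at its exact shape. This sketch reproduces the worker's formatting without Qdrant or OpenAI, using a hypothetical `Doc` dataclass in place of LangChain's `Document`. It assumes the chunks were indexed with a PDF loader that sets the `page_label` and `source` metadata keys (the worker relies on both); `build_context` and the sample text are illustrative only.

```python
from dataclasses import dataclass, field


@dataclass
class Doc:
    """Hypothetical stand-in for a LangChain Document: text plus a metadata dict."""
    page_content: str
    metadata: dict = field(default_factory=dict)


def build_context(results: list[Doc]) -> str:
    """Join search hits into one context string, using the same format as worker.py."""
    return "\n\n\n".join(
        f"Page Content:{doc.page_content}\n"
        f"Page Number:{doc.metadata['page_label']}\n"
        f"File Location:{doc.metadata['source']}"
        for doc in results
    )


hits = [
    Doc("RAG combines retrieval with generation.", {"page_label": "3", "source": "rag.pdf"}),
    Doc("Embeddings map text to vectors.", {"page_label": "7", "source": "rag.pdf"}),
]
print(build_context(hits))
```

Keeping the page number next to each chunk is what lets the system prompt ask the model to point the user to the right page.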
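Set up the server:
create server.py

```python
from dotenv import load_dotenv

load_dotenv()

from fastapi import FastAPI, HTTPException, Query
from rq.job import JobStatus

from rag_queue.client.rq_client import queue
from rag_queue.queues.worker import process_query

app = FastAPI()


@app.get("/")
def root():
    return {"message": "Welcome to the RAG Queue API"}


@app.get("/chat")
def chat(
    query: str = Query(..., description="The user query to process."),
):
    # Enqueue and return immediately; a worker picks the job up later.
    job = queue.enqueue(process_query, query)
    return {"status": "queued", "job_id": job.id}


@app.get("/job-status")
def get_result(
    job_id: str = Query(..., description="The job ID to fetch the result for."),
):
    job = queue.fetch_job(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found")

    status = job.get_status()  # e.g. queued, started, finished, failed
    if status == JobStatus.FINISHED:
        return {"status": status, "result": job.return_value()}
    # Not done yet (or failed): there is no result to return.
    return {"status": status, "result": None}
```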
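Since `/chat` only hands back a job ID, a client has to poll `/job-status` until the work is done. Here is a minimal stdlib polling client, as a sketch. It assumes the server runs on localhost:8000 and that `/job-status` reports RQ-style statuses such as `finished` or `failed`; pass `done=` to match whatever your endpoint actually returns. `http_get_json` and `wait_for_result` are hypothetical helpers, and the fetch function is injected so the loop can be tried without a running server.

```python
import json
import time
import urllib.parse
import urllib.request
from typing import Callable

BASE_URL = "http://localhost:8000"  # assumption: matches the host/port in main.py


def http_get_json(path: str, **params: str) -> dict:
    """GET BASE_URL + path with query-string parameters and decode the JSON body."""
    url = f"{BASE_URL}{path}?{urllib.parse.urlencode(params)}"
    with urllib.request.urlopen(url) as resp:
        return json.load(resp)


def wait_for_result(
    fetch: Callable[[str], dict],
    job_id: str,
    done: tuple[str, ...] = ("finished", "failed"),
    interval: float = 2.0,
    timeout: float = 300.0,
) -> dict:
    """Call fetch(job_id) every `interval` seconds until its "status" is in `done`."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        payload = fetch(job_id)
        if payload.get("status") in done:
            return payload
        time.sleep(interval)
    raise TimeoutError(f"job {job_id} not done after {timeout} seconds")


# Against the running server (and at least one worker):
# job_id = http_get_json("/chat", query="What is RAG?")["job_id"]
# print(wait_for_result(lambda j: http_get_json("/job-status", job_id=j), job_id))
```

Polling is the simplest option; a fixed `interval` of a couple of seconds keeps load on the API low while queries take minutes.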
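Create main.py:

```python
import uvicorn

from .server import app


def main():
    uvicorn.run(app, host="localhost", port=8000)


# Start the server only when this module is run directly, not when it is imported.
if __name__ == "__main__":
    main()
```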
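Start the server, then create multiple jobs at the same time. Because main.py uses a relative import (`from .server import app`), run it as a module from the project root, e.g. `python -m rag_queue.main` (assuming main.py sits inside the rag_queue package).
Start multiple workers by running this command in several terminals, from the project root so that `rag_queue.queues.worker` is importable:

```bash
rq worker --worker-class rq.worker.SimpleWorker
```

SimpleWorker runs each job in the worker's own process instead of forking a child process per job.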
🎉 CONGRATULATIONS! 🚀
You've successfully built a queue-based RAG system that is much closer to production-ready!
🏆 What You've Accomplished:
✅ Architecture Implemented:
🔗 FastAPI Server - Instant response receiver
📥 Valkey Queue - Job management system
⚙️ RQ Workers - Parallel processing engines
🧠 Qdrant + OpenAI - Intelligent response generation
✅ Problems Solved:
🚫 No more blocking - Multiple users can query simultaneously
⚡ Instant feedback - A job ID comes back in milliseconds
📈 Scalability - Add more workers to process more jobs in parallel
🛡️ Reliability - Failed jobs are kept in RQ's failed job registry and can be retried (opt-in via `Retry` when enqueuing)
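Retries in RQ are configured per job when you enqueue it, for example `queue.enqueue(process_query, query, retry=Retry(max=3))` with `from rq import Retry`. The stdlib sketch below only illustrates what such a policy does (re-run a failing job up to a limit, optionally waiting between attempts); `run_with_retry` and `flaky` are hypothetical, not RQ's code.

```python
import time
from typing import Callable, Sequence, TypeVar

T = TypeVar("T")


def run_with_retry(job: Callable[[], T], max_retries: int = 3,
                   intervals: Sequence[float] = (0.0,)) -> T:
    """Run job(); if it raises, retry up to max_retries more times.
    Waits intervals[i] seconds before retry i (the last interval repeats)."""
    attempt = 0
    while True:
        try:
            return job()
        except Exception:
            if attempt >= max_retries:
                raise  # out of retries: surface the failure (RQ would mark the job failed)
            time.sleep(intervals[min(attempt, len(intervals) - 1)])
            attempt += 1


calls = {"n": 0}


def flaky() -> str:
    # Hypothetical job that fails twice (e.g. a transient API error), then succeeds.
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


print(run_with_retry(flaky, max_retries=3))  # ok
print(calls["n"])                            # 3
```

Retrying suits transient errors such as rate limits or network hiccups; a job that fails for a deterministic reason will just fail `max_retries + 1` times.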
🔧 Your System Now Works Like:
```text
👥 User 1 → 📡 FastAPI → 📥 Queue → ⚙️ Worker 1 → ✅ Result
👤 User 2 → 📡 FastAPI → 📥 Queue → ⚙️ Worker 2 → ✅ Result
👥 User 3 → 📡 FastAPI → 📥 Queue → ⚙️ Worker 3 → ✅ Result
```
ALL HAPPENING AT THE SAME TIME! 🎊
