2026-02-25 02:15:14
AI coding tools are great, but there's a non-zero chance one of them nukes your .git folder someday. Scheduled backups work, but I wanted something that just happens automatically — so I set up a bare repository on a separate drive that syncs on every commit via a post-commit hook. Here's how. (macOS, but Linux is identical.)
Pick somewhere outside your project — an external drive works well.
mkdir -p /path/to/backup/project.git
cd /path/to/backup/project.git
git init --bare
A bare repo stores only history, no working files. Same format GitHub uses internally.
cd ~/path/to/project
git remote add backup /path/to/backup/project.git
# Initial push (check your branch name first)
git branch --show-current
git push backup <branch-name>
# Verify
git ls-remote backup
touch .git/hooks/post-commit
chmod +x .git/hooks/post-commit
.git/hooks/post-commit:
#!/bin/sh
current_branch=$(git rev-parse --abbrev-ref HEAD)
git push backup "$current_branch" --quiet >/dev/null 2>&1 &
The & runs it in the background so commits don't feel any slower.
touch backup_test.txt
git add backup_test.txt
git commit -m "test: post-commit hook"
Then compare hashes:
git log --oneline -1
git ls-remote backup
If your branch points to the same hash in both, you're good. Clean up:
git rm backup_test.txt
git commit -m "chore: remove test file"
If only .git was deleted (files intact):
cd /path/to/project
git init
git remote add backup /path/to/backup/project.git
git fetch backup
git branch -r # confirm branch name
git reset --mixed backup/<branch-name>
git status # commit any remaining diff
If the whole folder is gone:
git clone /path/to/backup/project.git my_project
In both cases, re-add the post-commit hook afterward since .git/hooks/ isn't tracked by Git.
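Recreating the hook is just writing the same script from the setup section back into place:

```shell
mkdir -p .git/hooks  # already exists in a normal repo; harmless otherwise
cat > .git/hooks/post-commit <<'EOF'
#!/bin/sh
current_branch=$(git rev-parse --abbrev-ref HEAD)
git push backup "$current_branch" --quiet >/dev/null 2>&1 &
EOF
chmod +x .git/hooks/post-commit
```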
That's it. A bit of setup, but once it's running you don't think about it again.
2026-02-25 02:15:10
Hey folks, welcome to the world of Agentic Tools and DevOps.
Today, we’re diving into CI/CD pipelines and exploring how we can debug them efficiently and almost instantly using AI. In this project, we’ll build an AI-powered CI/CD Copilot where AWS Lambda serves as the core logic layer. This Lambda function will interact with the Google Gemini API to analyze pipeline failures and help us debug them intelligently.
The goal of this project is not just to integrate AI into a CI/CD workflow, but to help you understand how to build your own AI agent from scratch — one that can assist in real-world DevOps scenarios.
So, without further ado, let’s get started.
Before we begin, make sure you have the following requirements in place:
Docker & Docker Hub account
We will run parts of this project inside Docker containers. Later, we’ll push our custom image to Docker Hub, so make sure you have both Docker installed and a Docker Hub account ready.
Jenkins (Our CI/CD Tool)
We’ll use Jenkins for demonstration purposes. Any Jenkins instance will work as long as Docker is accessible from it (this guide assumes Jenkins runs at http://localhost:8080).
Terraform
We will provision our infrastructure — including the Gemini API key (stored securely) and the AWS Lambda function — using Terraform.
Make sure Terraform is installed and your AWS CLI is configured with appropriate credentials.
If you’re new to Terraform setup, you can follow this guide:
👉 https://blog.praveshsudha.com/getting-started-with-terraform-a-beginners-guide#heading-step-1-install-the-aws-cli
The complete source code for this project is available in this GitHub repository:
👉 https://github.com/Pravesh-Sudha/ai-devops-agent
Navigate to the cicd-copilot directory to follow along.
If you’ve been following my work, you might recognize this project. I originally used this same Node.js Book Reader application to demonstrate how Docker works with Node.js. For this AI-powered CI/CD Copilot, I’ve made specific modifications — particularly in the Jenkinsfile and the terra-config directory.
Inside the terra-config directory, you’ll find:
main.tf – Provisions the Lambda function and the Secrets Manager secret that stores the Gemini API key
lambda.zip – The packaged Lambda deployment artifact (zipped lambda_function.py)
lambda_function.py – The core of this project.
This file contains the AI agent logic and the structured prompt sent to the Gemini API.
iam.tf – Defines the IAM roles and permissions the Lambda function requires, including access to the secret
The core idea behind this project is simple:
Jenkins detects a pipeline failure.
It collects contextual information (stage name, build ID, logs).
It sends that data to AWS Lambda.
Lambda calls the Gemini API.
Gemini analyzes the logs and returns structured debugging insights.
The Lambda function expects a JSON payload in the following format:
{
  "stage": "$stage",        # Name of the stage where the pipeline failed
  "job": "$job",            # Job name (e.g., cicd-copilot)
  "build_id": "$build_id",  # Build ID number (e.g., 1, 2, 3)
  "logs": "$logs"           # Last 200 lines of failure logs
}
This structured input allows the AI agent to understand the pipeline context before analyzing the logs.
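For illustration, here is how such a payload could be assembled on the Jenkins side. The `build_payload` helper and the log path are hypothetical; only the field names come from the schema above:

```python
import json

def build_payload(stage: str, job: str, build_id: str, log_path: str,
                  max_lines: int = 200) -> str:
    """Assemble the JSON payload the Lambda function expects."""
    with open(log_path) as f:
        lines = f.readlines()
    return json.dumps({
        "stage": stage,                       # name of the failed stage
        "job": job,                           # Jenkins job name
        "build_id": build_id,                 # build number
        "logs": "".join(lines[-max_lines:]),  # last 200 lines of failure logs
    })
```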
Inside the Lambda function, we make a POST request to the Gemini API with the following structured prompt:
You are a senior CI/CD Copilot specialized in Jenkins pipelines.
Pipeline context:
- Stage name: {stage}
- Expected outcome: Build an artifact usable by later stages
Your tasks:
1. Identify the failure category (build / runtime / config / infra / dependency / auth / unknown)
2. Identify the most likely root cause
3. Provide actionable fixes
4. Suggest a patch ONLY if clearly inferable
Respond ONLY in valid JSON with this schema:
{{
"failure_category": "",
"root_cause": "",
"actionable_fixes": [],
"suggested_patch": {{
"file": "",
"line": "",
"fix": ""
}}
}}
Logs:
{logs}
The prompt dynamically injects two key variables:
{stage} – The pipeline stage name
{logs} – The failure logs
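The doubled braces in the schema above suggest the prompt is a Python format string, where literal braces must be escaped as `{{ }}` so that only `{stage}` and `{logs}` get substituted. A stripped-down sketch (not the repo's actual template):

```python
# Literal JSON braces are escaped as {{ }} so str.format only
# substitutes the {stage} and {logs} placeholders.
PROMPT_TEMPLATE = (
    "You are a senior CI/CD Copilot specialized in Jenkins pipelines.\n"
    "Pipeline context:\n"
    "- Stage name: {stage}\n"
    "Respond ONLY in valid JSON with this schema:\n"
    '{{"failure_category": "", "root_cause": ""}}\n'
    "Logs:\n{logs}\n"
)

prompt = PROMPT_TEMPLATE.format(stage="Build", logs="npm ERR! missing script: build")
```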
If you’d like to explore the full Lambda implementation, you can view it here:
👉 https://github.com/Pravesh-Sudha/ai-devops-agent/blob/main/cicd-copilot/terra-config/lambda_function.py
You might be wondering — how exactly does this connect with Jenkins?
Inside the Jenkinsfile, each stage:
Sets an environment variable for the stage name.
Redirects command output (in case of failure) into a LOG_FILE.
If any stage fails:
The post { failure { ... } } block is triggered.
Jenkins constructs the JSON payload.
It invokes the AWS Lambda function.
The AI-generated failure analysis is printed directly into the Jenkins console output.
This gives you instant, structured debugging assistance right inside your CI/CD pipeline.
To replicate this approach in your own pipeline:
Append log redirection to each command:
>> ${LOG_FILE} 2>&1
Define an environment variable for the stage name.
Provision:
* AWS Lambda
* IAM roles
* Secrets Manager (for the Gemini API key)
using Terraform.
Add a post { failure { ... } } block in your Jenkinsfile to invoke the Lambda function with the structured JSON payload. Once configured, your CI/CD pipeline becomes AI-assisted, capable of analyzing its own failures and suggesting actionable fixes.
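As a rough sketch of what the Terraform side of step 3 can look like — resource names, secret name, and runtime here are illustrative guesses, not the repo's actual terra-config:

```hcl
variable "gemini_api_key" {
  type      = string
  sensitive = true
}

# Store the Gemini API key securely
resource "aws_secretsmanager_secret" "gemini" {
  name = "cicd-copilot-gemini-key"
}

resource "aws_secretsmanager_secret_version" "gemini" {
  secret_id     = aws_secretsmanager_secret.gemini.id
  secret_string = var.gemini_api_key
}

# The AI agent itself, packaged as lambda.zip
resource "aws_lambda_function" "copilot" {
  function_name = "cicd-copilot-agent"
  filename      = "lambda.zip"
  handler       = "lambda_function.lambda_handler"
  runtime       = "python3.12"
  role          = aws_iam_role.lambda_role.arn # defined in iam.tf
}
```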
Enough with the theory — let’s see this in action.
First, head over to the GitHub repository and fork it under your own username.
You’ll be intentionally modifying the code later to trigger pipeline failures, so forking is important.
After forking:
git clone https://github.com/your-username/ai-devops-agent.git
cd ai-devops-agent/cicd-copilot/terra-config
Inside the terra-config directory, initialize Terraform:
terraform init
To provision the infrastructure, you’ll need a GEMINI_API_KEY.
Go to Google AI Studio
Log in with your Google account
Navigate to the API section
Click Create API Key
Give it a name and generate the key
Store it securely
Now, apply the Terraform configuration:
terraform apply -var="gemini_api_key=<Paste-your-key-here>" --auto-approve
⚠️ Make sure the configured AWS IAM user has the required permissions (Lambda and Secrets Manager access), as mentioned in the prerequisites section.
Once completed, your infrastructure (Lambda function + IAM roles + Secret) will be up and running.
Open your Jenkins dashboard (usually running on http://localhost:8080).
Click New Item
Select Pipeline
Name it: cicd-copilot
Choose Pipeline script from SCM
Configure the following:
SCM: Git
Repository URL: https://github.com/your-username/ai-devops-agent
Branch Specifier: main
Script Path: cicd-copilot/Jenkinsfile
Click Save.
Navigate to:
Manage Jenkins → Plugins
Install the following plugins:
Docker
Docker Pipeline
Docker Commons
Ensure Docker is accessible inside Jenkins.
In your terminal, run:
which docker
Copy the output path.
Now go to:
Manage Jenkins → System → Global Properties
Append the copied path to the existing PATH variable using : as a separator. Save the configuration.
Navigate to:
Manage Jenkins → Credentials
Add a new global credential with:
* Kind: **Username with password**
* Username: Your Docker Hub username
* Password: Your Docker Hub password
* ID: `docker-cred`
Save it.
Now go back to your cicd-copilot project and click Build Now.
Open Console Output.
You will notice that the pipeline fails — this is intentional.
The logs are automatically captured and sent to the AI Agent, which returns structured debugging analysis inside the Jenkins console.
In the first failure, the AI identifies a typo in the Dockerfile.
For example:
apine
It should be:
alpine
Fix the typo in your forked repository and commit the changes.
Rebuild the pipeline.
This time, the pipeline fails again — but for a different reason. There is a Docker image version mismatch.
The AI analysis might suggest that the image is private or unavailable. However, the real issue is in the Jenkinsfile.
Inside the Run Container stage, change the image version from:
v2
to:
v1
Commit the change and rebuild the pipeline.
Now, when you trigger the pipeline again:
The build succeeds
The Docker image is pushed to your Docker Hub account
The container starts successfully
Visit:
http://localhost:3000
You should see the Book Reader application running.
To stop the running container:
docker kill cicd-copilot
To avoid unnecessary AWS charges, destroy the infrastructure:
terraform destroy -var="gemini_api_key=<Paste-your-key-here>" --auto-approve
In this project, we built an AI-powered CI/CD Copilot using:
Jenkins for pipeline orchestration
AWS Lambda for AI agent logic
AWS Secrets Manager for secure API storage
Google Gemini API for log analysis
The agent receives contextual pipeline information and failure logs, analyzes them intelligently, and provides structured debugging insights directly inside the CI/CD workflow.
Instead of manually scanning logs, you now have an AI assistant that understands context, categorizes failures, identifies root causes, and suggests actionable fixes — making debugging faster, smarter, and more efficient.
Modern CI/CD pipelines are powerful — but when they fail, debugging can quickly become time-consuming and frustrating. In this project, we went a step further by integrating AI directly into the pipeline workflow.
By combining:
Jenkins for orchestration
AWS Lambda for serverless execution
AWS Secrets Manager for secure API handling
Google Gemini API for intelligent log analysis
we built an AI-powered CI/CD Copilot capable of understanding pipeline context, analyzing failure logs, identifying root causes, and suggesting actionable fixes — all automatically.
This isn’t just about log analysis. It’s about shifting from reactive debugging to intelligent, context-aware automation.
As AI continues to evolve, integrating agentic systems into DevOps workflows will become increasingly common. Building projects like this not only strengthens your cloud and automation skills but also prepares you for the next wave of AI-driven infrastructure.
If you found this project helpful, feel free to connect with me and follow my work:
🌐 Website: https://praveshsudha.com
📝 Blog: https://blog.praveshsudha.com
💼 LinkedIn: https://www.linkedin.com/in/pravesh-sudha
🐙 GitHub: https://github.com/Pravesh-Sudha
🐦 Twitter/X: https://x.com/praveshstwt
🎥 Youtube: https://youtube.com/@pravesh-sudha
I regularly share content on DevOps, AWS, Terraform, CI/CD, and building real-world cloud projects from scratch.
If you build your own version of this AI CI/CD Copilot, tag me — I’d love to see what you create.
Happy Building 🚀
2026-02-25 02:15:05
Imagine spending years becoming a doctor. The exams, the training, the sacrifice. And then you get there and realize half your day is just... paperwork. That is what is happening to clinicians right now and it is pushing them out of the profession.
Almost 63% of doctors are showing signs of burnout. Nurses are leaving faster than new ones are joining. This is not a small issue we can ignore. The people responsible for keeping us healthy are exhausted and the system is not doing enough about it.
Here is something most people get wrong. Doctors are not burning out because the cases are too hard. They are burning out because for every one hour they spend with a patient, they spend two hours on admin work. Notes, forms, messages, scheduling, refill approvals. It never stops.
By the time a doctor gets home, they are still mentally at work. They are thinking about the notes they did not finish and the calls they still have to return. That kind of pressure every single day wears a person down fast. And honestly, it should not be this way.
Look, AI is not going to replace doctors. But it can absolutely take the boring, repetitive, time consuming tasks off their plate. Appointment reminders, patient check-ins, after hours questions, prescription refill requests, clinical note drafts. All of this can be handled automatically.
When that happens, clinicians get real time back. Not just a few minutes but enough to actually breathe. Enough to sit with a patient a little longer. Enough to go home and actually switch off.
Think about a doctor finishing their last patient at 5pm. Without AI they still have 45 minutes of note writing ahead of them. With AI the notes are already drafted and they spend 5 minutes reviewing. Done.
A patient calls at 9pm with a basic question about their medicine. Without AI that sits in voicemail until morning and adds to an already full inbox. With AI the patient gets a clear answer right away and nobody on the team had to do a thing. Now multiply that across an entire clinic every single day. The difference is massive.
When your clinical team is overwhelmed, patients feel it. Appointments get rushed. Calls go unanswered. Follow-ups do not happen on time. That erodes trust and makes the whole experience feel cold and impersonal.
When AI handles the routine stuff, patients can book appointments any time, get answers after hours, and receive proper follow-up without anyone on the team doing it manually. Better patient experience and lower staff burnout are not separate goals. They are the same goal.
This part is important. Not all AI works well in healthcare. If a patient is anxious about a diagnosis and the AI they speak to sounds robotic and cold, it makes things worse. They just hang up and call back to speak to a real person anyway.
Healthcare AI needs to communicate clearly, calmly, and in a way that makes a nervous person feel heard. When it gets that right it builds trust. And when patients trust the process, there are fewer frustrated calls for the clinical team to deal with.
VAIU.ai is building emotionally intelligent AI medical staff for modern clinics. Their platform is designed to reduce clinician burnout, improve patient trust, and streamline clinic workflows using AI that actually understands human emotion. This is not some generic AI tool someone tweaked for healthcare. It was built specifically for clinics from day one.
Their voice AI agents handle the routine workload described above — appointment scheduling, patient check-ins, after-hours questions, and prescription refill requests — all through natural voice conversations that actually feel human. Clinicians get their time back and patients get a better experience every time they reach out.
Clinician burnout is not slowing down on its own. And if we do not start fixing the systems that are causing it, we are going to keep losing good doctors and nurses who simply ran out of energy.
AI is one of the most practical tools available right now to fix this. VAIU.ai is already helping clinics do exactly that. Less burnout, more trust, smoother workflows. It is not about replacing the human side of healthcare. It is about protecting it.
2026-02-25 02:13:35
Last week I shipped a cross-platform app and needed to test it on Flutter, React Native, iOS, Android, Electron, Tauri, and web. Writing separate test suites for each platform? No thanks.
Instead, I used an AI agent that could see my app and interact with it. Here is what the workflow looked like:
I used flutter-skill, an open-source MCP server that gives AI agents eyes and hands inside running apps. It connects to your app via a lightweight bridge and exposes 253 tools the AI can use.
npm install -g flutter-skill
flutter-skill init ./my-app
flutter-skill launch ./my-app
Instead of writing test code, I just described what to test:
Test the login flow - enter [email protected] and password123, tap Login, verify the Dashboard appears
The AI agent carried out the steps automatically, with no test code written.
Across 8 platforms, the AI agent completed 562 out of 567 test scenarios (99.1% pass rate). The failures were all legitimate bugs it discovered.
What surprised me most:
Use AI testing when:
Stick with traditional automation when:
flutter-skill is open source and free: github.com/ai-dashboad/flutter-skill
Works with Claude, GPT, Gemini, Cursor, Windsurf, and any MCP-compatible agent.
Would love to hear if anyone else is using AI agents for testing - what is working for you?
2026-02-25 02:12:54
SEO doesn’t always require a full marketing strategy. Sometimes small technical fixes can improve visibility quickly.
Here are five simple SEO improvements developers can implement in less than an hour.
Many websites either:
Duplicate titles
Leave default titles
Or forget meta descriptions completely
Every page should have:
A unique <title> tag
A clear meta description
Relevant keywords naturally included
Search engines like Google use these to understand page relevance.
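A minimal head covering both tags (title and description text are placeholders):

```html
<head>
  <title>SEO Checklist for Startups | ExampleCo</title>
  <meta name="description"
        content="Five technical SEO fixes developers can ship in under an hour.">
</head>
```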
A common issue:
Multiple <h1> tags
Skipping heading levels
Using headings only for styling
Correct structure:
One <h1> per page
A logical hierarchy of <h2> and <h3> subheadings
This improves crawlability and readability.
Large images slow down websites.
Quick fixes:
Convert images to WebP
Compress large files
Add descriptive alt text
Alt text improves accessibility and image search visibility.
Avoid messy URLs like:
/page?id=12345
Use readable URLs:
/seo-checklist-for-startups
Clean URLs improve:
User trust
Click-through rates
Search engine understanding
Sometimes staging settings accidentally block search engines.
Quick checks:
Make sure important pages are not set to “noindex”
Verify robots.txt is not blocking key directories
Submit your sitemap to Google Search Console
This prevents invisible SEO problems.
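The classic offender is a staging robots.txt that ships to production; these two lines hide the entire site from every crawler, so make sure they never reach your live server:

```
User-agent: *
Disallow: /
```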
Final Thoughts
Developers play a huge role in SEO performance.
You don’t need advanced marketing knowledge to improve rankings. Small technical improvements can:
Increase visibility
Improve user experience
Support long-term growth
SEO is not just marketing — it starts with clean development practices.
2026-02-25 02:11:27
Production RAG systems face a silent killer: vector drift. Embeddings become stale, context degrades, and retrieval quality drops over time even when your code and infrastructure look healthy.
This article walks through a self healing vector index built on Elasticsearch that detects drift in retrieval quality, selectively reembeds stale or changed documents, and reindexes with zero downtime.
In a test run on a 50,000 document corpus this approach delivered:
This version of the system has been hardened for production. It now uses alias based indexes for zero downtime reindexing, has configuration validation and retry logic, ships with unit tests, and exposes a complete reference implementation you can run locally.
Reference implementation:
https://github.com/mihirphalke1/elasticsearch-self-healing-vectors
Setup instructions live in the README.md in the repo.

You build a nice RAG pipeline. Vector search returns semantically similar documents, your LLM answers look good, and the whole stack performs well in staging.
Six months later support tickets start to mention irrelevant answers and search that feels random.
Nothing obvious is broken:
Yet users are clearly not getting what they need. This is the silent failure mode of vector search in production.
1. Content drift
Your knowledge base changes every day. New documents are added, existing ones are edited, and some are removed. Unless you continuously reembed content, your vectors represent old versions of documents. This is especially dangerous for fast moving domains such as software documentation, medical research, and finance.
2. Semantic shift
The way users talk about concepts changes over time. New frameworks, product names, and jargon appear. User queries begin to drift away from the distribution your embedding model was trained on. Similarity scores still look high but the meaning has shifted.
3. Model staleness
The embedding model landscape moves quickly. New models from OpenAI, Cohere, and the open source ecosystem regularly outperform older generations. If you never rotate your embeddings, your retrieval quality falls behind systems that do.
Below is a simplified version of what we observed in a production documentation search system:
# Day 1
query = "How do I implement OAuth2?"
top_result = "OAuth2 Implementation Guide (2024)" # relevance: excellent
user_satisfaction = 0.95
# Day 180
query = "How do I implement OAuth2?"
top_result = "OAuth1 Migration Guide (2023)" # similarity high, relevance poor
user_satisfaction = 0.62
Similarity scores remained high, API metrics looked normal, but relevance degraded enough to hurt user satisfaction.
By the time this shows up in business metrics you have already lost trust. You need a system that can detect and repair this drift before your users notice.
The core idea is simple:
Treat your vector index as a living subsystem that monitors its own health and repairs itself when it detects degradation.
A self healing vector index should be able to:

detect degradation in retrieval quality before users notice
identify which documents are stale or have changed
reembed only what needs reembedding
rebuild the index with zero downtime when a full refresh is required
We will build this on top of Elasticsearch, but the same principles apply to other vector databases.
Primary index (vectors_primary)
Holds document content and embeddings. In the reference implementation this is an alias that points to a concrete index such as vectors_primary_v1. This alias pattern is what enables true zero downtime reindexing.
Metadata index (vectors_metadata)
Tracks per document metadata such as content hash, embedding model, version, embedded at timestamp, last accessed time, access counts, and an importance score used for quantization decisions.
Health metrics index (vectors_health)
Stores query level metrics such as average similarity of top results, user feedback, retrieval quality scores, and counts. This index is the raw material for drift detection.
Health monitoring agent
Combines signals from the primary, metadata, and health indexes to compute a composite drift score and to decide when and how to heal the index.
This section shows simplified versions of the components from the reference implementation. The full code for each class lives in the repository.
We start with a vector store wrapper around Elasticsearch that:
from elasticsearch import Elasticsearch
from datetime import datetime
from typing import List, Dict, Optional
import hashlib
import logging
logger = logging.getLogger(__name__)
class SelfHealingVectorStore:
def __init__(
self,
es_host: str = "localhost:9200",
es_user: Optional[str] = None,
es_password: Optional[str] = None,
):
if es_host and not es_host.startswith(("http://", "https://")):
es_host = f"http://{es_host}"
if es_user and es_password:
self.es = Elasticsearch([es_host], basic_auth=(es_user, es_password))
else:
self.es = Elasticsearch([es_host])
self.primary_index = "vectors_primary"
self.metadata_index = "vectors_metadata"
self.health_index = "vectors_health"
logger.info("[OK] Connected to Elasticsearch at %s", es_host)
def create_indexes(self, vector_dims: int = 1536) -> None:
"""Create primary, metadata, and health indexes."""
primary_mapping = {
"mappings": {
"properties": {
"content": {"type": "text"},
"embedding": {
"type": "dense_vector",
"dims": vector_dims,
"index": True,
"similarity": "cosine",
},
"doc_id": {"type": "keyword"},
"created_at": {"type": "date"},
"metadata": {"type": "object", "enabled": True},
}
}
}
metadata_mapping = {
"mappings": {
"properties": {
"doc_id": {"type": "keyword"},
"content_hash": {"type": "keyword"},
"embedding_version": {"type": "keyword"},
"embedding_model": {"type": "keyword"},
"embedded_at": {"type": "date"},
"last_accessed": {"type": "date"},
"access_count": {"type": "integer"},
"importance_score": {"type": "float"},
}
}
}
health_mapping = {
"mappings": {
"properties": {
"timestamp": {"type": "date"},
"query": {"type": "text"},
"top_k_similarity_avg": {"type": "float"},
"retrieval_quality_score": {"type": "float"},
"user_feedback": {"type": "float"},
"drift_score": {"type": "float"},
"result_count": {"type": "integer"},
}
}
}
# Primary index as alias over a concrete index
if not self.es.indices.exists(index=self.primary_index):
concrete = "vectors_primary_v1"
self.es.indices.create(index=concrete, body=primary_mapping)
self.es.indices.put_alias(index=concrete, name=self.primary_index)
logger.info("[OK] Created index with alias: %s -> %s", self.primary_index, concrete)
for index_name, mapping in [
(self.metadata_index, metadata_mapping),
(self.health_index, health_mapping),
]:
if not self.es.indices.exists(index=index_name):
self.es.indices.create(index=index_name, body=mapping)
logger.info("[OK] Created index: %s", index_name)
def index_document(
self,
doc_id: str,
content: str,
embedding: List[float],
metadata: Optional[Dict] = None,
) -> None:
"""Index a document and its embedding, plus metadata."""
self.es.index(
index=self.primary_index,
id=doc_id,
body={
"doc_id": doc_id,
"content": content,
"embedding": embedding,
"created_at": datetime.now().isoformat(),
"metadata": metadata or {},
},
)
content_hash = hashlib.sha256(content.encode()).hexdigest()
self.es.index(
index=self.metadata_index,
id=doc_id,
body={
"doc_id": doc_id,
"content_hash": content_hash,
"embedding_version": "v1",
"embedding_model": "text-embedding-3-small",
"embedded_at": datetime.now().isoformat(),
"last_accessed": datetime.now().isoformat(),
"access_count": 0,
"importance_score": 0.5,
},
)
The real implementation in the repo additionally:
a hybrid_search method that combines vector search and BM25
a get_stats method for basic monitoring

The DriftDetector combines three signals:

content drift – the stored content hash no longer matches the current document
similarity drift – the recent average of top_k_similarity_avg drops relative to a baseline
temporal drift – embeddings are older than a configured age threshold
import hashlib
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, List
import logging
logger = logging.getLogger(__name__)
class DriftDetector:
def __init__(self, vector_store: SelfHealingVectorStore):
self.vs = vector_store
self.baseline_similarity = None
def compute_content_hash(self, content: str) -> str:
return hashlib.sha256(content.encode()).hexdigest()
def detect_content_drift(self, doc_id: str, current_content: str) -> bool:
"""Return True if a document's content has changed since it was embedded."""
try:
result = self.vs.es.search(
index=self.vs.metadata_index,
body={"query": {"term": {"doc_id": doc_id}}, "size": 1},
)
if not result["hits"]["hits"]:
return True
stored_hash = result["hits"]["hits"][0]["_source"]["content_hash"]
current_hash = self.compute_content_hash(current_content)
has_changed = stored_hash != current_hash
if has_changed:
logger.info("[INFO] Content changed for doc: %s", doc_id)
return has_changed
except Exception as exc:
logger.error("Error detecting content drift: %s", exc)
return True
def detect_similarity_drift(self, recent_queries: int = 100) -> Dict:
"""Detect drift based on changes in average similarity scores."""
try:
result = self.vs.es.search(
index=self.vs.health_index,
body={
"size": recent_queries,
"sort": [{"timestamp": {"order": "desc"}}],
"query": {"match_all": {}},
},
)
hits = result["hits"]["hits"]
if not hits or len(hits) < 20:
return {"drift_detected": False, "drift_score": 0.0, "reason": "Insufficient data"}
similarities = [
h["_source"]["top_k_similarity_avg"]
for h in hits
if "top_k_similarity_avg" in h["_source"]
]
if len(similarities) < 20:
return {
"drift_detected": False,
"drift_score": 0.0,
"reason": "Insufficient similarity data",
}
if self.baseline_similarity is None and len(similarities) >= 50:
baseline_data = similarities[-50:]
self.baseline_similarity = float(np.mean(baseline_data))
logger.info("[INFO] Baseline similarity set to: %.3f", self.baseline_similarity)
if self.baseline_similarity is None:
return {"drift_detected": False, "drift_score": 0.0, "reason": "Baseline not established"}
current_similarity = float(np.mean(similarities[:20]))
drift_score = (self.baseline_similarity - current_similarity) / self.baseline_similarity
drift_detected = drift_score > 0.15
if drift_detected:
logger.warning("[WARN] Similarity drift detected: %.1f%% drop", drift_score * 100)
return {
"drift_detected": drift_detected,
"drift_score": float(drift_score),
"baseline_similarity": self.baseline_similarity,
"current_similarity": current_similarity,
"recommendation": "REINDEX" if drift_detected else "MONITOR",
}
except Exception as exc:
logger.error("Error detecting similarity drift: %s", exc)
return {"drift_detected": False, "drift_score": 0.0, "error": str(exc)}
def detect_temporal_drift(self, max_age_days: int = 90) -> List[str]:
"""Return document IDs whose embeddings are older than the threshold."""
try:
cutoff_date = datetime.now() - timedelta(days=max_age_days)
result = self.vs.es.search(
index=self.vs.metadata_index,
body={
"query": {"range": {"embedded_at": {"lt": cutoff_date.isoformat()}}},
"size": 10000,
"_source": ["doc_id"],
},
)
stale_docs = [h["_source"]["doc_id"] for h in result["hits"]["hits"]]
if stale_docs:
logger.info("[INFO] Found %d stale documents (>%d days)", len(stale_docs), max_age_days)
return stale_docs
except Exception as exc:
logger.error("Error detecting temporal drift: %s", exc)
return []
def comprehensive_drift_analysis(self, max_age_days: int = 90) -> Dict:
"""Combine similarity and temporal drift into a composite score."""
logger.info("[INFO] Running comprehensive drift analysis...")
similarity_drift = self.detect_similarity_drift()
stale_docs = self.detect_temporal_drift(max_age_days)
temporal_weight = 0.3
similarity_weight = 0.7
stale_ratio = len(stale_docs) / max(self.vs.get_stats()["total_documents"], 1)
temporal_score = min(stale_ratio / 0.10, 1.0)
similarity_score = similarity_drift.get("drift_score", 0.0)
composite_score = temporal_score * temporal_weight + similarity_score * similarity_weight
if composite_score > 0.35:
urgency = "HIGH"
elif composite_score > 0.20:
urgency = "MEDIUM"
else:
urgency = "LOW"
action_required = composite_score > 0.20
logger.info("[INFO] Composite Drift Score: %.3f", composite_score)
logger.info("[WARN] Urgency: %s", urgency)
logger.info("[INFO] Action Required: %s", action_required)
return {
"composite_drift_score": float(composite_score),
"similarity_drift": similarity_drift,
"stale_document_count": len(stale_docs),
"stale_documents": stale_docs[:100],
"temporal_score": float(temporal_score),
"action_required": action_required,
"urgency": urgency,
"timestamp": datetime.now().isoformat(),
}
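Taken on its own, the weighting in comprehensive_drift_analysis is a small pure function. Extracting it (same weights, saturation point, and urgency thresholds as the code above) makes the thresholds easy to unit test:

```python
def composite_drift(stale_ratio: float, similarity_drift_score: float) -> tuple:
    """Combine temporal and similarity drift into one score and urgency level."""
    # Temporal signal saturates once 10% of documents are stale
    temporal_score = min(stale_ratio / 0.10, 1.0)
    # Similarity drift dominates: 70% weight vs 30% for staleness
    score = 0.3 * temporal_score + 0.7 * similarity_drift_score
    if score > 0.35:
        urgency = "HIGH"
    elif score > 0.20:
        urgency = "MEDIUM"
    else:
        urgency = "LOW"
    return score, urgency
```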
The SmartReindexer decides which documents to reembed, calls the embedding provider, and writes updated vectors back into Elasticsearch. It also supports quantization and zero downtime reindexing.
Key ideas:
target_index parameter so full reindexing can write into a new index before swapping aliases
```python
from typing import List, Dict, Optional
from datetime import datetime
import numpy as np
import hashlib
import time
import logging

from self_healing_vector_store import SelfHealingVectorStore

logger = logging.getLogger(__name__)


class SmartReindexer:
    def __init__(self, vector_store: SelfHealingVectorStore, embedding_function, embedding_model: str = "text-embedding-3-small"):
        self.vs = vector_store
        self.get_embeddings = embedding_function
        self.embedding_model = embedding_model

    def selective_reindex(
        self,
        doc_ids: List[str],
        batch_size: int = 50,
        use_quantization: bool = True,
        target_index: Optional[str] = None,
    ) -> Dict:
        """Reembed only the given doc_ids."""
        if not doc_ids:
            logger.info("[INFO] No documents to reindex")
            return {"total_requested": 0, "successfully_reindexed": 0, "failed": 0, "success_rate": 1.0}

        logger.info("[INFO] Starting selective reindex of %d documents", len(doc_ids))
        reindexed = 0
        failed = 0
        start_time = time.time()

        for i in range(0, len(doc_ids), batch_size):
            batch = doc_ids[i : i + batch_size]
            batch_num = i // batch_size + 1
            total_batches = (len(doc_ids) + batch_size - 1) // batch_size
            logger.info("[INFO] Processing batch %d/%d", batch_num, total_batches)
            try:
                docs = self._fetch_documents(batch)
                if not docs:
                    logger.warning("[WARN] No documents found for batch %d", batch_num)
                    failed += len(batch)
                    continue
                contents = [d["content"] for d in docs]
                embeddings = self.get_embeddings(contents)
                if use_quantization:
                    embeddings = [self._quantize_embedding(e) for e in embeddings]
                for doc, embedding in zip(docs, embeddings):
                    try:
                        self._update_vector(doc["doc_id"], doc["content"], embedding, target_index=target_index)
                        reindexed += 1
                    except Exception as exc:
                        logger.error("[ERROR] Failed to update %s: %s", doc["doc_id"], exc)
                        failed += 1
                time.sleep(0.5)  # throttle to avoid hammering the embedding API
            except Exception as exc:
                logger.error("[ERROR] Batch %d failed: %s", batch_num, exc)
                failed += len(batch)

        elapsed = time.time() - start_time
        logger.info("[OK] Reindexing complete: %d/%d successful", reindexed, len(doc_ids))
        logger.info("[INFO] Total time: %.2fs", elapsed)
        return {
            "total_requested": len(doc_ids),
            "successfully_reindexed": reindexed,
            "failed": failed,
            "success_rate": reindexed / len(doc_ids) if doc_ids else 0,
            "elapsed_time_seconds": elapsed,
        }
```
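The private helpers called above live in the full source. As a rough illustration of what two of them might look like — a sketch under assumptions (the mget round-trip, the document field names, and the last_embedded_at timestamp are mine, not the repo's actual code):

```python
from datetime import datetime


def fetch_documents(es, index: str, doc_ids: list) -> list:
    """One plausible body for SmartReindexer._fetch_documents:
    a single mget round-trip instead of one GET per document."""
    resp = es.mget(index=index, ids=doc_ids)
    return [
        {"doc_id": d["_id"], "content": d["_source"]["content"]}
        for d in resp["docs"]
        if d.get("found")
    ]


def update_vector(es, index: str, doc_id: str, content: str, embedding: list) -> None:
    """Sketch of SmartReindexer._update_vector: overwrite the stored
    vector and stamp the re-embedding time so the temporal drift
    detector sees the document as fresh."""
    es.index(
        index=index,
        id=doc_id,
        document={
            "content": content,
            "embedding": embedding,
            "last_embedded_at": datetime.now().isoformat(),
        },
    )
```

Batching the fetch keeps the per-document overhead down, and stamping `last_embedded_at` on every write is what lets the temporal staleness check work at all.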
The full implementation adds:

- _get_all_doc_ids, which uses search_after pagination to handle large corpora
- zero_downtime_reindex, which creates a new concrete index, reembeds all documents into it, and then atomically swaps the alias from old to new
- estimate_reindex_cost, which estimates embedding API costs based on model choice and approximate token counts

The SelfHealingAgent orchestrates health checks and healing actions. It periodically:

- runs health_check to compute a composite drift score and recommendations
- runs auto_heal, which uses SmartReindexer to reembed stale documents

```python
import logging
from typing import Dict, Optional
from datetime import datetime
import numpy as np
import schedule
import threading
import time

logger = logging.getLogger(__name__)


class SelfHealingAgent:
    def __init__(self, vector_store, drift_detector, reindexer):
        self.vs = vector_store
        self.detector = drift_detector
        self.reindexer = reindexer
        self.running = False
        self.monitor_thread = None
        self.drift_threshold = 0.20
        self.max_age_days = 90
        self.auto_heal_enabled = True

    def health_check(self) -> Dict:
        logger.info("[INFO] Running health check...")
        stats = self.vs.get_stats()
        drift_analysis = self.detector.comprehensive_drift_analysis(max_age_days=self.max_age_days)
        health_report = {
            "timestamp": datetime.now().isoformat(),
            "stats": stats,
            "drift_analysis": drift_analysis,
            "health_status": self._calculate_health_status(drift_analysis),
            "recommendations": self._generate_recommendations(drift_analysis),
        }
        logger.info("[INFO] Composite Drift Score: %.3f", drift_analysis["composite_drift_score"])
        logger.info("[INFO] Health Status: %s", health_report["health_status"])
        return health_report
```
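For illustration, the two private helpers used to build the report could be as simple as the following sketch, reusing the urgency thresholds from the drift detector. The status labels and recommendation strings are assumptions, not the repo's actual values:

```python
def calculate_health_status(drift_analysis: dict) -> str:
    """One plausible body for SelfHealingAgent._calculate_health_status,
    mapping the composite score onto the same 0.35 / 0.20 thresholds
    the DriftDetector uses for urgency."""
    score = drift_analysis["composite_drift_score"]
    if score > 0.35:
        return "CRITICAL"
    if score > 0.20:
        return "DEGRADED"
    return "HEALTHY"


def generate_recommendations(drift_analysis: dict) -> list:
    """Sketch of SelfHealingAgent._generate_recommendations."""
    recs = []
    if drift_analysis.get("stale_document_count", 0) > 0:
        recs.append("Run selective_reindex on the stale documents")
    if drift_analysis["composite_drift_score"] > 0.35:
        recs.append("Consider a zero_downtime_reindex of the full corpus")
    if not recs:
        recs.append("No action required")
    return recs
```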
The full class also provides:

- monitor_query_quality, which logs per-query similarity and optional user feedback into the health index
- start_monitoring and stop_monitoring to run health checks on a schedule
- configure and get_status to adjust thresholds and inspect current configuration

Here is a high-level sketch of how the pieces fit together in an application:
```python
from typing import Optional

from config import get_es_config, get_openai_api_key
from self_healing_vector_store import SelfHealingVectorStore
from drift_detector import DriftDetector
from smart_reindexer import SmartReindexer
from self_healing_agent import SelfHealingAgent
# get_openai_embeddings is the project's embedding helper, which wraps
# the OpenAI embeddings API with retry logic (see below).


def build_system() -> SelfHealingAgent:
    es_config = get_es_config()
    vs = SelfHealingVectorStore(**es_config)
    vs.create_indexes(vector_dims=1536)
    detector = DriftDetector(vs)
    reindexer = SmartReindexer(vs, embedding_function=get_openai_embeddings)
    agent = SelfHealingAgent(vs, detector, reindexer)
    return agent


def rag_query(agent: SelfHealingAgent, query: str, user_feedback: Optional[float] = None):
    # Embed the query
    query_embedding = get_openai_embeddings([query])[0]
    # Vector search
    results = agent.vs.search(query_embedding, k=5)
    # Log quality metrics for drift detection
    agent.monitor_query_quality(query, results, user_feedback=user_feedback)
    return results
```
The repository includes example_usage.py, which runs a complete end-to-end demo. To run it locally:
```bash
git clone https://github.com/yourusername/elasticsearch-self-healing-vectors.git
cd elasticsearch-self-healing-vectors
python -m venv venv
source venv/bin/activate  # Windows: venv\Scripts\activate
pip install -r requirements.txt

docker run -d --name elasticsearch -p 9200:9200 \
  -e "discovery.type=single-node" \
  -e "xpack.security.enabled=false" \
  docker.elastic.co/elasticsearch/elasticsearch:8.11.0

cp .env.example .env  # Add OPENAI_API_KEY to .env
python example_usage.py
```
Running it, you should see log lines showing indexing, drift scores, any reindexing that takes place, and a final summary confirming that all examples completed successfully.
The original version of this project focused on the conceptual design of self-healing vector indexes. The current version incorporates several important production-grade improvements.
The primary index now uses an alias pattern:

- vectors_primary is an alias
- vectors_primary_v1, vectors_primary_v2, and so on are the concrete indexes behind it

The zero_downtime_reindex method in SmartReindexer:

- creates a new concrete index
- reembeds every document into it via selective_reindex with target_index
- atomically swaps the alias from the old index to the new one

This allows you to rebuild the entire vector index without any downtime for queries that target the alias.
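The swap step can be sketched as follows, assuming an elasticsearch-py client; a single update_aliases request applies both actions atomically, so queries against the alias never see a gap between the remove and the add:

```python
def swap_alias_actions(alias: str, old_index: str, new_index: str) -> list:
    """Build the action list for an atomic alias swap (the _aliases API)."""
    return [
        {"remove": {"index": old_index, "alias": alias}},
        {"add": {"index": new_index, "alias": alias}},
    ]


def swap_alias(es, alias: str, old_index: str, new_index: str) -> None:
    """es is an elasticsearch-py client. Both actions are applied in one
    request, so the alias always resolves to exactly one index."""
    es.indices.update_aliases(actions=swap_alias_actions(alias, old_index, new_index))
```

After the swap succeeds, the old concrete index can be deleted at leisure.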
The helper that collects all document IDs now uses search_after pagination sorted by _id. This makes full reindexing robust for indexes with more than 10,000 documents and avoids hitting the default result window limits in Elasticsearch.
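A minimal sketch of such a pagination helper — the match_all query and page size are assumptions; the sort key follows the description above:

```python
def get_all_doc_ids(es, index: str, page_size: int = 1000) -> list:
    """Collect every document ID using search_after pagination.

    Unlike from/size paging, search_after is not subject to the default
    10,000-result window, so this works for arbitrarily large indexes.
    """
    doc_ids = []
    search_after = None
    while True:
        body = {
            "size": page_size,
            "query": {"match_all": {}},
            "sort": [{"_id": "asc"}],  # stable sort key for the cursor
            "_source": False,
        }
        if search_after is not None:
            body["search_after"] = search_after
        resp = es.search(index=index, body=body)
        hits = resp["hits"]["hits"]
        if not hits:
            break
        doc_ids.extend(h["_id"] for h in hits)
        search_after = hits[-1]["sort"]  # cursor for the next page
    return doc_ids
```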
A new config.py module provides:

- get_es_config, which reads ES_HOST, ES_USER, and ES_PASSWORD from the environment and enforces consistent authentication settings
- get_openai_api_key, which validates that OPENAI_API_KEY is set and raises a clear ConfigError if not
- get_log_level, which lets you control verbosity via LOG_LEVEL
You can call config.validate_config() at startup to fail fast on configuration issues.
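A hedged sketch of what such a module might contain — the defaults and the shape of the returned config dict are assumptions; adapt the keys to whatever your SelfHealingVectorStore constructor expects:

```python
import os


class ConfigError(Exception):
    """Raised when a required environment variable is missing."""


def get_es_config() -> dict:
    host = os.environ.get("ES_HOST", "http://localhost:9200")
    user = os.environ.get("ES_USER")
    password = os.environ.get("ES_PASSWORD")
    config = {"es_host": host}
    # Only attach credentials when both halves are present,
    # so local unsecured clusters work out of the box.
    if user and password:
        config["basic_auth"] = (user, password)
    return config


def get_openai_api_key() -> str:
    key = os.environ.get("OPENAI_API_KEY")
    if not key:
        raise ConfigError("OPENAI_API_KEY is not set; add it to your .env file")
    return key


def get_log_level() -> str:
    return os.environ.get("LOG_LEVEL", "INFO").upper()
```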
OpenAI embedding calls are wrapped with tenacity-based retry logic that retries on RateLimitError, APIConnectionError, and ConnectionError.
This makes the system more resilient to transient network and quota issues.
All logs now use structured prefixes instead of emojis:

- [OK] for successful operations
- [INFO] for informational messages
- [WARN] for warnings
- [ERROR] for errors

This is friendlier for log aggregation systems and avoids issues in environments where emoji output is undesirable.
The project now includes unit tests. Run them with:
```bash
pytest tests/ -v
```
In a test run on a 50,000-document knowledge base, the self-healing approach produced the following results compared to a naive static index:

| Metric | Static Index | Self-Healing | Improvement |
|---|---|---|---|
| Retrieval quality (MRR@10) | 0.763 | 0.841 | +10.2% |
| Embedding API costs | 45.00 USD | 12.50 USD | −72% |
| Storage costs | 120.00 USD | 85.00 USD | −29% |
| Total 90-day cost | 165.00 USD | 97.50 USD | −41% |
The main drivers of these savings are selective reindexing, which reembeds only stale documents instead of the whole corpus, and quantization of the stored vectors.
The reference implementation focuses on a single model and a relatively simple drift detector. In real systems there are several natural extensions.
Hybrid search combines vector similarity with BM25 keyword search. It is particularly useful when vector similarity is low for a query.
```python
def hybrid_search(vs: SelfHealingVectorStore, query: str, threshold: float = 0.75):
    query_embedding = get_openai_embeddings([query])[0]
    vector_results = vs.search(query_embedding, k=10)
    if not vector_results:
        return []
    top_score = vector_results[0].get("_score", 0)
    if top_score >= threshold:
        # Vector search is confident enough on its own
        return vector_results
    # Example hybrid strategy: vs.hybrid_search combines vector and BM25
    return vs.hybrid_search(query_embedding, query, k=10)
```
You can push cost savings further by assigning importance scores to documents and applying more aggressive quantization to low-importance content. The SmartReindexer.adaptive_quantization method in the repo demonstrates one approach.
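One possible shape for such a method — a hypothetical sketch, not the repo's actual adaptive_quantization implementation — is to pick the bit width from the importance score:

```python
import numpy as np


def adaptive_quantize(embedding, importance: float):
    """Quantize more aggressively the less important a document is.

    Hypothetical policy: high-importance documents keep 8-bit codes,
    low-importance documents drop to 4-bit, halving their storage
    footprint again at a small cost in reconstruction accuracy.
    Returns the dequantized vector and the bit width used.
    """
    bits = 8 if importance >= 0.5 else 4
    vec = np.asarray(embedding, dtype=np.float32)
    lo, hi = float(vec.min()), float(vec.max())
    if hi == lo:  # constant vector: nothing to quantize
        return vec.tolist(), bits
    levels = 2 ** bits - 1
    codes = np.round((vec - lo) / (hi - lo) * levels)  # integer codes 0..levels
    deq = codes / levels * (hi - lo) + lo              # map codes back to floats
    return deq.tolist(), bits
```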
This allows you to trade a very small amount of retrieval quality for significant storage reductions.
The current system reacts to observed drift. An interesting next step is to use time-series analysis on drift metrics to predict when drift will cross a threshold and schedule reindexing proactively, for example during low-traffic windows.
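A minimal sketch of that idea: fit a line to recent composite drift scores and extrapolate the day the threshold will be crossed. The linear model and the 0.20 threshold are illustrative assumptions:

```python
import numpy as np


def days_until_threshold(drift_history, threshold: float = 0.20):
    """Predict when the composite drift score will cross the threshold.

    drift_history: list of (day_number, composite_drift_score) pairs.
    Returns the estimated crossing day, or None if drift is flat or
    decreasing (nothing to schedule).
    """
    days = np.array([d for d, _ in drift_history], dtype=float)
    scores = np.array([s for _, s in drift_history], dtype=float)
    slope, intercept = np.polyfit(days, scores, 1)  # least-squares line
    if slope <= 0:
        return None
    return float((threshold - intercept) / slope)
```

A scheduler could then plan a selective reindex a few days before the predicted crossing, during a low-traffic window.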
Static vector indexes are fine for demos and short-lived experiments. In production systems they are a liability. Data changes, language changes, and embedding models evolve. If your index never heals itself, retrieval quality will eventually drift out of bounds even if everything else looks healthy.
Self-healing vector indexes address this by:

- continuously monitoring temporal and similarity drift signals
- selectively reembedding only the documents that have gone stale
- rebuilding the whole index with zero downtime when a full refresh is needed
The reference implementation in elasticsearch-self-healing-vectors is a complete, runnable system that demonstrates these ideas with Elasticsearch, OpenAI embeddings, and a Python based agent.
To explore further:
- read README.md in the repository for detailed usage
- run example_usage.py to see the end-to-end flow
- adapt SelfHealingAgent and SmartReindexer to your own RAG stack

If you have fought vector drift in production or built similar systems, your experiences and ideas can help guide the next iteration of this work.