2025-11-27 16:32:26
Visit https://config.office.com/deploymentsettings
- After downloading the Configuration.xml file, replace the existing file in the ODT folder.
2025-11-27 16:18:09
In one of my recent projects, I needed to replicate data from an AWS Kinesis stream into an Apache Kafka topic. The goal was to allow downstream systems to consume the same events from Kafka that were originally being produced to Kinesis.
The core question here is: how can we replicate data from an AWS Kinesis stream to a Kafka topic in real time, with minimal latency and the highest possible reliability?
I explored different solutions, including Kafka Connect, MirrorMaker, and other integration tools. However, most of them came with complex configurations and limitations that didn’t fit our infrastructure setup.
So, I decided to build a custom Kinesis-to-Kafka bridge that would be flexible, lightweight, highly configurable, and plug perfectly into our infrastructure setup.
If you’re new to Amazon Kinesis, here’s a quick overview of it (feel free to skip this section if you’re already familiar):
Amazon Kinesis is a family of streaming services within Amazon Web Services (AWS) built to stream real-time data at scale. It includes several services, such as Kinesis Data Streams, Kinesis Data Firehose, Kinesis Data Analytics, and Kinesis Video Streams.
In this article, we will focus on Kinesis Data Streams, which is a serverless (fully managed by AWS) streaming service designed to handle high-throughput, low-latency streaming use-cases like events, logs, and clickstreams.
If you’re also new to Apache Kafka, here’s a quick overview (feel free to skip this section if you’re already familiar):
Apache Kafka is a distributed event streaming platform that handles large volumes of real-time data. It provides durability, scalability, and strong ordering guarantees. It is usually used to build data pipelines and streaming applications at different scales, and it is very dominant in event-driven architectures.
The replicator consists of three core stages, as shown in the following diagram:
┌──────────────────────────┐
│ Kinesis Consumer │ ← Consumes records from AWS Kinesis stream
└────────────┬─────────────┘
│
▼
┌──────────────────────────┐
│ Data Processing │ ← Applies business logic to transform/validate/parse the records
└────────────┬─────────────┘
│
▼
┌──────────────────────────┐
│ Producing to Kafka │ ← Produces processed events to Kafka topic/topics
└──────────────────────────┘
Kinesis Consumer: Consuming data from a Kinesis stream requires more than just reading records; it also requires manual offset management, since Kinesis doesn’t provide a built-in offset-tracking mechanism for consumers. Instead, consumers are responsible for tracking their own progress, typically by storing checkpoint data in a DynamoDB table, often referred to as a lease table.
This approach, called leasing, allows multiple consumers to coordinate access to shards (the basic units of a Kinesis stream) and avoid processing the same records twice. You can implement this logic manually (very complex) or use a ready-made library such as the Amazon Kinesis Client Library (KCL), which handles lease management, DynamoDB table creation, and offset tracking out of the box.
In my case, I used KCL for Python (KCLPY) to consume from Kinesis so that I could avoid re-implementing the low-level coordination and state-management logic.
Data Processing: This stage is where your core business logic lives. After data is fetched from Kinesis but before it’s forwarded to Kafka, you may need to parse, validate, transform, enrich, or filter the records.
If no transformation is needed, the data can be streamed directly to Kafka.
Producing to Kafka: This is the final stage, in which records are published to the designated Kafka topic.
At the implementation level, this is typically achieved by using one of the Kafka SDKs. In my case, I used the confluent_kafka client library for Python.
After this stage, the records are serialized and published to Kafka, which allows the downstream systems to consume the data in real-time.
This project was developed using Python, kclpy, confluent_kafka, and Docker.
Below is an overview of the project structure:
kinesis-to-kafka-replicator/
├── kinesis_replicator/
│ ├── __init__.py
│ └── record_processor.py
│
├── config/
│ ├── kcl.properties.template
│ └── replicator_configs.template
│
├── amazon_kclpy/
│ └── jars/
│
├── run.sh
├── Dockerfile
├── requirements.txt
└── README.md
The configuration is separated into two parts:
KCLPY Configuration: Located in kcl.properties.template, this contains all the required settings for the Kinesis Client Library (KCL), such as streamName, initialPositionInStream, and the AWS authentication parameters. A more detailed list can be found here.
Replicator Configuration: Located in replicator_configs.template, this includes the Kafka-specific settings, such as bootstrap_servers, kafka_topic, and client_id, along with any global parameters required for tuning the replicator, such as sleep_seconds (see the sketch below).
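To make this more concrete, here is a minimal, illustrative sketch (not taken from the project) of how the rendered replicator_configs.json might be loaded to build the Kafka producer; the field names mirror the settings listed above, and the flat-JSON layout is an assumption.
# Illustrative sketch only: assumes the rendered replicator_configs.json is a flat
# JSON object whose keys match the settings named above (bootstrap_servers,
# kafka_topic, client_id). The real template may be laid out differently.
import json
from confluent_kafka import Producer

def build_producer(config_path="config/replicator_configs.json"):
    """Load the rendered replicator config and build a confluent_kafka Producer."""
    with open(config_path) as f:
        cfg = json.load(f)
    producer_conf = {
        "bootstrap.servers": cfg["bootstrap_servers"],
        "client.id": cfg.get("client_id", "replicator"),
        # SASL/security settings (security.protocol, sasl.mechanism, sasl.username,
        # sasl.password) would be added here when the cluster requires them.
    }
    return Producer(producer_conf), cfg["kafka_topic"]
Creating the producer once and reusing it for every record is the usual pattern with confluent_kafka.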
All Python dependencies, such as boto3, confluent_kafka, and amazon_kclpy, are listed in requirements.txt.
This is the heart of the replicator’s code base.
The RecordProcessor class is responsible for consuming records from the assigned shard, applying the processing logic, periodically checkpointing progress, and producing the processed records to Kafka.
A more detailed sample can be found here.
The following is a simplified breakdown of what’s happening:
➤ Processing Kinesis Records
# Example of the KCL process_records method with leasing functionality (checkpointing)
def process_records(self, process_records_input):
    """
    Called by a KCLProcess with a list of records to be processed and a checkpointer which accepts sequence numbers
    from the records to indicate where in the stream to checkpoint.

    :param amazon_kclpy.messages.ProcessRecordsInput process_records_input: the records, and metadata about the
        records.
    """
    try:
        for record in process_records_input.records:
            data = record.binary_data
            seq = int(record.sequence_number)
            sub_seq = record.sub_sequence_number
            key = record.partition_key
            self.process_record(data, key, seq, sub_seq)  # Business logic is implemented in this method
            if self.should_update_sequence(seq, sub_seq):
                self._largest_seq = (seq, sub_seq)
        # Checkpoint every N seconds
        if time.time() - self._last_checkpoint_time > self._CHECKPOINT_FREQ_SECONDS:
            self.checkpoint(process_records_input.checkpointer, str(self._largest_seq[0]), self._largest_seq[1])
            self._last_checkpoint_time = time.time()
    except Exception as e:
        self.log("Encountered an exception while processing records. Exception was {e}\n".format(e=e))
➤ Producing to Kafka
# A code snippet from the process_record method mentioned above that shows how to produce to Kafka
try:
self.kafka_producer.produce(
topic=self.kafka_topic,
value=data,
key=partition_key,
callback=callback
)
except BufferError:
logger.warning(f"Kafka producer queue full, draining and retrying for record {sequence_number}")
self.kafka_producer.poll(1.0)
self.kafka_producer.produce(
topic=self.kafka_topic,
value=data,
key=partition_key,
callback=callback
)
This ensures the record is safely published to Kafka, with retry logic in case of temporary backpressure.
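The callback referenced in the snippet above isn’t shown in the article; a possible shape for it, purely as an illustrative sketch, could be:
# Hypothetical delivery callback for the produce() calls above; confluent_kafka
# invokes it with (err, msg) once the broker acknowledges or rejects the record.
def delivery_callback(err, msg):
    if err is not None:
        logger.error(f"Delivery failed for key {msg.key()}: {err}")
    else:
        logger.debug(f"Delivered to {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}")
Keep in mind that confluent_kafka only fires delivery callbacks from poll() or flush(), so the replicator has to call poll() regularly; the BufferError handler above already does this when the queue fills up.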
amazon_kclpy/jars/: This folder contains the Java dependencies used by KCLPY. These are required to enable the bridge between Python and the underlying KCL lease coordination.
Jar files can be generated by following the instructions written here.
To ensure portability and seamless deployment, the replicator is containerized using Docker. Here is a breakdown of how to build and run it.
Dockerfile: Defines how to build a container image for the replicator that includes the required Python environment, libraries, and the Kinesis Client Library (KCL) dependencies.
➤ Sample Dockerfile
FROM python:3.9-slim
# Install system dependencies
RUN apt-get update && apt-get install -y openjdk-11-jre curl unzip && \
rm -rf /var/lib/apt/lists/*
# Set workdir and copy code
WORKDIR /app
COPY . /app
# Install Python requirements
RUN pip install --no-cache-dir -r requirements.txt
# Set entrypoint
ENTRYPOINT ["./run.sh"]
run.sh: The run.sh script acts as the entrypoint that launches the replicator. First, it renders the configuration templates into concrete configuration files, then it bootstraps the KCL MultiLangDaemon with that configuration, as shown in the sample below.
➤ Sample run.sh
#!/bin/bash
set -e
# Rendering the configuration templates
envsubst < config/replicator_configs.template > config/replicator_configs.json
envsubst < config/kcl.properties.template > config/kcl.properties
# Start KCL daemon
java -cp "amazon_kclpy/jars/*" software.amazon.kinesis.multilang.MultiLangDaemon config/kcl.properties
Build and Run
Pre-requisites: Docker installed, valid AWS credentials for the target Kinesis stream and DynamoDB lease tables, and a reachable Kafka cluster.
1) Build the Docker image
# Go to the Dockerfile path and run the following command
docker build -t kinesis-to-kafka-replicator .
2) Prepare the required environment variables
# Example for KCL configuration
KCL_STREAM_NAME=kafka-kinesis-replicator-test
KCL_APPLICATION_NAME=replicator-app-name
KCL_REGION=eu-west-1
KCL_INITIAL_POSITION=LATEST
KCL_LOG_LEVEL=DEBUG
KCL_AWS_CREDENTIALS_PROVIDER=DefaultCredentialsProvider
# Example for Kafka and global configuration
SLEEP_SECONDS=5
CHECKPOINT_RETRIES=5
CHECKPOINT_FREQ_SECONDS=30
KAFKA_TOPIC=kinesis_kafka_replication_test
KAFKA_BOOTSTRAP_SERVERS=bootstrap-server:9094
KAFKA_SASL_USERNAME=kafka-user
KAFKA_SASL_PASSWORD=kafka-password
KAFKA_SASL_MECHANISM=SCRAM-SHA-512
KAFKA_SECURITY_PROTOCOL=SASL_PLAINTEXT
KAFKA_CLIENT_ID=replicator-client
NUMBER_OF_SHARDS=4
AWS_ACCESS_KEY_ID=example-key-id
AWS_SECRET_ACCESS_KEY=example-secret-key
AWS_SESSION_TOKEN=example-token
Note: The above environment variables are saved into a kinesis_kafka_replicator.env file, which is passed to the container in the following docker run command.
3) Run the container
docker run --env-file kinesis_kafka_replicator.env -it kinesis-to-kafka-replicator
Verify the container is in a running state.
Once the replicator container is up and running, we can observe some logs to make sure that the app inside the container is working properly.
1) The application started successfully: The highlighted log line indicates that the replicator started successfully and is processing the correct stream.
2) Monitor the DynamoDB table creation: The following images confirm that the replicator recognizes the pre-created tables and will not create them again.
3) KCL leader election: The replicator elected a worker as leader and assigned it to lead the consumption from a shard.
4) KCL Kinesis consumer: The replicator started consuming records from the Kinesis stream and pushing them to Kafka.
5) Observing the Kafka topic: By observing the Kafka topic in Kafka UI, it can easily be seen that messages have started to show up there.
All of this confirms that our replicator is working properly and can mirror the data between the two platforms successfully.
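As an extra spot check (not part of the replicator itself), a few records can also be read back from the topic with a small confluent_kafka consumer; the sketch below reuses the example bootstrap server and topic name from the environment file above.
# Quick verification sketch: consume a handful of records from the target topic.
# Add the SASL settings from the env file (security.protocol, sasl.mechanism,
# sasl.username, sasl.password) if the cluster requires them.
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "bootstrap-server:9094",
    "group.id": "replication-spot-check",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["kinesis_kafka_replication_test"])
try:
    for _ in range(10):
        msg = consumer.poll(timeout=5.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        print(f"key={msg.key()} value={msg.value()[:80]!r}")
finally:
    consumer.close()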
This approach enabled real-time streaming between Kinesis and Kafka without over-engineering. In this guide, we built a lightweight replicator using Python, KCL, and the Kafka client libraries to bridge data between the two systems in real time, with reliability and configurability.
2025-11-27 16:17:47
Hafiz Syed Ashir Hassan @ AWS Amarathon 2025
Problem Statement:
Organisations struggle with unstructured data in various formats (documents, images, audio, video).
Manual processing is slow, inconsistent, and costly.
Existing automation systems are rigid, requiring templates, rules, and manual corrections.
Increasing demand for compliance, accuracy, and scalability.
Need for automating multi-format data processing with high accuracy using generative AI.
What is Bedrock Data Automation (BDA)?
A fully-managed document and media automation capability in Amazon Web Services.
Enables building end-to-end extraction, classification, and transformation pipelines using foundation models.
Processes documents, images, audio, and video at scale.
Orchestrates multi-step workflows using serverless automation.
Minimises custom code while maximising flexibility.
Input Asset:
[ 1 ] Supports various formats:
Documents (PDF, DOCX, scanned, structured/unstructured)
Images (PNG, JPG)
Audio (voice notes, call recordings)
Video (meetings, CCTV, webinars)
[ 2 ] Offers two types of output instructions:
Standard Output Configuration
Custom Schema based on matched blueprint
Output Response:
Linearized Text representation of the asset based on configuration.
Output returned as JSON + additional files if selected in configuration.
Supported Formats & Information BDA Extracts:
[ 1 ] Documents:
Extracts fields, tables, entities
Classifies, transforms, summarises, and validates
[ 2 ] Images:
Offers OCR, document classification, object detection, and handwriting extraction
[ 3 ] Audio:
Provides transcription, summarisation, sentiment analysis, speaker detection, and intent extraction
[ 4 ] Video:
Offers video summaries, speech-to-text, scene detection, object recognition, and action understanding
Standard Output vs Custom Output (Blueprints):
[ 1 ] Standard Output:
Out-of-the-box extraction
Ideal for common documents
Zero setup, quick results
[ 2 ] Custom Output:
Based on blueprints
Allows for prompt or user-defined blueprints
Accelerates setup and maintains consistency
Suitable for industry-specific or complex documents
Types of Document Blueprints:
[ 1 ] Classification:
Invoice, bank statement, ID card, contract, HR letter, etc.
[ 2 ] Extraction:
Entities, fields, tables, metadata
[ 3 ] Transformation:
Modify or restructure data
[ 4 ] Normalization:
Standardise data values
[ 5 ] Validation:
Validate extracted fields against rules
Use Cases:
[ 1 ] Banking & Finance:
Automate bank statements, invoices, receipts, fraud checks
[ 2 ] Insurance:
Claims processing from forms, photos, reports
Auto-summaries, extraction, validation
[ 3 ] Customer Support:
Transcribe & summarize calls
Detect sentiment and customer intent
[ 4 ] HR & Legal:
Process resumes, contracts, offer letters
Extract skills, clauses, obligations
[ 5 ] Security & Operations:
Summaries from meeting recordings
CCTV context extraction (people, actions)
Key Takeaways:
Bedrock Data Automation (BDA) is a comprehensive, customizable, and scalable solution.
[ 1 ] One Platform for All Formats:
Automates document, image, audio, and video processing.
[ 2 ] Customizability:
Delivers highly accurate and customizable outputs using advanced foundation models.
Ensures trustworthy and consistent insights tailored to any business workflow.
[ 3 ] Enterprise-Ready:
Scales to thousands of files with high accuracy and compliance.
[ 4 ] Faster, Cheaper, Smarter:
Reduces manual workload and delivers clean, structured outputs instantly.
2025-11-27 16:16:55
Tan Xin @ AWS Amarathon 2025
Challenge
Traditional Workflow During a Failure
Check cloud resource status
Failure occurs
View incident history
Check configuration
Check alerts
Assign work order
Analyze logs
Hypothesize root cause
Search for solutions
Check metrics
Analyze dependencies
View recent updates
Search call chains
View O&M manual
Query service dashboard
Execute mitigation measures
Notify colleagues
Query abnormal metrics
Read manual
Monitor recovery status
Core Challenges
Timely loss mitigation
Fault isolation
Resolve issues within an acceptable timeframe
Limit failures within isolation boundaries to prevent cascading effects on other services, thereby reducing the scope of failure impact
Ensure services meet user expectations and SLAs
Solution Evolution
Ideal Solution
System Notifications and Alerts
Alarm Metric: 5xx rate over 30%
Alarm Detail: Service, Endpoint, Triggered Time
Supplementary Root Cause Analysis
Key Findings: API error rate 100%, DB no error, S3 bucket policy was updated.
Immediate Action: Check S3 bucket policy to Deny
Confidence: 90%
Confirm Automatic Operations
Message: System Recovered.
RCA: S3 bucket policy was set to Deny
MTTR: 5 mins
Reduce failure recovery time from hours to minutes
SRE Expert Work Scope
Why did my User Service error rate reach 5% in the past hour?
Because the RDS MySQL instance experienced 12 connection limit exceeded issues in the past hour
Familiarize with the current system
Find service correlations
Analyze logs
Analyze audit logs
Analyze configuration
Analyze metrics
Consider Two Questions
If the system complexity is high, the troubleshooting workflow is long, and the log volume is large, can a single agent work smoothly?
Is it possible to clone SRE expert experience into agents to replace the Q&A method, allowing agents to make autonomous decisions and actions?
Multi-Agent Architecture Design
"Planner" creates workflows
"Executor" is responsible for executing assigned tasks
"Evaluator" is responsible for assessing whether each step's result is beneficial, returning to the "Planner" for subsequent planning
Results also need to be reviewed by the "Evaluator" before returning
The "Planner" can adjust the process based on feedback from the "Evaluator"
Multi-Agent vs Single Agent
Suitable for more complex tasks
Clearer responsibility and permission boundaries
Easier context engineering
More convenient scaling
AgentCore Best Practices
Introduction to Agent-Specific Runtime Environment
Challenges from "trial" to "implementation"
Challenges from PoC to production environment implementation
Performance
Elasticity
Security
Creating business value
Compliance
Agent Runtime Environment v1.0
INTERFACES & PROTOCOLS (MCP/A2A)
Agent Deployment
Agent Framework
Large Language Model
Memory
Prompts
Tools/Resources
Guardrails
Observability
Evaluation
Agent Runtime Environment v2.0
INTERFACES & PROTOCOLS (MCP/A2A)
Agent Deployment
Amazon Bedrock AgentCore
Agent Framework
Large Language Model Runtime
Memory
Prompts
Tools/Resources
Identity Tools
Guardrails
Gateway
Observability
Evaluation
(Legend: components range from DIY to specialized to fully managed)
2025-11-27 16:10:02
I have been fooling around a lot with AI recently, and I thought I would write something about what I've been doing. There are a few things that I've been doing, and they're all fascinating.
This is part two of a small series that I have created to walk through the process I went through to get decent code.
I had a crazy idea. I thought to myself: let's write something that will go through my git repos and automagically update my Dockerfiles so that each Dockerfile uses a fixed but more recent version of the base image.
In my first post, I looked at the codebase, which frankly was very average, but worked. I thought to myself "I wonder how much of a refactor it would take to get a real service working".
As it turns out.... not very much.
I have been using Claude (not Claude Code) for a little while now, and thought I might try to get it to refactor my terrible code to be, well, better.
I used a simple prompt:
refactor the following code to make it more robust
Then I pasted in my awful code.
What came out was generally pretty good; however, Claude initially generated another script that I had to run, and it was relatively static. On the plus side, my code was now object oriented, less procedural, and had a lot of features that I didn't have originally.
The biggest change here was that the generated code was robust and object oriented. An example is below: the EnvironmentManager class, which manages environment variables.
This is good practice, and is not something that I would have done myself.
class EnvironmentManager:
"""Manages environment variables and API keys."""
@staticmethod
def get_github_token() -> str:
"""Get GitHub token from environment or prompt user."""
token = os.environ.get("GIT_PAT_AI")
if not token:
token = getpass.getpass("Enter API key for GitHub: ")
os.environ["GIT_PAT_AI"] = token
return token
@staticmethod
def get_anthropic_key() -> str:
"""Get Anthropic API key from environment or prompt user."""
key = os.environ.get("ANTHROPIC_API_KEY")
if not key:
key = getpass.getpass("Enter API key for Anthropic: ")
os.environ["ANTHROPIC_API_KEY"] = key
return key
I am being honest here about what I would and wouldn't have done without the assistance of another pAIr programmer.
I realised I didn't want a static python script. I wanted a service.
I used the prompt below to get Claude to refactor my code to make it more robust, and so that it had a REST based interface that I could send requests to.
refactor the code to accept the repo as an input via a rest interface
This was the piece that blew me away.
The code that was generated was even better. I ended up with a full service that ran, and had a job manager and logger that would take all of the incoming requests and manage them.
I realised that I didn't know how to use this, so I also asked for example usage with the prompt:
write a curl example of how to use the api
Asking for usage examples is the fastest way to get an idea of how code you never wrote actually works. It's also a great way to walk through usage.
The codebase is now a BEAST, but it worked first go!!!
I got Claude to also generate a sequence diagram for me. It's large, but it shows the interaction between the front end of the service, the GitHub service, and the user.
The proof is in the pudding here. I start the server on my laptop and I can see that it's running as a service on port 8000.
I placed the usage before the code walkthrough because I recognise not everyone will be interested in the code walkthrough - there is a conclusion at the end of the article.
I start up the server and the following is displayed on my terminal (stdout).
./git-test2.py
2025-11-27 17:18:04,515 - INFO - Initialized AI model: claude-opus-4-1-20250805
INFO: Started server process [1966]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)
I send my first request to the service. I need to include a data section that contains my Git PAT token, which is used for access to the repo. I also include the repo owner, the repo name, the branch, the path of the file, and a commit message.
The last thing is a dry run variable. It is possible to use the service in dry run mode where no changes will actually be made.
curl -X POST "http://localhost:8000/update-dockerfile" -H "Content-Type: application/json" -d '{
"owner": "codecowboydotio",
"repo": "swapi-json-server",
"branch": "main",
"github_token": "GIT_PAT_TOKEN",
"dockerfile_path": "Dockerfile",
"commit_message": "Updated Dockerfile FROM via AI",
"dry_run": false
}'
The output on the console from my shell command is as follows. You can see that a job is created. The job has a unique ID, and is queued for processing.
[
{
"job_id":"109b7804-61c3-4f36-8b51-ea35642b41c2",
"status":"pending",
"message":"Job created and queued for processing",
"timestamp":"2025-11-27T17:19:02.302591"
}
]
Once the request is made, I can see the service fetch the contents, find the file, make the call to anthropic, and finally analyse the Dockerfile.
2025-11-27 17:19:02,305 - INFO - Fetching repository contents from https://api.github.com/repos/codecowboydotio/swapi-json-server/contents/
2025-11-27 17:19:02,831 - INFO - Downloading file from https://raw.githubusercontent.com/codecowboydotio/swapi-json-server/main/Dockerfile
2025-11-27 17:19:11,522 - INFO - HTTP Request: POST https://api.anthropic.com/v1/messages "HTTP/1.1 200 OK"
2025-11-27 17:19:11,533 - INFO - Successfully analyzed Dockerfile for base image updates
When I query the list of jobs using the /jobs endpoint, I can see that the status of the job is now completed, and there is a message that says no changes were needed.
curl -X GET http://localhost:8000/jobs -H "Content-Type: application/json" | jq
The output shows the time, the ID of the request, and a message saying no changes were needed. Importantly, the current and updated FROM lines are displayed.
[
{
"job_id": "109b7804-61c3-4f36-8b51-ea35642b41c2",
"status": "completed",
"message": "No changes needed - Dockerfile is already up to date",
"timestamp": "2025-11-27T17:19:11.533974",
"result": {
"changed": false,
"current_from": "FROM docker.io/library/ubuntu:24.04",
"updated_from": "FROM docker.io/library/ubuntu:24.04"
},
"error": null
}
]
When I reset my Dockerfile to require a change and re-run the request, I see that there is a new job ID created.
{
"job_id": "6691c4ac-e232-43fd-86f9-073440d32bac",
"status": "pending",
"message": "Job created and queued for processing",
"timestamp": "2025-11-27T17:26:40.359356"
}
I then check my new job to see what the status is.
curl -X GET http://localhost:8000/jobs -H "Content-Type: application/json" | jq
As I am using the /jobs endpoint, this lists ALL jobs, not an individual job. I can see my original job that had no change, and I can see my new job, which has successfully updated the dockerfile in my repository.
[
{
"job_id": "109b7804-61c3-4f36-8b51-ea35642b41c2",
"status": "completed",
"message": "No changes needed - Dockerfile is already up to date",
"timestamp": "2025-11-27T17:19:11.533974",
"result": {
"changed": false,
"current_from": "FROM docker.io/library/ubuntu:24.04",
"updated_from": "FROM docker.io/library/ubuntu:24.04"
},
"error": null
},
{
"job_id": "6691c4ac-e232-43fd-86f9-073440d32bac",
"status": "completed",
"message": "Successfully updated and committed Dockerfile",
"timestamp": "2025-11-27T17:26:50.300875",
"result": {
"changed": true,
"current_from": "FROM docker.io/library/ubuntu:20.04",
"updated_from": "FROM docker.io/library/ubuntu:24.04",
"dry_run": false,
"commit_sha": "4304b404050e15293ed7d017752c252593cbc102",
"file_url": "https://github.com/codecowboydotio/swapi-json-server/blob/main/Dockerfile"
},
"error": null
}
]
We can see from the commits in the repo that it has indeed been updated!
Let's walk through the code base, and how to use it.
The codebase is documented but did not contain an easy way to run it. For my own sanity I went and pasted in one of the example REST calls so that I could cut and paste it at any time.
#!/usr/bin/env python3
"""
Dockerfile Base Image Updater REST API
This service provides a REST API to automatically update Dockerfile base images
to their latest versions using AI analysis and commits the changes to GitHub.
"""
# curl -X POST "http://localhost:8000/update-dockerfile" -H "Content-Type: application/json" -d '{
# "owner": "codecowboydotio",
# "repo": "swapi-json-server",
# "branch": "main",
# "github_token": "XXX",
# "dockerfile_path": "Dockerfile",
# "commit_message": "Updated Dockerfile FROM via AI",
# "dry_run": false
#}'
There are a lot more imports of libraries this time. Claude chose FastAPI and urllib3, but kept my original langchain and requests portions of the code.
import asyncio
import base64
import json
import logging
import os
import sys
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Optional, Dict, Any, List
from enum import Enum
import uuid
import requests
from fastapi import FastAPI, HTTPException, BackgroundTasks, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel, validator, field_validator
from langchain.chat_models import init_chat_model
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
The logger configuration from the second iteration was kept.
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler('dockerfile_updater_api.log')
]
)
logger = logging.getLogger(__name__)
What was interesting was the job manager class that was introduced. This is not something I had thought about; however, in terms of scaling this tiny program into a larger, more robust service, it is definitely something that would be needed. Below are both the classes that were instantiated and the dataclass objects. All of the classes and data classes that have been added are good to have, and they provide a clearer definition of what each type of data is.
class JobStatus(str, Enum):
"""Job status enumeration."""
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class Config:
"""Configuration class for the application."""
api_version: str = "2022-11-28"
timeout: int = 30
max_retries: int = 3
ai_model: str = "claude-opus-4-1-20250805"
class DockerfileUpdateRequest(BaseModel):
"""Request model for Dockerfile update."""
owner: str
repo: str
branch: str = "main"
github_token: str
dockerfile_path: str = "Dockerfile"
commit_message: Optional[str] = "Updated Dockerfile FROM via AI"
dry_run: bool = False
@field_validator('owner', 'repo')
def validate_not_empty(cls, v):
if not v or not v.strip():
raise ValueError('Field cannot be empty')
return v.strip()
@field_validator('github_token')
def validate_github_token(cls, v):
if not v or len(v) < 10:
raise ValueError('Invalid GitHub token')
return v
class DockerfileUpdateResponse(BaseModel):
"""Response model for Dockerfile update."""
job_id: str
status: JobStatus
message: str
timestamp: datetime
class JobStatusResponse(BaseModel):
"""Response model for job status."""
job_id: str
status: JobStatus
message: str
timestamp: datetime
result: Optional[Dict[str, Any]] = None
error: Optional[str] = None
@dataclass
class JobResult:
"""Job result data class."""
job_id: str
status: JobStatus
message: str
timestamp: datetime
result: Optional[Dict[str, Any]] = None
error: Optional[str] = None
class DockerfileUpdaterError(Exception):
"""Custom exception for Dockerfile updater errors."""
pass
The HTTP client class is the client that is used by the service to update the Dockerfile. This is performed via REST API calls from the client to the GitHub API. Claude has correctly added session handling, error handling, and exception code.
class HTTPClient:
"""HTTP client with retry logic and proper error handling."""
def __init__(self, timeout: int = 30, max_retries: int = 3):
self.session = requests.Session()
self.timeout = timeout
# Configure retry strategy
retry_strategy = Retry(
total=max_retries,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["HEAD", "GET", "OPTIONS", "PUT"],
backoff_factor=1
)
adapter = HTTPAdapter(max_retries=retry_strategy)
self.session.mount("http://", adapter)
self.session.mount("https://", adapter)
def get(self, url: str, **kwargs) -> requests.Response:
"""Make GET request with error handling."""
try:
kwargs.setdefault('timeout', self.timeout)
response = self.session.get(url, **kwargs)
response.raise_for_status()
return response
except requests.exceptions.RequestException as e:
logger.error(f"GET request failed for {url}: {e}")
raise DockerfileUpdaterError(f"HTTP GET failed: {e}")
def put(self, url: str, **kwargs) -> requests.Response:
"""Make PUT request with error handling."""
try:
kwargs.setdefault('timeout', self.timeout)
response = self.session.put(url, **kwargs)
response.raise_for_status()
return response
except requests.exceptions.RequestException as e:
logger.error(f"PUT request failed for {url}: {e}")
raise DockerfileUpdaterError(f"HTTP PUT failed: {e}")
This class is specifically for dealing with the GitHub API. This is slightly different from the HTTP client, which provides the transport.
There are four methods: get_repository_contents, get_file_content, get_file_sha, and commit_file.
All of this code is relatively robust, object oriented, and has appropriate error handling.
To get from where we started, with a static procedural script, to here is quite amazing.
class GitHubAPI:
"""GitHub API client with proper error handling."""
def __init__(self, token: str, owner: str, repo: str, config: Config, http_client: HTTPClient):
self.token = token
self.owner = owner
self.repo = repo
self.config = config
self.http_client = http_client
self.base_url = "https://api.github.com"
self.headers = {
"Authorization": f"Bearer {token}",
"Accept": "application/vnd.github+json",
"X-GitHub-Api-Version": config.api_version,
"Content-Type": "application/json"
}
def get_repository_contents(self) -> List[Dict[str, Any]]:
"""Get repository contents."""
url = f"{self.base_url}/repos/{self.owner}/{self.repo}/contents/"
logger.info(f"Fetching repository contents from {url}")
response = self.http_client.get(url, headers=self.headers)
if response.status_code == 204:
raise DockerfileUpdaterError("Repository is empty or deployment in progress")
try:
return response.json()
except json.JSONDecodeError as e:
logger.error(f"Failed to parse JSON response: {e}")
raise DockerfileUpdaterError("Invalid JSON response from GitHub API")
def get_file_content(self, download_url: str) -> str:
"""Download file content from GitHub."""
logger.info(f"Downloading file from {download_url}")
response = self.http_client.get(download_url)
return response.text
def get_file_sha(self, file_path: str, branch: str) -> Optional[str]:
"""Get the SHA of an existing file."""
url = f"{self.base_url}/repos/{self.owner}/{self.repo}/contents/{file_path}"
params = {"ref": branch}
try:
response = self.http_client.get(url, headers=self.headers, params=params)
return response.json()["sha"]
except DockerfileUpdaterError:
# File doesn't exist
return None
def commit_file(self, file_path: str, content: str, commit_message: str, branch: str) -> Dict[str, Any]:
"""Commit file to repository."""
url = f"{self.base_url}/repos/{self.owner}/{self.repo}/contents/{file_path}"
# Encode content to base64
try:
content_encoded = base64.b64encode(content.encode('utf-8')).decode('utf-8')
except UnicodeEncodeError as e:
raise DockerfileUpdaterError(f"Failed to encode file content: {e}")
payload = {
"message": commit_message,
"content": content_encoded,
"branch": branch
}
# Get existing file SHA if file exists
existing_sha = self.get_file_sha(file_path, branch)
if existing_sha:
payload["sha"] = existing_sha
logger.info(f"Updating existing file {file_path}")
else:
logger.info(f"Creating new file {file_path}")
response = self.http_client.put(url, headers=self.headers, data=json.dumps(payload))
return response.json()
This is the class that makes the calls to the AI model. There are two methods here: find_dockerfile_url and update_dockerfile_base_image.
The two interesting things here are that Claude has not altered the prompts or the JSON schemas, and has not changed the use of langchain.
Claude has continued to provide robust error handling and has turned my code into a class.
class AIAnalyzer:
"""AI-powered Dockerfile analyzer."""
def __init__(self, model_name: str = "claude-opus-4-1-20250805"):
try:
# Ensure Anthropic API key is available
if not os.environ.get("ANTHROPIC_API_KEY"):
raise DockerfileUpdaterError("ANTHROPIC_API_KEY environment variable is required")
self.model = init_chat_model(model_name, model_provider="anthropic")
logger.info(f"Initialized AI model: {model_name}")
except Exception as e:
logger.error(f"Failed to initialize AI model: {e}")
raise DockerfileUpdaterError(f"AI model initialization failed: {e}")
def find_dockerfile_url(self, repository_contents: List[Dict[str, Any]], dockerfile_path: str) -> str:
"""Find Dockerfile URL in repository contents using AI."""
# First try to find the file directly by name
for item in repository_contents:
if item.get("name") == dockerfile_path and item.get("type") == "file":
return item.get("download_url")
# If not found directly, use AI to search
dockerfile_schema = {
"title": "dockerfile",
"description": "Format of dockerfile links",
"type": "object",
"properties": {
"textresponse": {
"type": "string",
"description": "The text response portion",
},
"fileurl": {
"type": "string",
"description": "The actual url of the file",
},
},
"required": ["textresponse", "fileurl"],
}
structured_model = self.model.with_structured_output(dockerfile_schema)
try:
response = structured_model.invoke(
                f'Find dockerfile named "{dockerfile_path}" in array {repository_contents} return only value download_url'
)
logger.info("Successfully found Dockerfile URL using AI")
return response["fileurl"]
except Exception as e:
logger.error(f"AI analysis failed for finding Dockerfile: {e}")
raise DockerfileUpdaterError(f"Failed to find Dockerfile '{dockerfile_path}': {e}")
def update_dockerfile_base_image(self, dockerfile_content: str) -> str:
"""Update Dockerfile base image using AI."""
dockerfile_schema = {
"title": "dockerfile",
"description": "the dockerfile",
"type": "object",
"properties": {
"textresponse": {
"type": "string",
"description": "The text response portion",
},
"dockerfile": {
"type": "string",
"description": "the dockerfile",
},
},
"required": ["textresponse", "dockerfile"],
}
structured_model = self.model.with_structured_output(dockerfile_schema)
try:
prompt = (
f'Update the FROM command to be the latest baseimage version for {dockerfile_content}, '
                'return the updated dockerfile. Make no changes if the baseimage is already at the latest version'
)
response = structured_model.invoke(prompt)
logger.info("Successfully analyzed Dockerfile for base image updates")
return response["dockerfile"]
except Exception as e:
logger.error(f"AI analysis failed for updating Dockerfile: {e}")
raise DockerfileUpdaterError(f"Failed to update Dockerfile: {e}")
This is the one thing that I just didn't even think of. Given that I have an API calling out to another API, and this is likely to be asynchronous, job handling is required. Again, the methods here are impressive.
This includes having multiple status types for jobs, including PENDING and COMPLETED.
class JobManager:
"""Manages background jobs."""
def __init__(self):
self.jobs: Dict[str, JobResult] = {}
def create_job(self, job_id: str) -> None:
"""Create a new job."""
self.jobs[job_id] = JobResult(
job_id=job_id,
status=JobStatus.PENDING,
message="Job created",
timestamp=datetime.now()
)
def update_job(self, job_id: str, status: JobStatus, message: str,
result: Optional[Dict[str, Any]] = None, error: Optional[str] = None) -> None:
"""Update job status."""
if job_id in self.jobs:
self.jobs[job_id].status = status
self.jobs[job_id].message = message
self.jobs[job_id].timestamp = datetime.now()
self.jobs[job_id].result = result
self.jobs[job_id].error = error
def get_job(self, job_id: str) -> Optional[JobResult]:
"""Get job by ID."""
return self.jobs.get(job_id)
def list_jobs(self) -> List[JobResult]:
"""List all jobs."""
return list(self.jobs.values())
This is a wrapper class that pulls together the HTTP client above and the AI analyzer class to make the changes to the Dockerfile. This class also implements the job-handling code.
class DockerfileUpdaterService:
"""Main service class."""
def __init__(self, config: Config):
self.config = config
self.http_client = HTTPClient(config.timeout, config.max_retries)
self.ai_analyzer = AIAnalyzer(config.ai_model)
self.job_manager = JobManager()
async def update_dockerfile(self, job_id: str, request: DockerfileUpdateRequest) -> None:
"""Update Dockerfile in background."""
try:
self.job_manager.update_job(job_id, JobStatus.RUNNING, "Starting Dockerfile update process")
# Initialize GitHub API client
github_api = GitHubAPI(
request.github_token,
request.owner,
request.repo,
self.config,
self.http_client
)
# Get repository contents
contents = github_api.get_repository_contents()
self.job_manager.update_job(job_id, JobStatus.RUNNING, "Retrieved repository contents")
# Find Dockerfile URL
dockerfile_url = self.ai_analyzer.find_dockerfile_url(contents, request.dockerfile_path)
self.job_manager.update_job(job_id, JobStatus.RUNNING, f"Found Dockerfile at: {dockerfile_url}")
# Download current Dockerfile
current_dockerfile = github_api.get_file_content(dockerfile_url)
current_first_line = current_dockerfile.split('\n', 1)[0] if current_dockerfile else ""
# Update Dockerfile using AI
updated_dockerfile = self.ai_analyzer.update_dockerfile_base_image(current_dockerfile)
updated_first_line = updated_dockerfile.split('\n', 1)[0] if updated_dockerfile else ""
# Check if changes are needed
if current_first_line == updated_first_line:
self.job_manager.update_job(
job_id,
JobStatus.COMPLETED,
"No changes needed - Dockerfile is already up to date",
result={
"changed": False,
"current_from": current_first_line,
"updated_from": updated_first_line
}
)
return
result_data = {
"changed": True,
"current_from": current_first_line,
"updated_from": updated_first_line,
"dry_run": request.dry_run
}
# Commit updated Dockerfile (unless dry run)
if not request.dry_run:
commit_result = github_api.commit_file(
file_path=request.dockerfile_path,
content=updated_dockerfile,
commit_message=request.commit_message,
branch=request.branch
)
result_data.update({
"commit_sha": commit_result['commit']['sha'],
"file_url": commit_result['content']['html_url']
})
self.job_manager.update_job(
job_id,
JobStatus.COMPLETED,
"Successfully updated and committed Dockerfile",
result=result_data
)
else:
self.job_manager.update_job(
job_id,
JobStatus.COMPLETED,
"Dry run completed - changes detected but not committed",
result=result_data
)
except Exception as e:
error_message = str(e)
logger.error(f"Job {job_id} failed: {error_message}")
self.job_manager.update_job(
job_id,
JobStatus.FAILED,
"Job failed with error",
error=error_message
)
This portion of the code implements the API and the reverse proxy / middleware layer of my service. It uses the Config class and passes it to the updater service.
# Initialize FastAPI app
app = FastAPI(
title="Dockerfile Updater API",
description="REST API for updating Dockerfile base images using AI",
version="1.0.0"
)
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Initialize services
config = Config()
service = DockerfileUpdaterService(config)
# Security
security = HTTPBearer(auto_error=False)
async def get_api_key(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""Validate API key (optional)."""
# You can implement API key validation here if needed
return credentials
The code below implements a number of routes: GET /health, POST /update-dockerfile, GET /jobs/{job_id}, GET /jobs, and DELETE /jobs/{job_id}.
Each of these routes can be called with the appropriate headers, and together they are the primary way to interact with the service.
@app.get("/health")
async def health_check():
"""Health check endpoint."""
return {"status": "healthy", "timestamp": datetime.now()}
@app.post("/update-dockerfile", response_model=DockerfileUpdateResponse)
async def update_dockerfile(
request: DockerfileUpdateRequest,
background_tasks: BackgroundTasks,
credentials: HTTPAuthorizationCredentials = Depends(get_api_key)
):
"""Update Dockerfile base image."""
try:
# Generate job ID
job_id = str(uuid.uuid4())
# Create job
service.job_manager.create_job(job_id)
# Start background task
background_tasks.add_task(service.update_dockerfile, job_id, request)
return DockerfileUpdateResponse(
job_id=job_id,
status=JobStatus.PENDING,
message="Job created and queued for processing",
timestamp=datetime.now()
)
except Exception as e:
logger.error(f"Failed to create update job: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/jobs/{job_id}", response_model=JobStatusResponse)
async def get_job_status(job_id: str):
"""Get job status by ID."""
job = service.job_manager.get_job(job_id)
if not job:
raise HTTPException(status_code=404, detail="Job not found")
return JobStatusResponse(**asdict(job))
@app.get("/jobs", response_model=List[JobStatusResponse])
async def list_jobs():
"""List all jobs."""
jobs = service.job_manager.list_jobs()
return [JobStatusResponse(**asdict(job)) for job in jobs]
@app.delete("/jobs/{job_id}")
async def delete_job(job_id: str):
"""Delete job by ID."""
if job_id not in service.job_manager.jobs:
raise HTTPException(status_code=404, detail="Job not found")
del service.job_manager.jobs[job_id]
return {"message": "Job deleted successfully"}
The main function is very standard. It starts a uvicorn server on port 8000 that hosts my code.
if __name__ == "__main__":
import uvicorn
# Ensure required environment variables
if not os.environ.get("ANTHROPIC_API_KEY"):
logger.error("ANTHROPIC_API_KEY environment variable is required")
sys.exit(1)
uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")
This has been a long journey so far for the reader, but for me as a developer it's been very quick. It took me only a few minutes to refactor the code into a service using Claude. I now have a robust service that allows me to get Dockerfiles updated without too much difficulty. I just need to feed in the right parameters and away it goes: it performs the checks on my behalf, does the analysis, and updates the repository.
This is a neat way to keep your Dockerfiles updated and your repos clean!
2025-11-27 16:06:19
The African technology sector is growing rapidly, with a generation of developers who are talented, creative, and exposed to the most modern tools. Yet a paradox persists: recruitment practices remain anchored in outdated approaches that reflect neither the reality of the job nor the demands of innovation.
In many African countries, and particularly in Cameroon, companies still recruit developers the way one would recruit history teachers.
The tests are centred on memory:
But a modern developer no longer works this way.
Code editors flag errors, autocompletion speeds up syntax, documentation is one click away, and the essence of the work rests on the ability to understand, analyse, model, and solve problems.
Evaluating a developer on memory, in a world where the tools exist precisely to compensate for memory, is a fundamental inconsistency.
Many companies claim to be "innovative" yet continue to recruit according to outdated norms.
The result:
This is how we end up with "tech" organisations that, paradoxically, struggle to produce genuinely innovative products: you cannot build the future with processes inherited from the past.
Modern development rests on three pillars:
A good developer understands a business problem, identifies the constraints, and proposes a modelled, coherent solution. This skill matters more than the ability to recite methods.
A high-performing professional knows how to search, compare, document, test, and validate solutions quickly. The value of a developer in 2025 lies in their speed of learning, not in their capacity for memorisation.
The current technology environment (GitHub Copilot, ChatGPT, advanced frameworks, intelligent IDEs) imposes a different way of working: more productive, more collaborative, more tool-assisted.
Recruiting without evaluating these dimensions is recruiting blind.
The era of AI and automation is redefining the developer's job.
Asking for recitation when the tools complete, correct, and suggest code is a waste of time for everyone.
These archaic practices:
The stakes are clear: adapt recruitment to modern international standards so as not to be left behind.
To evaluate a developer effectively today, companies should favour:
These approaches reflect the reality of a developer's daily work and make it possible to identify those capable of delivering real value.
Cameroon and Africa have an immense pool of talent.
But as long as companies use recruitment methods from another age, they will continue to hire "academic" profiles rather than creators of innovation.
In a world dominated by AI, competitiveness depends on the ability to recruit people who can think, analyse, collaborate, and build, not recite.
Reinventing recruitment is the first step towards truly innovating.