2026-02-19 10:27:08
You're mid-session with your AI coding assistant. It's been writing solid code for the last twenty minutes — referencing the right framework APIs, using current patterns. Then it starts hallucinating. The cloud documentation service hit its rate limit, and your assistant fell back to its training data. Now it's confidently suggesting APIs that were deprecated two versions ago.
This is the fundamental reliability problem with cloud-based documentation for AI agents. Local-first documentation solves it.
Local-first documentation means indexing library docs into a local database and serving them to your AI agent without any network calls. Instead of your assistant querying a cloud API every time it needs to reference a framework, it reads from a file on your machine.
The concept borrows from the broader local-first software movement: your data lives on your device, works offline, and doesn't depend on someone else's server being up. Applied to AI documentation, it means your agent's docs live on your machine, are served without any network calls, and stay pinned to the exact versions you indexed.
This isn't a new idea for developer tools. DevDocs, Zeal, and Dash have offered offline documentation browsing for years. What's new is applying this architecture to AI agents — giving your coding assistant the same offline, instant, version-accurate access to docs that you'd want for yourself.
Cloud documentation services solve a real problem: AI coding assistants need access to current docs that aren't in their training data. Services like Context7 provide this by hosting documentation and serving it through an API.
But cloud-first architecture introduces its own failure modes: rate limits that cut you off mid-session, network latency on every lookup, a hard dependency on the service being reachable, "latest-only" indexing that ignores your pinned versions, and queries that leave your machine.
None of these are deal-breakers for casual use. If you're prototyping something quick and always-latest docs are fine, cloud services work. The problems surface when reliability and accuracy matter — production codebases, version-pinned dependencies, teams that can't afford their AI assistant going dark mid-session.
AI agents have different access patterns than human developers browsing docs. A developer might look up a few API references per hour. An AI agent in an agentic coding session might query docs 50+ times in a single task — checking types, verifying method signatures, reading examples for each file it touches.
This high-frequency access pattern is exactly where local-first shines: there are no rate limits to hit, every lookup is a sub-10ms local read, and version pinning is exact — index a library at the tag your project actually uses (say, a v6 release) and you get v6 docs, not a blend of every version that existed at training time, and not whatever "latest" the cloud service indexed.

The architecture is straightforward: point the indexer at a library's repository, let it parse the documentation (typically the /docs folder), and store the result in a local .db file that your agent queries over MCP.

The result: your AI assistant asks "How do I create middleware in Next.js?" and gets an answer from the exact version of Next.js docs you indexed, in under 10ms, without touching the internet.
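To make the pattern concrete, here is a minimal sketch of the idea — a local full-text index queried with zero network calls. The schema, file name, and sample row are illustrative only, not the actual format used by any particular tool; it assumes a Python build whose SQLite includes FTS5 (standard in recent distributions).

```python
import sqlite3

# Build (or open) a local documentation index — just a file on disk.
con = sqlite3.connect("docs.db")
con.execute("CREATE VIRTUAL TABLE IF NOT EXISTS docs USING fts5(path, heading, body)")
con.execute(
    "INSERT INTO docs VALUES (?, ?, ?)",
    ("docs/middleware.md", "Middleware",
     "Create middleware by exporting a middleware() function from middleware.ts ..."),
)

# The agent's lookup: a local full-text query, no API key, no rate limit.
rows = con.execute(
    "SELECT path, heading FROM docs WHERE docs MATCH ? LIMIT 3",
    ("middleware",),
).fetchall()
print(rows)
```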
@neuledge/context implements this architecture. Three commands to set up:
npm install -g @neuledge/context
context add https://github.com/vercel/next.js --tag v16.0.0
context mcp
The .db files are portable — check them into your repo or share them on a drive. Every developer on your team gets the same indexed docs with zero setup.
| | Local-First | Cloud |
|---|---|---|
| Rate limits | None | 60 req/hour typical |
| Latency | <10ms | 100–500ms |
| Offline | Yes | No |
| Version pinning | Exact tags | Latest only |
| Privacy | 100% local | Cloud-processed |
| Cost | Free | $10+/month |
| Setup | 3 commands | API key + config |
| Internal docs | Yes, free | Paid or unsupported |
Use local-first when you're working in a production codebase with version-pinned dependencies, when you can't afford your assistant going dark mid-session, when your code or internal docs can't leave your machine, or when you simply want sub-10ms lookups.

Use cloud when you're prototyping something quick, always-latest docs are fine, and you'd rather not manage a local index.
Both approaches have their place. Cloud services offer convenience for light use. Local-first offers reliability and accuracy when it counts.
If your AI coding assistant keeps hitting rate limits, suggesting deprecated APIs, or losing access to docs mid-session, local-first documentation fixes all three:
npm install -g @neuledge/context
context add https://github.com/vercel/next.js
claude mcp add context -- npx @neuledge/context mcp
2026-02-19 10:06:37
On February 20, 2026, India's IT Rules Amendment 2026 took effect, mandating that platforms remove illegal deepfakes within 3 hours of a government or court order and within 2 hours for nudity, sexual content, or impersonation complaints. Platforms must now embed permanent metadata with unique identifiers in all synthetic content and visibly label AI-generated media—or risk losing their safe harbour protection under Section 79 of the IT Act.
This is the world's most aggressive content-focused deepfake regulation. And it has a blind spot the size of a continent.
The rules tell platforms: label what you generated, delete what is illegal, record what you did. But they say nothing about a question regulators will inevitably need answered: did the AI system actually refuse to generate the harmful content it claims to have blocked?
This article walks through the technical gap, fact-checks the regulatory claims, and builds a working Python implementation of CAP-SRP (Content/Creative AI Profile – Safe Refusal Provenance)—a cryptographic framework that proves what AI systems refused to generate.
Repository: github.com/veritaschain/cap-spec
Before building a solution, we need to be precise about the problem. I ran a detailed fact-check on the IT Rules Amendment 2026, and several claims circulating in the AI governance community need correction.
Two-tier takedown deadlines are real, but they are separate provisions—not a range: 3 hours to remove illegal synthetic content after a government or court order, and 2 hours to act on complaints involving nudity, sexual content, or impersonation.
These are distinct legal triggers with different scopes. Conflating them as "2–3 hours" misrepresents the structure.
Permanent metadata and labeling are mandated, with two legally distinct obligations: embedding permanent metadata with unique identifiers in synthetic content, and visibly labeling AI-generated media as such.
However, the metadata requirement includes a critical "where technically feasible" qualifier that most commentary omits. This is a significant legal caveat for implementation.
Safe harbour loss is real, but the trigger is narrower than often claimed. Platforms that "knowingly permit, promote, or fail to act against" unlawful synthetic content lose their Section 79 protection. The "knowingly" qualifier matters—it is not triggered by mere failure. Additionally, the amendment includes a positive safe harbour: platforms acting in good faith to proactively remove harmful AI content retain legal protection from wrongful-removal lawsuits.
"Provenance mechanisms within 10 days" mischaracterizes the timeline. The amendment was notified February 10 and enforced February 20. The 10-day figure is the total compliance window for the entire amendment—not a specific deadline for provenance implementation.
"Record all enforcement measures for future audit" could not be verified against the actual regulation text or any of the major law firm analyses (Cyril Amarchand Mangaldas, Hogan Lovells, Khaitan & Co., AZB & Partners). The amendment imposes enhanced due diligence and automated detection obligations, but no explicit "audit recording" mandate matching this specific claim was found.
"Mandatory for brands/creators" mischaracterizes the regulatory subject. The IT Rules Amendment targets platforms and intermediaries (SSMIs), not brands or creators directly. Content creators face only a self-declaration obligation when uploading AI-generated content.
The regulation exclusively addresses content that exists: material that has already been generated, uploaded, or complained about. The takedown deadlines, the labeling and metadata duties, and the safe-harbour conditions all operate downstream of the generation event.
What it does not address: whether the AI system actually refused to generate the harmful content it claims to have blocked, and how anyone outside the platform could verify such refusal claims.
This is the gap.
India's rules, the EU AI Act, the US TAKE IT DOWN Act, the Colorado AI Act—every major regulatory framework shares the same structural limitation. They all regulate what comes out of AI systems (the generated content) but not what AI systems claim to have prevented.
The current model works like this:
Regulator: "Did your AI block harmful requests?"
Platform: "Yes, we blocked 2.3 million."
Regulator: "How do we verify that?"
Platform: "...trust us?"
This is the "Trust Us" model of AI governance. It worked when AI systems were less capable. It no longer works when a single model can generate thousands of non-consensual intimate images per hour.
The shift we need:
Regulator: "Did your AI block harmful requests?"
Platform: "Here is a cryptographically signed evidence pack.
Every generation attempt has exactly one recorded
outcome. The hash chain is externally anchored.
You can verify independently."
Regulator: [runs cap-verify on evidence pack]
"Chain integrity: VALID. Completeness: VALID.
67.3% refusal rate. 0 orphan events."
In January 2026, xAI's Grok AI generated an estimated 3 million sexualized images in 11 days, including content depicting minors. The Future of Life Institute rated xAI's safety practices as "F"—the lowest among major AI providers. Technical analysis revealed Grok lacked basic trust and safety layers: no detection of minors, no blocking for sexually suggestive poses, no C2PA watermarking.
When xAI claimed to have fixed the issues, regulators across five jurisdictions (EU, UK, India, Indonesia, California) had no way to independently verify whether the missing safety layers had actually been deployed, whether the model now refused the same prompts, or whether any claimed refusal statistics were real.
The Grok incident is the most compelling demonstration of why refusal provenance matters. Without it, the gap between "we fixed it" and "prove it" remains unbridgeable.
CAP-SRP (Content/Creative AI Profile – Safe Refusal Provenance) creates tamper-evident audit trails of AI content moderation decisions. The core idea:
You don't prove the negative directly. You prove the positive—every attempt, every outcome, every decision—and you prove that the record is complete.
The architecture is built in layers, from raw event logging (Layer 0) up to evidence pack generation (Layer 5):
┌─────────────────────────────────────────────┐
│ Layer 5: Evidence Pack Generation │
│ (Self-contained, legally admissible) │
├─────────────────────────────────────────────┤
│ Layer 4: External Anchoring │
│ (RFC 3161 TSA, SCITT Transparency Service) │
├─────────────────────────────────────────────┤
│ Layer 3: Merkle Tree Aggregation │
│ (Efficient verification + inclusion proofs)│
├─────────────────────────────────────────────┤
│ Layer 2: Digital Signatures │
│ (Ed25519, RFC 8032) │
├─────────────────────────────────────────────┤
│ Layer 1: Hash Chain │
│ (SHA-256 linked events, RFC 8785 JCS) │
├─────────────────────────────────────────────┤
│ Layer 0: Event Logging │
│ (GEN_ATTEMPT → GEN | GEN_DENY | GEN_ERROR)│
└─────────────────────────────────────────────┘
Three conformance levels accommodate different organizational maturity:
| Level | Hash Chain | Completeness Invariant | External Anchoring | Retention | Target |
|---|---|---|---|---|---|
| Bronze | Required | Recommended | Optional | 6 months | SMEs |
| Silver | Required | Required | Daily | 2 years | Enterprise |
| Gold | Required | Required | Hourly + SCITT | 5 years | High-risk AI |
Let's build this from scratch. Every event in CAP-SRP is linked to the previous event via SHA-256 hashing, creating a tamper-evident chain.
"""
cap_srp/chain.py — Hash chain construction for CAP-SRP events.
Events are canonicalized per RFC 8785 (JSON Canonicalization Scheme)
before hashing, ensuring deterministic hash computation across
implementations.
"""
import hashlib
import json
import uuid
from datetime import datetime, timezone
from typing import Optional
def _canonicalize(obj: dict) -> str:
"""
RFC 8785-compliant JSON canonicalization.
For production, use a dedicated JCS library.
This simplified version handles the common cases:
- Sort keys lexicographically
- No whitespace
- Unicode normalization
"""
return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False)
def generate_event_id() -> str:
    """Generate a time-ordered (UUIDv7-style) event ID.

    uuid.uuid7() requires Python 3.14+; fall back to a random UUIDv4 on
    older interpreters (unique, but not time-ordered).
    """
    try:
        return str(uuid.uuid7())
    except AttributeError:
        return str(uuid.uuid4())
def compute_event_hash(event: dict) -> str:
"""
Compute SHA-256 hash of canonicalized event.
The Signature field is excluded before hashing to avoid
circular dependency (hash → sign → hash would change).
Args:
event: Event dictionary (Signature field excluded from hash)
Returns:
Hash string in format "sha256:{hex_digest}"
"""
# Remove signature before hashing
hashable = {k: v for k, v in event.items() if k != "Signature"}
canonical = _canonicalize(hashable)
digest = hashlib.sha256(canonical.encode("utf-8")).digest()
return f"sha256:{digest.hex()}"
def create_genesis_event(chain_id: str, provider_id: str) -> dict:
"""
Create the first event in a new audit chain.
The genesis event has PrevHash set to the zero hash,
establishing the chain root.
"""
event = {
"EventID": generate_event_id(),
"EventType": "CHAIN_INIT",
"ChainID": chain_id,
"Timestamp": datetime.now(timezone.utc).isoformat(),
"PrevHash": "sha256:" + "0" * 64,
"ProviderID": provider_id,
"SpecVersion": "CAP-SRP/1.0",
}
event["EventHash"] = compute_event_hash(event)
return event
def append_event(
chain: list[dict],
event_type: str,
payload: dict,
chain_id: str,
) -> dict:
"""
Create and append a new event to the chain.
Each event includes the hash of the previous event,
creating the tamper-evident linkage. Modifying any
historical event invalidates all subsequent hashes.
Args:
chain: Existing event list
event_type: One of GEN_ATTEMPT, GEN, GEN_DENY, GEN_ERROR
payload: Event-specific data
chain_id: Chain identifier
Returns:
The new event (also appended to chain)
"""
prev_event = chain[-1]
event = {
"EventID": generate_event_id(),
"EventType": event_type,
"ChainID": chain_id,
"Timestamp": datetime.now(timezone.utc).isoformat(),
"PrevHash": prev_event["EventHash"],
**payload,
}
event["EventHash"] = compute_event_hash(event)
chain.append(event)
return event
The chain is valid if and only if every event's hash matches its recomputed value, and every PrevHash matches the preceding event's EventHash:
"""
cap_srp/verify.py — Chain integrity verification.
"""
from typing import NamedTuple

from cap_srp.chain import compute_event_hash
class ChainVerification(NamedTuple):
valid: bool
error: str | None = None
event_index: int | None = None
def verify_chain_integrity(events: list[dict]) -> ChainVerification:
"""
Verify the full hash chain.
Checks:
1. Each event's EventHash matches recomputation
2. Each event's PrevHash matches the previous EventHash
3. Genesis event has the zero-hash PrevHash
Time complexity: O(n)
Space complexity: O(1)
"""
if not events:
return ChainVerification(valid=False, error="Empty chain")
# Check genesis
if events[0]["PrevHash"] != "sha256:" + "0" * 64:
return ChainVerification(
valid=False, error="Invalid genesis PrevHash", event_index=0
)
for i, event in enumerate(events):
# Verify hash computation
computed = compute_event_hash(event)
if event["EventHash"] != computed:
return ChainVerification(
valid=False,
error=f"Hash mismatch: stored={event['EventHash']}, computed={computed}",
event_index=i,
)
# Verify chain linkage (skip genesis)
if i > 0:
if event["PrevHash"] != events[i - 1]["EventHash"]:
return ChainVerification(
valid=False,
error=f"Chain break: PrevHash does not match previous EventHash",
event_index=i,
)
return ChainVerification(valid=True)
This is the mathematical core of CAP-SRP. The Completeness Invariant guarantees:
∑ GEN_ATTEMPT = ∑ GEN + ∑ GEN_DENY + ∑ GEN_ERROR
For any time window, the count of attempts must exactly equal the count of all outcomes. This is what makes selective logging detectable.
| Violation | Meaning | Implication |
|---|---|---|
| Attempts > Outcomes | Unmatched attempts exist | System is hiding results |
| Outcomes > Attempts | Orphan outcomes exist | System fabricated refusals |
| Duplicate outcomes | Multiple outcomes per attempt | Data integrity failure |
The critical architectural insight: GEN_ATTEMPT is logged before the safety evaluation runs. This creates an unforgeable commitment that a request existed, regardless of what follows. If the equation does not balance, the audit trail is provably invalid.
"""
cap_srp/completeness.py — Completeness Invariant verification.
The Completeness Invariant ensures every generation attempt
has exactly one recorded outcome, preventing:
- Selective omission (hiding inconvenient generations)
- Fabricated refusals (inflating safety statistics)
- Split-view attacks (showing different logs to different auditors)
"""
from dataclasses import dataclass, field
from datetime import datetime
ATTEMPT_TYPES = {"GEN_ATTEMPT"}
OUTCOME_TYPES = {"GEN", "GEN_DENY", "GEN_ERROR"}
@dataclass
class CompletenessResult:
"""Result of Completeness Invariant verification."""
valid: bool
total_attempts: int = 0
total_outcomes: int = 0
    matched_pairs: int = 0
    total_denials: int = 0
    unmatched_attempts: list[str] = field(default_factory=list)
    orphan_outcomes: list[str] = field(default_factory=list)
    duplicate_outcomes: list[str] = field(default_factory=list)

    @property
    def refusal_rate(self) -> float | None:
        """Share of attempts that ended in GEN_DENY (None if there were no attempts)."""
        if self.total_attempts == 0:
            return None
        return self.total_denials / self.total_attempts
def summary(self) -> str:
status = "✓ VALID" if self.valid else "✗ INVALID"
lines = [
f"Completeness Invariant: {status}",
f" Attempts: {self.total_attempts}",
f" Outcomes: {self.total_outcomes}",
f" Matched pairs: {self.matched_pairs}",
f" Unmatched attempts: {len(self.unmatched_attempts)}",
f" Orphan outcomes: {len(self.orphan_outcomes)}",
f" Duplicate outcomes: {len(self.duplicate_outcomes)}",
]
return "\n".join(lines)
def verify_completeness(
events: list[dict],
time_start: datetime | None = None,
time_end: datetime | None = None,
) -> CompletenessResult:
"""
Verify the Completeness Invariant for events in a time window.
For every GEN_ATTEMPT, there must exist exactly one event E where:
- E.EventType ∈ {GEN, GEN_DENY, GEN_ERROR}
- E.AttemptID == GEN_ATTEMPT.EventID
- E.Timestamp > GEN_ATTEMPT.Timestamp
Args:
events: All events (any order; will be filtered)
time_start: Optional window start (inclusive)
time_end: Optional window end (inclusive)
Returns:
CompletenessResult with detailed breakdown
"""
# Filter by time window if specified
filtered = events
if time_start or time_end:
filtered = []
for e in events:
ts = datetime.fromisoformat(e["Timestamp"])
if time_start and ts < time_start:
continue
if time_end and ts > time_end:
continue
filtered.append(e)
# Separate attempts and outcomes
attempts: dict[str, dict] = {}
outcomes: list[dict] = []
for e in filtered:
if e["EventType"] in ATTEMPT_TYPES:
attempts[e["EventID"]] = e
elif e["EventType"] in OUTCOME_TYPES:
outcomes.append(e)
# Match outcomes to attempts
matched_attempt_ids: set[str] = set()
orphan_outcomes: list[str] = []
duplicate_outcomes: list[str] = []
for outcome in outcomes:
attempt_id = outcome.get("AttemptID")
if attempt_id not in attempts:
# Outcome references a non-existent attempt
orphan_outcomes.append(outcome["EventID"])
continue
if attempt_id in matched_attempt_ids:
# Multiple outcomes for same attempt
duplicate_outcomes.append(outcome["EventID"])
continue
matched_attempt_ids.add(attempt_id)
# Unmatched attempts: logged but no outcome recorded
unmatched_attempts = [
aid for aid in attempts if aid not in matched_attempt_ids
]
is_valid = (
len(unmatched_attempts) == 0
and len(orphan_outcomes) == 0
and len(duplicate_outcomes) == 0
)
    total_denials = sum(1 for o in outcomes if o["EventType"] == "GEN_DENY")

    return CompletenessResult(
        valid=is_valid,
        total_attempts=len(attempts),
        total_outcomes=len(outcomes),
        matched_pairs=len(matched_attempt_ids),
        total_denials=total_denials,
        unmatched_attempts=unmatched_attempts,
        orphan_outcomes=orphan_outcomes,
        duplicate_outcomes=duplicate_outcomes,
    )
"""
tests/test_completeness.py — Verify invariant catches violations.
"""
from cap_srp.chain import create_genesis_event, append_event
from cap_srp.completeness import verify_completeness
def build_test_chain():
"""Build a chain with 3 attempts: 1 generated, 1 denied, 1 error."""
chain_id = "test-chain-001"
chain = [create_genesis_event(chain_id, "test-provider")]
# Attempt 1 → Generated
attempt_1 = append_event(chain, "GEN_ATTEMPT", {
"PromptHash": "sha256:aaa...",
"ModelID": "test-model-v1",
"InputType": "text",
}, chain_id)
append_event(chain, "GEN", {
"AttemptID": attempt_1["EventID"],
"ContentHash": "sha256:bbb...",
"OutputType": "image",
}, chain_id)
# Attempt 2 → Denied (NCII risk)
attempt_2 = append_event(chain, "GEN_ATTEMPT", {
"PromptHash": "sha256:ccc...",
"ModelID": "test-model-v1",
"InputType": "text",
}, chain_id)
append_event(chain, "GEN_DENY", {
"AttemptID": attempt_2["EventID"],
"DenyReason": "NCII_RISK",
"PolicyID": "safety-policy-v3",
"Confidence": 0.97,
}, chain_id)
# Attempt 3 → Error
attempt_3 = append_event(chain, "GEN_ATTEMPT", {
"PromptHash": "sha256:ddd...",
"ModelID": "test-model-v1",
"InputType": "text",
}, chain_id)
append_event(chain, "GEN_ERROR", {
"AttemptID": attempt_3["EventID"],
"ErrorCode": "MODEL_TIMEOUT",
}, chain_id)
return chain
def test_valid_chain():
chain = build_test_chain()
result = verify_completeness(chain)
assert result.valid is True
assert result.total_attempts == 3
assert result.matched_pairs == 3
assert len(result.unmatched_attempts) == 0
print(result.summary())
def test_missing_outcome():
"""Simulate a platform hiding a generation result."""
chain = build_test_chain()
# Add an attempt with no outcome — the "hidden generation"
append_event(chain, "GEN_ATTEMPT", {
"PromptHash": "sha256:eee...",
"ModelID": "test-model-v1",
"InputType": "text",
}, chain[-1]["ChainID"])
result = verify_completeness(chain)
assert result.valid is False
assert len(result.unmatched_attempts) == 1
print(result.summary())
# Output:
# Completeness Invariant: ✗ INVALID
# Attempts: 4
# Outcomes: 3
# Matched pairs: 3
# Unmatched attempts: 1
# Orphan outcomes: 0
# Duplicate outcomes: 0
def test_fabricated_refusal():
"""Simulate a platform inflating its refusal count."""
chain = build_test_chain()
# Add a refusal that references no real attempt
append_event(chain, "GEN_DENY", {
"AttemptID": "nonexistent-attempt-id",
"DenyReason": "NCII_RISK",
"PolicyID": "safety-policy-v3",
"Confidence": 0.99,
}, chain[-1]["ChainID"])
result = verify_completeness(chain)
assert result.valid is False
assert len(result.orphan_outcomes) == 1
print(result.summary())
# Output:
# Completeness Invariant: ✗ INVALID
# Attempts: 3
# Outcomes: 4
# Matched pairs: 3
# Unmatched attempts: 0
# Orphan outcomes: 1
# Duplicate outcomes: 0
if __name__ == "__main__":
test_valid_chain()
print()
test_missing_outcome()
print()
test_fabricated_refusal()
Every event is signed with Ed25519 (RFC 8032). This prevents post-hoc tampering—even by the platform operator. If the signing key is stored in an HSM (Gold level), the platform cannot retroactively modify events without detection.
"""
cap_srp/signing.py — Ed25519 event signing and verification.
Uses the cryptography library (pip install cryptography).
For production, key management should use HSM or cloud KMS.
"""
import base64
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
Ed25519PrivateKey,
Ed25519PublicKey,
)
from cryptography.exceptions import InvalidSignature
from cap_srp.chain import compute_event_hash
def generate_keypair() -> tuple[Ed25519PrivateKey, Ed25519PublicKey]:
"""Generate a new Ed25519 keypair for event signing."""
private_key = Ed25519PrivateKey.generate()
public_key = private_key.public_key()
return private_key, public_key
def sign_event(event: dict, private_key: Ed25519PrivateKey) -> dict:
"""
Sign an event with Ed25519.
The signature covers the EventHash, which itself covers
all fields except Signature. This creates a two-step
integrity guarantee:
1. EventHash proves field integrity
2. Signature proves EventHash authenticity
Args:
event: Event dict with EventHash already computed
private_key: Ed25519 signing key
Returns:
Event dict with Signature field added
"""
# Ensure hash is computed
if "EventHash" not in event:
event["EventHash"] = compute_event_hash(event)
# Sign the hash bytes (not the raw event)
hash_hex = event["EventHash"].removeprefix("sha256:")
hash_bytes = bytes.fromhex(hash_hex)
signature = private_key.sign(hash_bytes)
event["Signature"] = f"ed25519:{base64.b64encode(signature).decode()}"
return event
def verify_event_signature(event: dict, public_key: Ed25519PublicKey) -> bool:
"""
Verify an event's Ed25519 signature.
Steps:
1. Recompute event hash (excluding Signature field)
2. Compare with stored EventHash
3. Verify signature over the hash bytes
Returns:
True if signature is valid, False otherwise
"""
# Step 1: Verify hash integrity
computed_hash = compute_event_hash(event)
if event.get("EventHash") != computed_hash:
return False # Event data was modified
# Step 2: Extract and decode signature
sig_str = event.get("Signature", "")
if not sig_str.startswith("ed25519:"):
return False
try:
signature = base64.b64decode(sig_str.removeprefix("ed25519:"))
except Exception:
return False
# Step 3: Verify signature over hash bytes
hash_bytes = bytes.fromhex(computed_hash.removeprefix("sha256:"))
try:
public_key.verify(signature, hash_bytes)
return True
except InvalidSignature:
return False
def sign_chain(chain: list[dict], private_key: Ed25519PrivateKey) -> list[dict]:
"""Sign all events in a chain."""
return [sign_event(event, private_key) for event in chain]
def verify_chain_signatures(
chain: list[dict], public_key: Ed25519PublicKey
) -> tuple[bool, list[int]]:
"""
Verify all signatures in a chain.
Returns:
Tuple of (all_valid, list_of_invalid_indices)
"""
invalid_indices = []
for i, event in enumerate(chain):
if not verify_event_signature(event, public_key):
invalid_indices.append(i)
return (len(invalid_indices) == 0, invalid_indices)
For efficient verification, events are aggregated into Merkle trees. A regulator can verify that a specific event is included in the tree without downloading the entire chain—using an inclusion proof.
"""
cap_srp/merkle.py — Merkle tree construction and inclusion proofs.
Enables O(log n) verification of event inclusion, critical for
regulatory audits where verifying millions of events individually
is impractical.
"""
import hashlib
from dataclasses import dataclass
def _hash_pair(left: bytes, right: bytes) -> bytes:
"""Hash two child nodes to produce parent."""
return hashlib.sha256(left + right).digest()
def _hash_leaf(data: str) -> bytes:
"""Hash a leaf node (event hash string)."""
# Prefix with 0x00 to distinguish leaves from internal nodes
return hashlib.sha256(b"\x00" + data.encode("utf-8")).digest()
@dataclass
class MerkleProof:
"""Inclusion proof for a single leaf."""
leaf_hash: bytes
proof_hashes: list[bytes]
proof_directions: list[str] # "left" or "right"
root: bytes
def verify(self) -> bool:
"""Verify this inclusion proof."""
current = self.leaf_hash
for hash_val, direction in zip(self.proof_hashes, self.proof_directions):
if direction == "left":
current = _hash_pair(hash_val, current)
else:
current = _hash_pair(current, hash_val)
return current == self.root
class MerkleTree:
"""
Binary Merkle tree for event hash aggregation.
Usage:
tree = MerkleTree()
for event in chain:
tree.add_leaf(event["EventHash"])
tree.build()
root = tree.root_hex()
proof = tree.get_proof(index)
"""
def __init__(self):
self.leaves: list[bytes] = []
self.layers: list[list[bytes]] = []
self._built = False
def add_leaf(self, event_hash: str) -> int:
"""Add an event hash as a leaf. Returns leaf index."""
self.leaves.append(_hash_leaf(event_hash))
self._built = False
return len(self.leaves) - 1
def build(self) -> None:
"""Construct the Merkle tree from leaves."""
if not self.leaves:
raise ValueError("Cannot build tree with no leaves")
self.layers = [self.leaves[:]]
current_layer = self.leaves[:]
while len(current_layer) > 1:
next_layer = []
for i in range(0, len(current_layer), 2):
left = current_layer[i]
# If odd number of nodes, duplicate the last
right = current_layer[i + 1] if i + 1 < len(current_layer) else left
next_layer.append(_hash_pair(left, right))
self.layers.append(next_layer)
current_layer = next_layer
self._built = True
@property
def root(self) -> bytes:
if not self._built:
self.build()
return self.layers[-1][0]
def root_hex(self) -> str:
return f"sha256:{self.root.hex()}"
def get_proof(self, leaf_index: int) -> MerkleProof:
"""
Generate an inclusion proof for a specific leaf.
The proof consists of sibling hashes at each level,
sufficient to reconstruct the root from the leaf.
"""
if not self._built:
self.build()
if leaf_index >= len(self.leaves):
raise IndexError(f"Leaf index {leaf_index} out of range")
proof_hashes = []
proof_directions = []
index = leaf_index
for layer in self.layers[:-1]: # Skip root layer
if index % 2 == 0:
# Current node is left child; sibling is right
sibling_idx = index + 1
if sibling_idx < len(layer):
proof_hashes.append(layer[sibling_idx])
else:
proof_hashes.append(layer[index]) # Duplicate
proof_directions.append("right")
else:
# Current node is right child; sibling is left
proof_hashes.append(layer[index - 1])
proof_directions.append("left")
index //= 2
return MerkleProof(
leaf_hash=self.leaves[leaf_index],
proof_hashes=proof_hashes,
proof_directions=proof_directions,
root=self.root,
)
from cap_srp.merkle import MerkleTree
# Build tree from a day's events
tree = MerkleTree()
for event in todays_events:
tree.add_leaf(event["EventHash"])
tree.build()
print(f"Daily Merkle root: {tree.root_hex()}")
# → sha256:7f3a9b2c... (this gets anchored externally)
# Generate proof for a specific event (e.g., event #42)
proof = tree.get_proof(42)
assert proof.verify() # O(log n) verification
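Layer 4 takes that daily (or hourly) Merkle root and anchors it outside the platform's control. A production system would use a real RFC 3161 timestamp authority or a SCITT transparency service; the sketch below only illustrates the shape of the step — the endpoint URL and receipt format are assumptions, not part of the CAP-SRP specification.

```python
"""
cap_srp/anchor.py — illustrative external-anchoring sketch (Layer 4).
"""
import json
import urllib.request
from datetime import datetime, timezone


def anchor_merkle_root(
    root_hex: str,
    endpoint: str = "https://transparency.example/v1/anchor",  # hypothetical service
) -> dict:
    """Submit a Merkle root for external anchoring and return the receipt."""
    payload = json.dumps({
        "root": root_hex,
        "submitted_at": datetime.now(timezone.utc).isoformat(),
    }).encode("utf-8")
    req = urllib.request.Request(
        endpoint,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    # The returned receipt (countersignature, log index, ...) is stored alongside
    # the evidence pack as proof that this root existed at this point in time.
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

Once a root is anchored, modifying any historical event would change the recomputed root and no longer match the externally held receipt.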
CAP-SRP never stores raw prompts. Instead, it uses salted hashes that allow verification without content exposure. This satisfies GDPR requirements while enabling regulatory verification.
"""
cap_srp/privacy.py — Privacy-preserving prompt and actor hashing.
Prompts are stored only as salted SHA-256 hashes. The salt is
stored separately and encrypted, disclosed only to authorized
auditors via legal process.
This means:
- The audit chain proves completeness without revealing content
- Regulators verify chain integrity without seeing prompts
- When legally required, salts can be disclosed for specific events
- GDPR crypto-shredding: delete the salt, and the hash becomes
permanently unverifiable → effective deletion
"""
import hashlib
import os
import base64
def generate_salt(length: int = 32) -> bytes:
"""Generate cryptographically secure random salt (256-bit)."""
return os.urandom(length)
def hash_prompt(prompt: str, salt: bytes) -> str:
"""
Create privacy-preserving prompt hash.
The prompt text is never stored in the audit trail.
Only this hash appears in events.
Args:
prompt: Raw prompt text
salt: Per-event random salt
Returns:
Hash string in format "sha256:{hex}"
"""
data = salt + prompt.encode("utf-8")
digest = hashlib.sha256(data).digest()
return f"sha256:{digest.hex()}"
def hash_actor(actor_id: str, salt: bytes) -> str:
"""
Create privacy-preserving actor identifier hash.
Allows correlation within the audit trail (same actor
across multiple requests) while preventing identification
without the salt.
"""
data = salt + actor_id.encode("utf-8")
digest = hashlib.sha256(data).digest()
return f"sha256:{digest.hex()}"
def create_salt_commitment(prompt_salt: bytes, actor_salt: bytes) -> str:
"""
Create a commitment to the salts used.
This commitment is stored in the event, proving that
specific salts were used without revealing them. During
audit, disclosing the salts allows verification that they
match the commitment.
"""
combined = prompt_salt + actor_salt
digest = hashlib.sha256(combined).digest()
return f"sha256:{digest.hex()}"
def verify_prompt_hash(
prompt: str, salt: bytes, stored_hash: str
) -> bool:
"""Verify a prompt against its stored hash (requires salt)."""
return hash_prompt(prompt, salt) == stored_hash
# --- GDPR Crypto-Shredding ---
def crypto_shred(salt_store: dict, event_id: str) -> None:
"""
Effectively 'delete' personal data by destroying the salt.
Without the salt, the prompt hash becomes permanently
unverifiable — achieving GDPR-compliant deletion while
preserving chain integrity (the hash still exists, but
can never be linked back to content).
"""
if event_id in salt_store:
        # Best-effort overwrite before deletion (Python cannot guarantee the
        # original salt bytes are wiped from memory)
salt_store[event_id] = os.urandom(32)
del salt_store[event_id]
Evidence Packs are self-contained, tamper-evident archives that a regulator can verify independently without any access to the platform's systems.
"""
cap_srp/evidence_pack.py — Generate self-contained evidence packs
for regulatory submission.
An Evidence Pack contains:
1. The event chain (or relevant subset)
2. Merkle tree with root and inclusion proofs
3. Signature verification material (public key)
4. Completeness Invariant verification results
5. Metadata (time period, provider, generation stats)
The pack is a single tar.gz file that a regulator can verify
using the open-source cap-verify tool.
"""
import json
import tarfile
import io
from datetime import datetime, timezone
from dataclasses import dataclass, asdict
from cap_srp.chain import compute_event_hash
from cap_srp.verify import verify_chain_integrity
from cap_srp.completeness import verify_completeness
from cap_srp.merkle import MerkleTree
@dataclass
class EvidencePackMetadata:
pack_id: str
provider_id: str
chain_id: str
period_start: str
period_end: str
total_events: int
total_attempts: int
total_generations: int
total_denials: int
total_errors: int
refusal_rate: float
spec_version: str = "CAP-SRP/1.0"
generated_at: str = ""
def __post_init__(self):
if not self.generated_at:
self.generated_at = datetime.now(timezone.utc).isoformat()
def generate_evidence_pack(
chain: list[dict],
provider_id: str,
chain_id: str,
output_path: str,
public_key_pem: bytes,
) -> EvidencePackMetadata:
"""
Generate a complete evidence pack for regulatory submission.
This creates a self-contained, verifiable archive containing
everything a regulator needs to independently validate the
audit trail.
"""
# Step 1: Verify chain integrity
chain_result = verify_chain_integrity(chain)
if not chain_result.valid:
raise ValueError(f"Chain integrity failed: {chain_result.error}")
# Step 2: Verify completeness
completeness = verify_completeness(chain)
# Step 3: Build Merkle tree
tree = MerkleTree()
for event in chain:
tree.add_leaf(event["EventHash"])
tree.build()
# Step 4: Compute statistics
attempts = [e for e in chain if e["EventType"] == "GEN_ATTEMPT"]
gens = [e for e in chain if e["EventType"] == "GEN"]
denials = [e for e in chain if e["EventType"] == "GEN_DENY"]
errors = [e for e in chain if e["EventType"] == "GEN_ERROR"]
total_attempts = len(attempts)
refusal_rate = len(denials) / total_attempts if total_attempts > 0 else 0.0
# Step 5: Create metadata
timestamps = [e["Timestamp"] for e in chain if "Timestamp" in e]
metadata = EvidencePackMetadata(
pack_id=f"VSO-EVIDPACK-{datetime.now(timezone.utc).strftime('%Y-%m-%d')}-001",
provider_id=provider_id,
chain_id=chain_id,
period_start=min(timestamps),
period_end=max(timestamps),
total_events=len(chain),
total_attempts=total_attempts,
total_generations=len(gens),
total_denials=len(denials),
total_errors=len(errors),
refusal_rate=round(refusal_rate, 4),
)
# Step 6: Bundle into tar.gz
with tarfile.open(output_path, "w:gz") as tar:
# Events
_add_json(tar, "events.json", chain)
# Metadata
_add_json(tar, "metadata.json", asdict(metadata))
# Merkle root
_add_json(tar, "merkle_root.json", {
"root": tree.root_hex(),
"leaf_count": len(tree.leaves),
})
# Completeness report
_add_json(tar, "completeness.json", {
"valid": completeness.valid,
"total_attempts": completeness.total_attempts,
"total_outcomes": completeness.total_outcomes,
"matched_pairs": completeness.matched_pairs,
"unmatched_attempts": completeness.unmatched_attempts,
"orphan_outcomes": completeness.orphan_outcomes,
"duplicate_outcomes": completeness.duplicate_outcomes,
})
# Public key for signature verification
_add_bytes(tar, "public_key.pem", public_key_pem)
# Verification instructions
_add_bytes(tar, "VERIFY.md", VERIFY_INSTRUCTIONS.encode())
return metadata
def _add_json(tar: tarfile.TarFile, name: str, data) -> None:
content = json.dumps(data, indent=2, ensure_ascii=False).encode()
info = tarfile.TarInfo(name=name)
info.size = len(content)
tar.addfile(info, io.BytesIO(content))
def _add_bytes(tar: tarfile.TarFile, name: str, data: bytes) -> None:
info = tarfile.TarInfo(name=name)
info.size = len(data)
tar.addfile(info, io.BytesIO(data))
VERIFY_INSTRUCTIONS = """# Evidence Pack Verification
## Quick Verify

```bash
pip install cap-srp
cap-verify ./evidence_pack.tar.gz
```
## Manual Verification Steps
1. Extract the archive
2. Verify chain integrity: recompute all EventHash values
3. Verify chain linkage: check PrevHash references
4. Verify signatures: validate Ed25519 signatures with public_key.pem
5. Verify completeness: run Completeness Invariant check
6. Verify Merkle root: reconstruct tree and compare root
## What Each File Contains
- `events.json`: Complete event chain
- `metadata.json`: Pack metadata and statistics
- `merkle_root.json`: Merkle tree root hash
- `completeness.json`: Completeness Invariant results
- `public_key.pem`: Ed25519 public key for signature verification
- `VERIFY.md`: This file
"""
Here is a complete end-to-end demonstration that creates a chain, signs it, verifies it, and generates an evidence pack:
"""
demo.py — Complete CAP-SRP demonstration.
Run: python demo.py
This creates an audit chain simulating an AI image generation
service, signs all events, verifies integrity, checks the
Completeness Invariant, and generates an evidence pack.
"""
from cap_srp.chain import create_genesis_event, append_event
from cap_srp.signing import generate_keypair, sign_chain, verify_chain_signatures
from cap_srp.verify import verify_chain_integrity
from cap_srp.completeness import verify_completeness
from cap_srp.merkle import MerkleTree
from cap_srp.privacy import generate_salt, hash_prompt, hash_actor
from cap_srp.evidence_pack import generate_evidence_pack
from cryptography.hazmat.primitives.serialization import (
Encoding,
PublicFormat,
)
def main():
# --- Setup ---
chain_id = "demo-chain-2026-02-19"
provider_id = "demo-ai-platform"
private_key, public_key = generate_keypair()
actor_salt = generate_salt()
# --- Build chain ---
chain = [create_genesis_event(chain_id, provider_id)]
# Scenario: 5 generation attempts
scenarios = [
# (prompt, should_deny, deny_reason)
("A sunset over mountains", False, None),
("Generate nude image of celebrity X", True, "NCII_RISK"),
("A cat wearing a hat", False, None),
("Child in suggestive pose", True, "CSAM_RISK"),
("Abstract art in watercolor style", False, None),
]
for prompt, should_deny, reason in scenarios:
# Generate per-prompt salt and hash
prompt_salt = generate_salt()
prompt_hash = hash_prompt(prompt, prompt_salt)
actor_hash = hash_actor("user-12345", actor_salt)
# Log attempt FIRST (before safety evaluation)
attempt = append_event(chain, "GEN_ATTEMPT", {
"PromptHash": prompt_hash,
"ActorHash": actor_hash,
"ModelID": "demo-model-v2",
"InputType": "text",
}, chain_id)
# Safety evaluation happens here...
if should_deny:
append_event(chain, "GEN_DENY", {
"AttemptID": attempt["EventID"],
"DenyReason": reason,
"PolicyID": "safety-policy-v3.1",
"Confidence": 0.98,
}, chain_id)
else:
append_event(chain, "GEN", {
"AttemptID": attempt["EventID"],
"ContentHash": f"sha256:{'ab' * 32}",
"OutputType": "image",
}, chain_id)
# --- Sign all events ---
chain = sign_chain(chain, private_key)
# --- Verify ---
print("=" * 60)
print("CAP-SRP Verification Report")
print("=" * 60)
# Chain integrity
chain_result = verify_chain_integrity(chain)
print(f"\n1. Chain Integrity: {'✓ VALID' if chain_result.valid else '✗ INVALID'}")
print(f" Events in chain: {len(chain)}")
# Signatures
sig_valid, invalid = verify_chain_signatures(chain, public_key)
print(f"\n2. Signatures: {'✓ ALL VALID' if sig_valid else f'✗ {len(invalid)} INVALID'}")
# Completeness
completeness = verify_completeness(chain)
print(f"\n3. Completeness Invariant:")
print(completeness.summary())
# Merkle tree
tree = MerkleTree()
for event in chain:
tree.add_leaf(event["EventHash"])
tree.build()
print(f"\n4. Merkle Root: {tree.root_hex()[:40]}...")
# Inclusion proof for a specific event
proof = tree.get_proof(3) # Verify 4th event
print(f" Inclusion proof for event #3: {'✓ VALID' if proof.verify() else '✗ INVALID'}")
# --- Generate evidence pack ---
public_key_pem = public_key.public_bytes(Encoding.PEM, PublicFormat.SubjectPublicKeyInfo)
metadata = generate_evidence_pack(
chain=chain,
provider_id=provider_id,
chain_id=chain_id,
output_path="evidence_pack.tar.gz",
public_key_pem=public_key_pem,
)
print(f"\n5. Evidence Pack Generated:")
print(f" Pack ID: {metadata.pack_id}")
print(f" Attempts: {metadata.total_attempts}")
print(f" Generated: {metadata.total_generations}")
print(f" Denied: {metadata.total_denials}")
print(f" Refusal rate: {metadata.refusal_rate:.1%}")
print(f" Saved to: evidence_pack.tar.gz")
print("\n" + "=" * 60)
print("Verification complete.")
print("=" * 60)
if __name__ == "__main__":
main()
Expected output:
============================================================
CAP-SRP Verification Report
============================================================
1. Chain Integrity: ✓ VALID
Events in chain: 11
2. Signatures: ✓ ALL VALID
3. Completeness Invariant:
Completeness Invariant: ✓ VALID
Attempts: 5
Outcomes: 5
Matched pairs: 5
Unmatched attempts: 0
Orphan outcomes: 0
Duplicate outcomes: 0
4. Merkle Root: sha256:7f3a9b2c4d8e1f6a0b5c3d7e...
Inclusion proof for event #3: ✓ VALID
5. Evidence Pack Generated:
Pack ID: VSO-EVIDPACK-2026-02-19-001
Attempts: 5
Generated: 3
Denied: 2
Refusal rate: 40.0%
Saved to: evidence_pack.tar.gz
============================================================
Verification complete.
============================================================
Here is how CAP-SRP maps to the regulations that will be enforced in 2026:
| Requirement | What Rules Mandate | What CAP-SRP Adds |
|---|---|---|
| Takedown | 3h (court/govt) / 2h (nudity) | Proves refusal before content reaches platform |
| Metadata | Permanent metadata with unique IDs | Extends to refusal events, not just generations |
| Labeling | Visible AI content disclosure | Cryptographic verification of labeling compliance |
| Safe harbour | Good-faith proactive removal | Evidence of functioning prevention, not just removal |
The Indian rules focus on what happens after content is generated and published. CAP-SRP extends the audit trail upstream to the generation decision itself.
| Article | Requirement | CAP-SRP Implementation |
|---|---|---|
| Art. 12 | Automatic event logging, tamper-evident | Hash chain + Ed25519 signatures |
| Art. 12 | Traceability appropriate to purpose | Completeness Invariant ensures full coverage |
| Art. 26(6) | 6-month minimum retention | Bronze level: 6 months; Silver: 2 years; Gold: 5 years |
| Art. 50 | Machine-readable content marking | COSE_Sign1 format with CBOR encoding |
CAP-SRP provides evidence of proactive prevention—demonstrating that safety measures were operational and effective before harmful content could be created. This strengthens the "good faith" defense for platforms.
The Act requires "reasonable care" and risk management aligned with NIST AI RMF. CAP-SRP's verifiable refusal records provide concrete, auditable evidence that safety measures were not just documented but functioning.
Being transparent about what CAP-SRP is and is not:
What CAP-SRP proves: That a complete, tamper-evident record of all generation decisions exists, and that the record has not been modified after the fact.
What CAP-SRP does NOT prove: That a harmful output was not generated outside the logging system. The proof is only as strong as the completeness of the logging integration. A compromised system could theoretically avoid logging an attempt entirely before the hash chain captures it.
Framework maturity: CAP-SRP v1.0 was published in February 2026. The VeritasChain GitHub organization was created in November 2025. The framework has not yet been adopted by any major AI provider. The concept is sound, the code works, but it has not been battle-tested at scale.
The "first-mover" problem: CAP-SRP is currently the only specification proposing a standardized approach to refusal provenance. This could mean it fills a genuine gap, or it could mean the gap is not as urgent as argued. The underlying SCITT framework from the IETF (Supply Chain Integrity, Transparency, and Trust) is mature and has real implementations from Microsoft and DataTrails—CAP-SRP builds on that foundation.
The trust boundary: At some point, you must trust the system that generates the initial GEN_ATTEMPT event. If the AI provider's infrastructure is compromised at the kernel level, no logging framework can help. CAP-SRP pushes the trust boundary outward (via external anchoring) and narrows it (via HSM key management), but it cannot eliminate it.
If you want to explore further, the CAP-SRP specification and the reference code from this article are in the repository linked above (github.com/veritaschain/cap-spec).
The core insight is simple: if AI providers want regulators to believe their safety claims, they need to prove them. Not with press releases, not with internal dashboards, but with cryptographic evidence that any third party can verify independently.
India's 3-hour rule is just the beginning. As regulation tightens globally, the question will shift from "did you take it down?" to "did you prevent it?" The infrastructure for answering that question needs to exist before regulators ask.
This article is part of the Verifiable AI Provenance series. The CAP-SRP specification is open source under MIT license.
Disclaimer: CAP-SRP is a young specification. The regulatory mapping represents the author's analysis and should not be taken as legal advice. Always consult qualified legal counsel for compliance decisions.
2026-02-19 10:05:56
Ever wonder how they turned the epic Wind Waker's "Great Sea" theme into something downright spooky? Turns out, it's all about musical mischief! Composers threw in tritones and clashing harmonies, completely twisting that familiar, adventurous tune into a creep-fest.
They even subtly hinted at past themes, like Ganondorf's ominous presence, to give the music an extra layer of dread. So, that heroic sea you once sailed becomes super unsettling, all thanks to some clever, creepy note choices.
Watch on YouTube
2026-02-19 10:04:44
Most "AI prompt" lists are garbage. "Write me a resume" gives you garbage in, garbage out.
Here are prompts I've actually tested that give specific, useful output. Replace the [brackets] with your real info.
I'm a [role] at [company]. Here's a bullet from my resume:
'[original bullet]'.
Rewrite this to:
- Start with a strong action verb
- Include a specific metric
- Keep it under 2 lines
Why it works: Giving it your actual content + specific constraints = much better output than "write my resume."
I'm applying for this role: [paste full job description]
List the top 20 keywords I should include in my resume
to pass ATS screening. Organize by:
- Must-have (mentioned 2+ times)
- Nice-to-have (mentioned once)
- Industry terms (implied but not stated)
Rate each of these resume bullets 1-10 on impact.
For any below 7, explain why it's weak and rewrite it:
[paste your bullets]
Help me build a STAR-L story about [topic: e.g., leading
under pressure].
Here are the raw facts: [describe what happened]
Structure this into:
S - Situation (2 sentences, set the scene)
T - Task (my specific responsibility)
A - Action (what I did, be technical)
R - Result (with numbers)
L - Learning (what I'd do differently)
You're a senior engineer at [Google/Amazon/etc]
interviewing me.
Ask me one behavioral question. After I answer,
score me 1-10 on:
1. Specificity (did I give concrete details?)
2. Metrics (did I quantify impact?)
3. Structure (did I use STAR?)
4. Self-awareness (did I show growth?)
Be brutally honest. Start now.
Craft my 'tell me about yourself' pitch.
Background: [2-3 sentence career summary]
Target: [role] at [company type]
Requirements:
- 90 seconds max
- Structure: current role hook → key achievement →
why this next step
- Write 3 versions: confident, humble, enthusiastic
I'm interviewing at [company] for [role] tomorrow.
Give me a rapid briefing:
1. What they build (in plain English)
2. Biggest recent news (last 6 months)
3. Their engineering culture (from blog/talks)
4. 3 smart questions to ask my interviewer
5. How to reference their values in my answers
I'm stuck on this LeetCode problem: [paste problem]
Don't solve it. Instead:
1. Which algorithm pattern does this match?
2. Give me a conceptual hint (no code)
3. What's the key insight I'm probably missing?
Here's my solution to [problem name]:
[paste code]
Rate it 1-10 on:
- Correctness
- Time complexity
- Space complexity
- Code cleanliness
- Edge case handling
What would a senior engineer change?
Ask me to design [Twitter timeline / URL shortener /
chat system].
Play the interviewer:
- Push me on scalability
- Ask about trade-offs
- Challenge my database choices
- After 20 min, tell me what a Staff Engineer
would add that I missed
I received an offer: base $[X], stock $[Y] over 4 years,
signing bonus $[Z].
I want base $[target]. Write a counter-offer email that:
1. Opens with genuine enthusiasm
2. Cites my specific value (I have [skill/experience])
3. Makes a clear, specific ask
4. Keeps the door open for discussion
Tone: confident but warm. Under 150 words.
Compare these offers and calculate 4-year total comp:
Offer A: [base, stock, bonus, 401k match, PTO days]
Offer B: [base, stock, bonus, 401k match, PTO days]
Factor in:
- Stock vesting schedule (1-year cliff?)
- Bonus probability (guaranteed vs target?)
- PTO monetary value
- Remote work value ($X/year in commute savings)
Which one wins? By how much?
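If you'd rather sanity-check the arithmetic yourself before trusting any model's numbers, the core calculation fits in a few lines of Python — the figures below are placeholders, and it ignores taxes, refreshers, and stock-price movement:

```python
def four_year_comp(base: float, stock_total: float, signing: float,
                   bonus_target_pct: float = 0.0, match_401k: float = 0.0) -> float:
    """Rough 4-year total comp, assuming even stock vesting and a year-1 signing bonus."""
    return (
        4 * base
        + stock_total                   # total grant over 4 years
        + signing                       # usually year 1 only
        + 4 * base * bonus_target_pct   # target bonus, not guaranteed
        + 4 * match_401k
    )

offer_a = four_year_comp(base=160_000, stock_total=200_000, signing=20_000, bonus_target_pct=0.10)
offer_b = four_year_comp(base=175_000, stock_total=120_000, signing=0, bonus_target_pct=0.15)
print(f"Offer A: ${offer_a:,.0f}  |  Offer B: ${offer_b:,.0f}")
```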
I compiled 50 of these into a downloadable pack ($9) — organized by category, copy-paste ready, tested with GPT-4/Claude/Gemini.
Also free: ATS Resume Checklist — 22 steps to beat applicant tracking systems.
What prompts do you use for job hunting? Share in the comments 👇
2026-02-19 10:03:58
Before we begin: this article is not claiming NDM-TCP is better than CUBIC, BBR, or Reno. Those algorithms are production-grade, formally analyzed, and battle-tested. They work. They are good at what they do.
This article is about something else entirely: why the fact that NDM-TCP produces a stable sawtooth pattern suggests there is research-grade content worth investigating — even though it has only been tested in simulations (using tc) and one real-world case so far.
The point is not "existing algorithms are bad." The point is "something unexpected happened that existing theory does not fully explain."
For 30 years, TCP congestion control has been built on 20th-century calculus-based models. The network is treated like a fluid pipe: if pressure (delay) goes up, you turn the valve (congestion window) down. The math is clean. The equations are linear or near-linear. The behavior is predictable.
This approach has produced algorithms like Reno, CUBIC, and BBR — algorithms whose dynamics have been formally analyzed for stability. A stability proof is a mathematical guarantee (usually using something called a Lyapunov function) that the algorithm will never spiral out of control, oscillate forever, or crash the network.
CUBIC and Reno are mathematically simple enough to prove stable. They are like a predictable pendulum. Their behavior can be fully characterized with differential equations.
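To see concretely what that predictability looks like, here is a purely illustrative AIMD (additive-increase, multiplicative-decrease) toy model — not CUBIC's actual window-growth function — that produces the classic sawtooth once the window overshoots a fixed bottleneck:

```python
# Toy AIMD model: additive increase each round, multiplicative decrease on "loss".
# Real Reno/CUBIC react to ACKs, RTTs and loss detection; here a loss simply
# fires whenever the congestion window exceeds a fixed bottleneck capacity.
def aimd_trace(capacity: float = 100.0, rounds: int = 200) -> list[float]:
    cwnd = 1.0
    trace = []
    for _ in range(rounds):
        trace.append(cwnd)
        if cwnd > capacity:   # congestion signal
            cwnd /= 2         # multiplicative decrease
        else:
            cwnd += 1         # additive increase per round-trip
    return trace


if __name__ == "__main__":
    for i, w in enumerate(aimd_trace()):
        if i % 10 == 0:
            print(f"round {i:3d}  cwnd {w:6.1f}")
```

The resulting rise-drop-rise pattern is the sawtooth; because the update rule is so simple, its long-run behavior can be characterized analytically.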
NDM-TCP is different. It is a recurrent nonlinear system. These are notoriously difficult to prove stable because the internal state (the "hidden state" array) is constantly changing based on feedback. Nonlinear systems can exhibit chaos, unpredictable oscillations, and sensitive dependence on initial conditions.
There is no formal proof that NDM-TCP is stable.
And yet — in both tc-based simulations and one real-world test — it produced a clean, stable sawtooth pattern.
That tension is what makes this interesting.
That classical, calculus-based approach works. It is elegant. It has decades of theory behind it.
But it struggles with modern networks: 5G with variable latency, satellite links with jitter, Wi-Fi with random bursts of interference. These networks are noisy — and noise looks like congestion to a calculus-based controller.
NDM-TCP takes a fundamentally different approach. Instead of asking "what is the delay?", it asks "what does the pattern of delays tell us?"
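One way to quantify "the pattern of delays" is the Shannon entropy of recent RTT samples. The sketch below is an illustration of that general idea, not NDM-TCP's actual entropy computation: tightly clustered delays yield low entropy, while scattered, jittery delays yield high entropy — a signal a controller could use to distinguish noise from genuine queue build-up.

```python
import math
from collections import Counter, deque


def rtt_entropy(rtt_samples_ms, bin_width_ms: float = 5.0) -> float:
    """Shannon entropy (bits) of binned RTT samples from a sliding window."""
    bins = Counter(int(rtt // bin_width_ms) for rtt in rtt_samples_ms)
    n = sum(bins.values())
    return max(0.0, -sum((c / n) * math.log2(c / n) for c in bins.values()))


stable = deque([20, 21, 20, 22, 21, 20, 21, 22], maxlen=64)    # calm path
jittery = deque([20, 61, 23, 58, 19, 65, 22, 60], maxlen=64)   # noisy Wi-Fi-like link
print(f"stable link:  {rtt_entropy(stable):.2f} bits")
print(f"jittery link: {rtt_entropy(jittery):.2f} bits")
```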
In the world of neural networks and recurrent controllers, "unstable" looks like a jagged, vibrating mess. Small changes in input cause wild swings in output. The system hunts around chaotically without ever settling into a rhythm.
NDM-TCP produced a clean, rhythmic sawtooth.
This means:
The system has reached an emergent equilibrium. The recurrent nonlinear controller and the TCP framework's native functions (tcp_cong_avoid_ai, tcp_slow_start) are working together, not fighting each other.
The "neural dynamics" have synchronized with the "physical network." The hidden state is adapting in a way that matches the network's actual behavior, producing predictable recovery patterns.
Nonlinear memory (recurrence) can be just as stable as linear math in practice — even if the formal proof is still missing.
This is not guaranteed. This is not trivial. Most adaptive nonlinear controllers fail at exactly this point.
The fact that it worked — in simulation and in one real-world test — suggests something is there.
NDM-TCP does not have a 50-page mathematical stability proof. It does not have a formal Lyapunov analysis. It does not have eigenvalue decomposition showing bounded trajectories.
But it does have empirical evidence of stability: a clean sawtooth pattern that repeats consistently across test conditions.
In research terms, this is what you might call a "poor man's proof" — not formal mathematics, but strong empirical evidence that something real is happening. It suggests the approach is not fundamentally broken. It suggests there is structure worth studying.
It does not prove the algorithm is optimal, or even good. But it proves it is stable enough to investigate further.
Neither is "better." They are different approaches to the same problem.
The research question is: can information-theoretic feedback (like entropy) combined with recurrent nonlinear control produce stable, adaptive congestion control that handles modern noisy networks better than threshold-based approaches?
NDM-TCP does not answer that question definitively. But it suggests the question is worth asking.
This article is not saying that NDM-TCP is better than CUBIC, BBR, or Reno, that it is ready for production, or that an empirical sawtooth replaces a formal stability proof.

What it is saying: a recurrent nonlinear congestion controller produced stable, repeatable behavior in simulation and in one real-world test, existing theory does not fully explain why, and that gap is worth investigating properly.
If you are a networking researcher, control theorist, or machine learning researcher, here is why NDM-TCP's results are interesting:
tc (traffic control) is a standard Linux tool for simulating network conditions — bandwidth limits, delay, packet loss, jitter. NDM-TCP showed stable sawtooth behavior across multiple tc scenarios. This is reproducible. Anyone with a Linux machine can test it.
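For readers who want to reproduce that kind of test, a typical setup drives a handful of netem scenarios and checks that the controller keeps its sawtooth under each one. The sketch below shows the shape of such a harness; the interface name and scenarios are placeholders, and it requires root plus the iproute2 tc tool:

```python
import subprocess

IFACE = "eth0"   # placeholder interface

SCENARIOS = {
    "clean":   "delay 20ms",
    "jittery": "delay 50ms 15ms loss 1%",
    "lossy":   "delay 80ms 5ms loss 3%",
}


def apply(scenario: str) -> None:
    """Replace the root qdisc on IFACE with the given netem impairment."""
    subprocess.run(["tc", "qdisc", "del", "dev", IFACE, "root"], check=False)
    subprocess.run(
        ["tc", "qdisc", "add", "dev", IFACE, "root", "netem", *SCENARIOS[scenario].split()],
        check=True,
    )


for name in SCENARIOS:
    apply(name)
    print(f"applied netem scenario: {name}")
    # ... run the throughput/cwnd measurement for this scenario here ...
```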
One real-world deployment test also showed stable behavior. This is limited evidence — one test is not enough to generalize — but it suggests the simulation results are not just artifacts of the testing environment.
Entropy-based delay analysis + recurrent nonlinear controller + adaptive plasticity + framework-aware modulation = not a common combination in congestion control research. The fact that this combination produces stability suggests there is an interaction worth studying.
If NDM-TCP is stable in practice but unprovable in theory, that tells us something about the theory. Either our formal tools for recurrent nonlinear controllers are more conservative than they need to be, or the TCP framework's native mechanisms supply stabilizing structure that current models do not capture, or there is a proof technique for this class of systems that simply has not been applied to congestion control yet.
Any of these would be a research contribution.
If this is genuinely research-grade content, proper investigation means a formal stability analysis (Lyapunov or otherwise) of the recurrent controller, large-scale experiments beyond tc with varied RTTs, loss patterns, and competing flows, fairness and convergence comparisons against CUBIC and BBR, and independent replication followed by peer review.
None of this has been done yet. The current results are self-conducted, limited in scope, and not peer-reviewed.
But the fact that a stable pattern emerged from a nonlinear system suggests it is worth doing.
The history of computer science is full of examples where practical performance outpaced mathematical elegance — deep learning, most famously, delivered results for years before theory could explain why it worked.
NDM-TCP might be another example of that tension. Or it might not. That is what research is for.
What we can say right now is this: a recurrent nonlinear congestion controller produced stable behavior in simulation and in one real-world test. That is unusual enough to be worth investigating properly.
Not because it proves existing algorithms are wrong. But because it suggests existing theory is incomplete.
Written to clarify what the stability results reveal about the gap between formal theory and practical systems — and why that gap is worth studying, even if NDM-TCP itself is just a prototype.
2026-02-19 10:03:40
Delivering real-time multimedia streams over non-deterministic IP networks is one of the most complex challenges in modern network engineering. Unlike downloading static files, where data integrity matters more than timing, IPTV and OTT (Over-The-Top) streaming demands minimal latency and tightly controlled jitter. The end user perceives a simple "buffering" spinner; for the engineer, it is a constant war against congestion, fragmentation, and suboptimal routing.
The heart of the problem often lies in the transport layer of the OSI model. Historically, UDP (User Datagram Protocol) has been favored for live streaming (RTP/RTSP) because it favors speed: fire and forget. If a packet is lost, there is no point retransmitting it, since its display time has already passed. However, most modern VOD and IPTV services (HLS, DASH) rely on TCP over HTTP. This introduces significant overhead. The three-way handshake and TCP's congestion control (CUBIC or BBR, for example) can, with even minimal packet loss (> 1%), cause the effective throughput (goodput) to collapse by drastically shrinking the transmission window.
Optimization does not stop at the transport protocol. DNS configuration plays a critical role in resolving to the nearest CDN (Content Delivery Network). A poorly configured DNS server can route you to a CDN node that is geographically or topologically distant, increasing the RTT (Round Trip Time) and the risk of bottlenecks at peering points (IXPs). Likewise, the absence of packet prioritization (QoS) on the local router turns every concurrent download into a threat to stream stability.
In this analysis, we will deconstruct the transmission chain, from the backbone down to the Smart TV's network interface, to isolate and correct performance anomalies.
To diagnose a streaming problem effectively, there is no point relying on generic speed tests (Speedtest). They use several simultaneous TCP connections to saturate the line, which masks packet-loss problems on a single flow. We need to analyze the route, fragmentation (MTU), and DNS latency.
The first point of failure is often the path (route) taken by packets between the ISP and the IPTV provider's ingest server. Packet loss at an intermediate node (hop) often indicates peering congestion. Next, the MTU (Maximum Transmission Unit) size is critical. If your packets are fragmented because the MTU is poorly negotiated (often due to a VPN tunnel or PPPoE), the router's CPU has to work harder to reassemble or split frames, introducing latency.
Here is a Bash script aimed at network engineers to automate this initial diagnosis on a Linux machine or a compatible router (OpenWrt/DD-WRT).
#!/bin/bash
# script_diag_stream.sh
# Connectivity analysis tool for IPTV/OTT streams
# Requires: ping, dig, mtr, curl

TARGET_IP="exemple-cdn-iptv.com"  # Replace with your provider's domain
DNS_SERVER="1.1.1.1"              # Cloudflare, used as a reference resolver
SIZE_MTU=1472                     # 1500 (Ethernet) - 28 (IP/ICMP headers)

echo "============================================="
echo "STARTING STREAMING NETWORK DIAGNOSTICS"
echo "============================================="

# 1. DNS resolution and latency test
echo "[+] DNS analysis (resolution time)..."
DIG_TIME=$(dig @$DNS_SERVER $TARGET_IP | grep "Query time" | awk '{print $4}')
echo "    DNS response time ($DNS_SERVER): ${DIG_TIME}ms"
if [ "$DIG_TIME" -gt 50 ]; then
    echo "    WARNING: High DNS latency. Consider a local cache (Unbound/BIND)."
fi

# 2. Fragmentation check (MTU)
echo "[+] MTU fragmentation test (packet size: $SIZE_MTU)..."
# Ping with the "Don't Fragment" (DF) bit set
if ping -c 1 -M do -s $SIZE_MTU 8.8.8.8 > /dev/null 2>&1; then
    echo "    Optimal MTU: no fragmentation detected at $SIZE_MTU bytes of payload."
else
    echo "    CRITICAL: Fragmentation detected. Lower the MSS clamping or the WAN MTU."
fi

# 3. Packet loss and jitter analysis (MTR)
echo "[+] Route and packet-loss analysis (10 cycles)..."
# Run MTR in report mode, without the interactive interface
mtr -r -c 10 -w $TARGET_IP | tail -n +2

# 4. TCP handshake test (HLS/HTTPS simulation)
echo "[+] Measuring TCP handshake latency..."
curl -w " DNS: %{time_namelookup}s \n Connect: %{time_connect}s \n TTFB: %{time_starttransfer}s \n" -o /dev/null -s "http://$TARGET_IP"

echo "============================================="
echo "END OF DIAGNOSTICS"
echo "============================================="
Interpreting the results is straightforward: if you see packet loss on the last hop (the destination server), the problem is on the provider's side. If loss starts as early as the second hop, it is your local network or the ISP's local loop. If curl's time_connect is high despite a low ping, the remote server is probably saturated at the CPU/RAM level, unable to accept new TCP connections quickly (SYN flood or a shortage of file descriptors).
Q: IPTV over HTTP vs. HTTPS: what is the real performance difference?
Adding TLS encryption (HTTPS) inevitably introduces overhead. In a streaming context, that overhead shows up mainly when the connection is established (the TLS handshake). It adds extra round trips (RTTs) before the first byte of video is transmitted. For a live stream (Live TV), where latency must be minimal, plain HTTP remains technically faster at startup.
However, once the session is established (Keep-Alive), the impact on throughput is negligible on modern processors with AES-NI instructions. The real problem with HTTPS in IPTV lies in client-side decryption (a Smart TV or a low-end Android box). If the client's CPU saturates because of real-time TLS decryption, framerate (FPS) drops will occur. In addition, HTTPS prevents ISPs' transparent caches from working, forcing traffic to cross the whole network from the origin CDN, which increases the risk of congestion.
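You can measure that startup cost directly by separating the TCP connect from the TLS handshake on the same connection. A rough sketch (the hostname is a placeholder — point it at your own CDN edge):

```python
import socket
import ssl
import time

HOST, PORT = "exemple-cdn-iptv.com", 443   # placeholder host, as in the script above

# Plain TCP connect
t0 = time.perf_counter()
sock = socket.create_connection((HOST, PORT), timeout=5)
tcp_ms = (time.perf_counter() - t0) * 1000

# TLS handshake on top of the established TCP connection
ctx = ssl.create_default_context()
t1 = time.perf_counter()
tls_sock = ctx.wrap_socket(sock, server_hostname=HOST)
tls_ms = (time.perf_counter() - t1) * 1000
tls_sock.close()

print(f"TCP connect: {tcp_ms:.1f} ms  |  TLS handshake: {tls_ms:.1f} ms")
```

On a session kept open with Keep-Alive, this handshake cost is paid once; on short-lived segment requests without connection reuse, it is paid over and over.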
Q: How do you optimize router QoS for IPTV?
Traditional port-based QoS (Quality of Service) is often ineffective because modern streaming uses ports 80/443, the same as standard web traffic. To optimize a router for IPTV, you need to implement smart queue management (SQM), such as fq_codel or CAKE.
The goal is to fight bufferbloat. When the bandwidth is saturated, network equipment buffers packets. If that buffer is too large, latency explodes and the stream falls out of sync.
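A quick way to check whether bufferbloat is hurting your stream is to compare idle latency with latency while the line is saturated. A rough sketch (it assumes Linux iputils ping output, and the download URL is a placeholder test file):

```python
import re
import subprocess
import threading
import urllib.request

PING_HOST = "1.1.1.1"
DOWNLOAD_URL = "http://speedtest.tele2.net/100MB.zip"   # placeholder large file


def avg_ping_ms(count: int = 10) -> float:
    out = subprocess.run(
        ["ping", "-c", str(count), PING_HOST],
        capture_output=True, text=True, check=True,
    ).stdout
    # Parse the "rtt min/avg/max/mdev = a/b/c/d ms" summary line
    return float(re.search(r"= [\d.]+/([\d.]+)/", out).group(1))


def saturate() -> None:
    with urllib.request.urlopen(DOWNLOAD_URL) as resp:
        while resp.read(1 << 20):
            pass


idle = avg_ping_ms()
threading.Thread(target=saturate, daemon=True).start()
loaded = avg_ping_ms()
print(f"idle: {idle:.1f} ms | under load: {loaded:.1f} ms | bloat: {loaded - idle:.1f} ms")
```

If latency under load grows by hundreds of milliseconds, SQM (fq_codel or CAKE) on the router is the usual fix.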
Q: IPTV with multicast vs. unicast: what is the fundamental technical difference?
It is a major network-architecture distinction. In multicast (typical of managed ISP IPTV), the network replicates a single stream to every subscriber who joins the group (via IGMP), so the load on the head-end does not grow with the audience. In unicast (typical of OTT services over the public internet), each viewer receives their own copy of the stream, and scaling relies on CDNs placing replicas close to users.
Q: How do you technically verify a Smart TV's compatibility with an IPTV service?
Compatibility is not just a matter of "downloading the app". You need to check the network stack and the codecs.