Reference Implementation
The Arbiter: Meta Framework
Complete Python implementation of the Orchestrator with Execution, Voting and Anomaly Detection logic
System Overview
Parallel Execution
ThreadPoolExecutor manages concurrent compilation and execution across all language runtimes.
Consensus Voting
Counter-based majority voting algorithm determines ground truth from implementation outputs.
Anomaly Detection
Automatic identification of dissenting implementations with remediation prompt generation.
Complete Arbiter Implementation
arbiter.py
Python 3.10+
#!/usr/bin/env python3
"""
The Arbiter: Parallax Protocol Orchestrator
============================================
A Meta Framework for Consensus-Driven Polyglot Development.
This implementation demonstrates the core concepts of N-Version Programming
with LLM-generated code and majority voting consensus.
Author: Parallax Protocol Research
Licence: Public Domain (Defensive Publication)
"""
import subprocess
import concurrent.futures
from collections import Counter
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
from enum import Enum
import json
import time
class ConsensusResult(Enum):
    """Possible outcomes of the consensus voting."""
    UNANIMOUS = "unanimous"  # every implementation produced the same output
    MAJORITY = "majority"    # more than half agree; dissenters get flagged
    SPLIT = "split"          # no single output reached a majority
    FAILURE = "failure"      # no implementation produced any output at all
@dataclass
class ExecutionResult:
    """Result from a single language implementation execution."""
    # Language identifier, e.g. "c", "java", "python".
    language: str
    # Captured stdout (stripped) on success; None when the run failed.
    output: Optional[str]
    # Human-readable failure description; None on success.
    error: Optional[str]
    # Process exit status; -1 for timeouts and unexpected exceptions.
    return_code: int
    # Wall-clock time for compile + run, in milliseconds.
    execution_time_ms: float
@dataclass
class ConsensusOutcome:
    """Final outcome after consensus voting."""
    # Category of the vote (unanimous / majority / split / failure).
    result: ConsensusResult
    # The winning output string; None when every implementation failed.
    consensus_value: Optional[str]
    # Number of implementations that produced the winning output.
    vote_count: int
    # Number of implementations that took part in the vote.
    total_implementations: int
    # Languages that disagreed with the consensus, or failed outright.
    anomalies: List[str]
    # Raw per-language execution results, kept for auditing/logging.
    execution_results: Dict[str, ExecutionResult]
# --- Configuration ---
# In a production system, these would be generated by LLMs based on OCL specs.
# Each entry maps a language id to the shell commands the Arbiter runs:
#   compile     - compilation command, or None for interpreted languages
#   run         - command that executes the built artifact
#   source_file - the generated source the commands operate on
# NOTE(review): commands are executed with shell=True by compile_and_run,
# so this table must only ever contain trusted, operator-supplied strings.
PROGRAMS = {
    "c": {
        "compile": "gcc -o calc calc.c -lm",
        "run": "./calc",
        "source_file": "calc.c"
    },
    "java": {
        "compile": "javac Calc.java",
        "run": "java Calc",
        "source_file": "Calc.java"
    },
    "python": {
        "compile": None,  # Interpreted language
        "run": "python3 calc.py",
        "source_file": "calc.py"
    }
}
# Test input vector
INPUT_DATA = "1000 5 2"  # Principal, Rate, Time
def compile_and_run(
    lang: str,
    config: Dict,
    input_args: str
) -> ExecutionResult:
    """
    Compiles (if needed) and runs the specific language implementation.

    Args:
        lang: The programming language identifier
        config: Configuration dict with "compile" (None for interpreted
            languages), "run" and "source_file" entries
        input_args: Input arguments to pass to the programme

    Returns:
        ExecutionResult with output, errors and timing data
    """
    start_time = time.perf_counter()

    def elapsed_ms() -> float:
        # Wall-clock time since entry, in milliseconds (was duplicated
        # inline at every return site).
        return (time.perf_counter() - start_time) * 1000

    # Phase 1: Compilation (if required)
    if config["compile"]:
        try:
            # NOTE: shell=True executes a configuration-supplied command
            # string; PROGRAMS must come from a trusted source only.
            compile_proc = subprocess.run(
                config["compile"],
                shell=True,
                capture_output=True,
                text=True,  # decode stderr as str, consistent with run phase
                timeout=30
            )
            if compile_proc.returncode != 0:
                return ExecutionResult(
                    language=lang,
                    output=None,
                    error=f"Compile Error: {compile_proc.stderr}",
                    return_code=compile_proc.returncode,
                    execution_time_ms=elapsed_ms()
                )
        except subprocess.TimeoutExpired:
            return ExecutionResult(
                language=lang,
                output=None,
                error="Compilation timed out after 30 seconds",
                return_code=-1,
                execution_time_ms=elapsed_ms()
            )
    # Phase 2: Execution
    try:
        # NOTE(review): input_args is interpolated straight into a shell
        # command line; never pass untrusted input here (injection risk).
        cmd = f"{config['run']} {input_args}"
        run_proc = subprocess.run(
            cmd,
            shell=True,
            capture_output=True,
            text=True,
            timeout=5  # Strict timeout for execution
        )
        execution_time = elapsed_ms()
        if run_proc.returncode != 0:
            return ExecutionResult(
                language=lang,
                output=None,
                error=f"Runtime Error: {run_proc.stderr}",
                return_code=run_proc.returncode,
                execution_time_ms=execution_time
            )
        return ExecutionResult(
            language=lang,
            output=run_proc.stdout.strip(),  # stripped for exact vote matching
            error=None,
            return_code=0,
            execution_time_ms=execution_time
        )
    except subprocess.TimeoutExpired:
        return ExecutionResult(
            language=lang,
            output=None,
            error="Execution timed out after 5 seconds",
            return_code=-1,
            execution_time_ms=elapsed_ms()
        )
    except Exception as e:
        # Catch-all boundary: any unexpected failure is reported as a
        # result rather than crashing the whole tribunal.
        return ExecutionResult(
            language=lang,
            output=None,
            error=str(e),
            return_code=-1,
            execution_time_ms=elapsed_ms()
        )
def generate_remediation_prompt(
    faulty_lang: str,
    consensus_value: str,
    faulty_output: str,
    input_vector: str,
    spec_id: str = "SPEC-001"
) -> str:
    """
    Generates a remediation prompt for the LLM to patch the faulty code.
    This is the "Self-Healing" component of the Parallax Protocol.

    Args:
        faulty_lang: Language of the dissenting implementation.
        consensus_value: Output the peer implementations agreed upon.
        faulty_output: Output produced by the dissenting implementation.
        input_vector: The input the implementations were run against.
        spec_id: Identifier of the governing specification.

    Returns:
        A prompt string instructing the LLM to patch the faulty code.
    """
    # Compute the numeric divergence when both outputs parse as floats.
    # The previous replace('.','').isdigit() heuristic rejected negative
    # numbers, crashed on inputs like "1.2.3" (passes the check, fails
    # float()), and crashed whenever consensus_value itself was
    # non-numeric. EAFP parsing handles every case safely.
    try:
        divergence = abs(float(consensus_value) - float(faulty_output))
    except (TypeError, ValueError):
        divergence = 'N/A'
    return f"""
[REMEDIATION REQUIRED]
Your implementation failed the Consensus Check.
Specification ID: {spec_id}
Input Vector: [{input_vector}]
Spec Requirement: result = principal * (rate / 100) * time
Peer Implementations (C, Java) Output: {consensus_value}
Your ({faulty_lang}) Output: {faulty_output}
DIVERGENCE DETECTED: Your output differs from the consensus by \
{divergence}
TASK: Analyse the divergence and patch the {faulty_lang} code immediately.
Return ONLY the corrected code with no commentary.
"""
def perform_consensus_voting(
    results: Dict[str, ExecutionResult]
) -> ConsensusOutcome:
    """
    Applies the Voter Algorithm to determine consensus.

    Voting Logic:
    - UNANIMOUS: All N versions agree. Result accepted with highest confidence.
    - MAJORITY: >50% agree. Result accepted; anomalies flagged for remediation.
    - SPLIT: No majority. Critical failure - manual intervention required.
    - FAILURE: All implementations failed.

    Args:
        results: Execution results keyed by language identifier.

    Returns:
        ConsensusOutcome describing the vote, the winning value (if any)
        and the list of anomalous implementations.
    """
    # Derive the electorate size from the supplied results rather than the
    # module-level PROGRAMS table: the previous coupling produced wrong
    # ratios whenever the voter was called on a subset of implementations.
    total = len(results)
    # Filter successful executions
    successful_outputs = {
        lang: result.output
        for lang, result in results.items()
        if result.output is not None
    }
    if not successful_outputs:
        # Nothing produced output: every participant is an anomaly.
        return ConsensusOutcome(
            result=ConsensusResult.FAILURE,
            consensus_value=None,
            vote_count=0,
            total_implementations=total,
            anomalies=list(results.keys()),
            execution_results=results
        )
    # Count votes; most_common(1) yields the plurality winner.
    vote_counts = Counter(successful_outputs.values())
    consensus_value, votes = vote_counts.most_common(1)[0]
    # Identify anomalies (implementations that disagree with consensus)
    anomalies = [
        lang for lang, output in successful_outputs.items()
        if output != consensus_value
    ]
    # Add failed implementations to anomalies
    for lang, result in results.items():
        if result.output is None and lang not in anomalies:
            anomalies.append(lang)
    # Determine consensus result: strict majority (> total/2) is required
    # for MAJORITY; a mere plurality is classified as SPLIT.
    if votes == total:
        result_type = ConsensusResult.UNANIMOUS
    elif votes > total / 2:
        result_type = ConsensusResult.MAJORITY
    else:
        result_type = ConsensusResult.SPLIT
    return ConsensusOutcome(
        result=result_type,
        consensus_value=consensus_value,
        vote_count=votes,
        total_implementations=total,
        anomalies=anomalies,
        execution_results=results
    )
def log_to_elk(outcome: ConsensusOutcome, input_data: str) -> Dict:
    """
    Formats the outcome for ELK Stack ingestion.
    This would typically be sent to Logstash for indexing in Elasticsearch.
    """
    # ISO-8601 UTC timestamp for the log entry.
    stamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
    # Per-language wall-clock timings, in milliseconds.
    timings = {
        name: res.execution_time_ms
        for name, res in outcome.execution_results.items()
    }
    return {
        "timestamp": stamp,
        "input_vector": input_data,
        "consensus_result": outcome.result.value,
        "consensus_value": outcome.consensus_value,
        "vote_ratio": f"{outcome.vote_count}/{outcome.total_implementations}",
        "anomalies": outcome.anomalies,
        "execution_times": timings,
        "hallucination_detected": bool(outcome.anomalies)
    }
def main():
    """
    Main entry point for the Arbiter.
    Executes the full Tribunal Protocol:
    1. Parallel execution of all implementations
    2. Consensus voting
    3. Anomaly detection
    4. Remediation prompt generation (if needed)

    Returns:
        The ConsensusOutcome, for programmatic callers.
    """
    print("=" * 60)
    print("THE ARBITER - Parallax Protocol Orchestrator")
    print("=" * 60)
    print(f"\nInitiating Tribunal Protocol on Input: [{INPUT_DATA}]\n")
    results: Dict[str, ExecutionResult] = {}
    # Phase 1: Parallel Execution using ThreadPool
    # One worker per configured implementation so all runtimes run at once.
    print("--- Phase 1: Parallel Execution ---")
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        futures = {
            executor.submit(compile_and_run, lang, config, INPUT_DATA): lang
            for lang, config in PROGRAMS.items()
        }
        # Report each result as soon as its runtime finishes.
        for future in concurrent.futures.as_completed(futures):
            lang = futures[future]
            result = future.result()
            results[lang] = result
            if result.error:
                # Long compiler/runtime errors are truncated for the console.
                print(f"[!] {lang.upper():6} | ERROR: {result.error[:50]}...")
            else:
                print(f"[+] {lang.upper():6} | Output: {result.output} | "
                      f"Time: {result.execution_time_ms:.2f}ms")
    # Phase 2: Consensus Voting
    print("\n--- Phase 2: Tribunal Judgement ---")
    outcome = perform_consensus_voting(results)
    if outcome.result == ConsensusResult.UNANIMOUS:
        print(f"\n✓ SUCCESS: Unanimous Consensus")
        print(f" Accepted Value: {outcome.consensus_value}")
        print(f" Confidence: MAXIMUM")
    elif outcome.result == ConsensusResult.MAJORITY:
        print(f"\n⚠ WARNING: Majority Consensus ({outcome.vote_count}/{outcome.total_implementations})")
        print(f" Accepted Value: {outcome.consensus_value}")
        print(f" Anomalies Detected: {outcome.anomalies}")
        # Phase 3: Generate Remediation Prompts
        # Only anomalies that produced an output get a prompt; crashed
        # implementations have no value to diff against the consensus.
        print("\n--- Phase 3: Remediation ---")
        for anomaly_lang in outcome.anomalies:
            if anomaly_lang in results and results[anomaly_lang].output:
                prompt = generate_remediation_prompt(
                    faulty_lang=anomaly_lang,
                    consensus_value=outcome.consensus_value,
                    faulty_output=results[anomaly_lang].output,
                    input_vector=INPUT_DATA
                )
                print(f"\n[REMEDIATION PROMPT FOR {anomaly_lang.upper()}]")
                print("-" * 40)
                print(prompt)
    elif outcome.result == ConsensusResult.SPLIT:
        print(f"\n✗ CRITICAL: Split Brain Scenario")
        print(f" No consensus reached. Manual intervention required.")
        print(f" Outputs: {[r.output for r in results.values() if r.output]}")
    else:
        # FAILURE: every implementation errored out; dump all errors.
        print(f"\n✗ FAILURE: All implementations failed")
        for lang, result in results.items():
            print(f" {lang}: {result.error}")
    # Output ELK-compatible log
    elk_log = log_to_elk(outcome, INPUT_DATA)
    print("\n--- ELK Log Entry ---")
    print(json.dumps(elk_log, indent=2))
    return outcome
if __name__ == "__main__":
    main()
Sample Generated Implementations
These are examples of the code that would be generated by LLM agents from the Golden Spec:
calc.c
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
double calculate_interest(
    double principal,
    double rate,
    int time
) {
    /* OCL pre-conditions: positive principal, rate within [0, 100],
       strictly positive duration. */
    assert(principal > 0);
    assert(rate >= 0 && rate <= 100);
    assert(time > 0);

    /* Simple interest: principal * (rate / 100) * time.
       Evaluation order matches the peer implementations. */
    double interest = principal * (rate / 100.0) * time;

    /* OCL post-condition: interest can never be negative. */
    assert(interest >= 0);
    return interest;
}
int main(int argc, char *argv[]) {
    /* Expect exactly three operands: principal, rate, time. */
    if (argc != 4) return 1;
    double p = atof(argv[1]);
    double r = atof(argv[2]);
    int t = atoi(argv[3]);
    /* NOTE(review): "%.1f" rounds to one decimal, while the Java and
       Python peers print full double precision; byte-level consensus
       can diverge on inputs with inexact results -- verify. */
    printf("%.1f\n",
        calculate_interest(p, r, t));
    return 0;
}
Calc.java
public class Calc {
    // Computes simple interest: principal * (rate / 100) * time.
    // NOTE(review): Java assertions are no-ops unless the JVM is started
    // with -ea; the Arbiter's run command is a plain "java Calc", so these
    // OCL checks are currently disabled at runtime -- verify.
    public static double
    calculateInterest(
        double principal,
        double rate,
        int time
    ) {
        // Pre-conditions (OCL)
        assert principal > 0 :
            "principal must be > 0";
        assert rate >= 0 && rate <= 100 :
            "rate must be 0-100";
        assert time > 0 :
            "time must be > 0";
        double result = principal *
            (rate / 100.0) *
            time;
        // Post-conditions (OCL)
        assert result >= 0 :
            "result must be >= 0";
        return result;
    }
    // Entry point: args are <principal> <rate> <time>.
    // NOTE(review): no argument-count check -- a missing argument throws
    // ArrayIndexOutOfBoundsException (the C peer exits with 1 instead).
    public static void main(
        String[] args
    ) {
        double p = Double
            .parseDouble(args[0]);
        double r = Double
            .parseDouble(args[1]);
        int t = Integer
            .parseInt(args[2]);
        System.out.println(
            calculateInterest(p, r, t));
    }
}
calc.py
#!/usr/bin/env python3
"""
Interest Calculator
Generated from OCL Spec
"""
import sys
def calculate_interest(
    principal: float,
    rate: float,
    time: int
) -> float:
    """
    Calculate simple interest.

    OCL constraints are enforced as assertions: positive principal,
    rate within [0, 100] and a strictly positive duration.
    """
    # Pre-conditions (OCL)
    assert principal > 0, "principal must be > 0"
    assert 0 <= rate <= 100, "rate must be 0-100"
    assert time > 0, "time must be > 0"

    # Simple interest formula; same evaluation order as the peers.
    interest = principal * (rate / 100) * time

    # Post-conditions (OCL)
    assert interest >= 0, "result must be >= 0"
    return interest
if __name__ == "__main__":
    # CLI usage: calc.py <principal> <rate> <time>
    p = float(sys.argv[1])
    r = float(sys.argv[2])
    t = int(sys.argv[3])
    # Prints the bare float repr; peers must match this format exactly.
    print(calculate_interest(
        p, r, t
))
Execution Flow Diagram
Golden Spec (OCL + Natural Language)
LLM Agents (Parallel Generation)
C Code
Java Code
Python Code
Docker Sandbox (Parallel Execution)
Consensus Engine (Majority Voting)
Accepted Output
Remediation (if needed)