Reference Implementation

The Arbiter: Meta Framework

Complete Python implementation of the Orchestrator with Execution, Voting and Anomaly Detection logic

System Overview

Parallel Execution

ThreadPoolExecutor manages concurrent compilation and execution across all language runtimes.

Consensus Voting

Counter-based majority voting algorithm determines ground truth from implementation outputs.

Anomaly Detection

Automatic identification of dissenting implementations with remediation prompt generation.

Complete Arbiter Implementation

arbiter.py
Python 3.10+
#!/usr/bin/env python3
"""
The Arbiter: Parallax Protocol Orchestrator
============================================
A Meta Framework for Consensus-Driven Polyglot Development.

This implementation demonstrates the core concepts of N-Version Programming
with LLM-generated code and majority voting consensus.

Author: Parallax Protocol Research
Licence: Public Domain (Defensive Publication)
"""

import subprocess
import concurrent.futures
from collections import Counter
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
from enum import Enum
import json
import time


class ConsensusResult(Enum):
    """Possible outcomes of the consensus voting."""
    UNANIMOUS = "unanimous"
    MAJORITY = "majority"
    SPLIT = "split"
    FAILURE = "failure"


@dataclass
class ExecutionResult:
    """Result from a single language implementation execution."""
    language: str
    output: Optional[str]
    error: Optional[str]
    return_code: int
    execution_time_ms: float


@dataclass
class ConsensusOutcome:
    """Final outcome after consensus voting."""
    result: ConsensusResult
    consensus_value: Optional[str]
    vote_count: int
    total_implementations: int
    anomalies: List[str]
    execution_results: Dict[str, ExecutionResult]


# --- Configuration ---
# In a production system, these would be generated by LLMs based on OCL specs
PROGRAMS = {
    "c": {
        "compile": "gcc -o calc calc.c -lm",
        "run": "./calc",
        "source_file": "calc.c"
    },
    "java": {
        "compile": "javac Calc.java",
        "run": "java Calc",
        "source_file": "Calc.java"
    },
    "python": {
        "compile": None,  # Interpreted language
        "run": "python3 calc.py",
        "source_file": "calc.py"
    }
}

# Test input vector
INPUT_DATA = "1000 5 2"  # Principal, Rate, Time


def compile_and_run(
    lang: str,
    config: Dict,
    input_args: str
) -> ExecutionResult:
    """
    Compiles (if needed) and runs the specific language implementation.

    Args:
        lang: The programming language identifier
        config: Configuration dict with compile/run commands
        input_args: Input arguments to pass to the programme

    Returns:
        ExecutionResult with output, errors and timing data
    """
    start_time = time.perf_counter()

    # Phase 1: Compilation (if required)
    if config["compile"]:
        try:
            compile_proc = subprocess.run(
                config["compile"],
                shell=True,
                capture_output=True,
                timeout=30
            )
            if compile_proc.returncode != 0:
                return ExecutionResult(
                    language=lang,
                    output=None,
                    error=f"Compile Error: {compile_proc.stderr.decode()}",
                    return_code=compile_proc.returncode,
                    execution_time_ms=(time.perf_counter() - start_time) * 1000
                )
        except subprocess.TimeoutExpired:
            return ExecutionResult(
                language=lang,
                output=None,
                error="Compilation timed out after 30 seconds",
                return_code=-1,
                execution_time_ms=(time.perf_counter() - start_time) * 1000
            )

    # Phase 2: Execution
    try:
        cmd = f"{config['run']} {input_args}"
        run_proc = subprocess.run(
            cmd,
            shell=True,
            capture_output=True,
            text=True,
            timeout=5  # Strict timeout for execution
        )

        execution_time = (time.perf_counter() - start_time) * 1000

        if run_proc.returncode != 0:
            return ExecutionResult(
                language=lang,
                output=None,
                error=f"Runtime Error: {run_proc.stderr}",
                return_code=run_proc.returncode,
                execution_time_ms=execution_time
            )

        return ExecutionResult(
            language=lang,
            output=run_proc.stdout.strip(),
            error=None,
            return_code=0,
            execution_time_ms=execution_time
        )

    except subprocess.TimeoutExpired:
        return ExecutionResult(
            language=lang,
            output=None,
            error="Execution timed out after 5 seconds",
            return_code=-1,
            execution_time_ms=(time.perf_counter() - start_time) * 1000
        )
    except Exception as e:
        return ExecutionResult(
            language=lang,
            output=None,
            error=str(e),
            return_code=-1,
            execution_time_ms=(time.perf_counter() - start_time) * 1000
        )


def generate_remediation_prompt(
    faulty_lang: str,
    consensus_value: str,
    faulty_output: str,
    input_vector: str,
    spec_id: str = "SPEC-001"
) -> str:
    """
    Generates a remediation prompt for the LLM to patch the faulty code.

    This is the "Self-Healing" component of the Parallax Protocol.
    """
    return f"""
[REMEDIATION REQUIRED]

Your implementation failed the Consensus Check.

Specification ID: {spec_id}
Input Vector: [{input_vector}]
Spec Requirement: result = principal * (rate / 100) * time

Peer Implementations (C, Java) Output: {consensus_value}
Your ({faulty_lang}) Output: {faulty_output}

DIVERGENCE DETECTED: Your output differs from the consensus by \
{abs(float(consensus_value) - float(faulty_output)) if faulty_output.replace('.','').isdigit() else 'N/A'}

TASK: Analyse the divergence and patch the {faulty_lang} code immediately.
Return ONLY the corrected code with no commentary.
"""


def perform_consensus_voting(
    results: Dict[str, ExecutionResult]
) -> ConsensusOutcome:
    """
    Applies the Voter Algorithm to determine consensus.

    Voting Logic:
    - UNANIMOUS: All N versions agree. Result accepted with highest confidence.
    - MAJORITY: >50% agree. Result accepted; anomalies flagged for remediation.
    - SPLIT: No majority. Critical failure - manual intervention required.
    - FAILURE: All implementations failed.
    """
    # Filter successful executions
    successful_outputs = {
        lang: result.output
        for lang, result in results.items()
        if result.output is not None
    }

    if not successful_outputs:
        return ConsensusOutcome(
            result=ConsensusResult.FAILURE,
            consensus_value=None,
            vote_count=0,
            total_implementations=len(PROGRAMS),
            anomalies=list(results.keys()),
            execution_results=results
        )

    # Count votes
    vote_counts = Counter(successful_outputs.values())
    consensus_value, votes = vote_counts.most_common(1)[0]

    # Identify anomalies (implementations that disagree with consensus)
    anomalies = [
        lang for lang, output in successful_outputs.items()
        if output != consensus_value
    ]

    # Add failed implementations to anomalies
    for lang, result in results.items():
        if result.output is None and lang not in anomalies:
            anomalies.append(lang)

    # Determine consensus result
    if votes == len(PROGRAMS):
        result_type = ConsensusResult.UNANIMOUS
    elif votes > len(PROGRAMS) / 2:
        result_type = ConsensusResult.MAJORITY
    else:
        result_type = ConsensusResult.SPLIT

    return ConsensusOutcome(
        result=result_type,
        consensus_value=consensus_value,
        vote_count=votes,
        total_implementations=len(PROGRAMS),
        anomalies=anomalies,
        execution_results=results
    )


def log_to_elk(outcome: ConsensusOutcome, input_data: str) -> Dict:
    """
    Formats the outcome for ELK Stack ingestion.

    This would typically be sent to Logstash for indexing in Elasticsearch.
    """
    return {
        "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        "input_vector": input_data,
        "consensus_result": outcome.result.value,
        "consensus_value": outcome.consensus_value,
        "vote_ratio": f"{outcome.vote_count}/{outcome.total_implementations}",
        "anomalies": outcome.anomalies,
        "execution_times": {
            lang: result.execution_time_ms
            for lang, result in outcome.execution_results.items()
        },
        "hallucination_detected": len(outcome.anomalies) > 0
    }


def main():
    """
    Main entry point for the Arbiter.

    Executes the full Tribunal Protocol:
    1. Parallel execution of all implementations
    2. Consensus voting
    3. Anomaly detection
    4. Remediation prompt generation (if needed)
    """
    print("=" * 60)
    print("THE ARBITER - Parallax Protocol Orchestrator")
    print("=" * 60)
    print(f"\nInitiating Tribunal Protocol on Input: [{INPUT_DATA}]\n")

    results: Dict[str, ExecutionResult] = {}

    # Phase 1: Parallel Execution using ThreadPool
    print("--- Phase 1: Parallel Execution ---")
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        futures = {
            executor.submit(compile_and_run, lang, config, INPUT_DATA): lang
            for lang, config in PROGRAMS.items()
        }

        for future in concurrent.futures.as_completed(futures):
            lang = futures[future]
            result = future.result()
            results[lang] = result

            if result.error:
                print(f"[!] {lang.upper():6} | ERROR: {result.error[:50]}...")
            else:
                print(f"[+] {lang.upper():6} | Output: {result.output} | "
                      f"Time: {result.execution_time_ms:.2f}ms")

    # Phase 2: Consensus Voting
    print("\n--- Phase 2: Tribunal Judgement ---")
    outcome = perform_consensus_voting(results)

    if outcome.result == ConsensusResult.UNANIMOUS:
        print(f"\n✓ SUCCESS: Unanimous Consensus")
        print(f"  Accepted Value: {outcome.consensus_value}")
        print(f"  Confidence: MAXIMUM")

    elif outcome.result == ConsensusResult.MAJORITY:
        print(f"\n⚠ WARNING: Majority Consensus ({outcome.vote_count}/{outcome.total_implementations})")
        print(f"  Accepted Value: {outcome.consensus_value}")
        print(f"  Anomalies Detected: {outcome.anomalies}")

        # Phase 3: Generate Remediation Prompts
        print("\n--- Phase 3: Remediation ---")
        for anomaly_lang in outcome.anomalies:
            if anomaly_lang in results and results[anomaly_lang].output:
                prompt = generate_remediation_prompt(
                    faulty_lang=anomaly_lang,
                    consensus_value=outcome.consensus_value,
                    faulty_output=results[anomaly_lang].output,
                    input_vector=INPUT_DATA
                )
                print(f"\n[REMEDIATION PROMPT FOR {anomaly_lang.upper()}]")
                print("-" * 40)
                print(prompt)

    elif outcome.result == ConsensusResult.SPLIT:
        print(f"\n✗ CRITICAL: Split Brain Scenario")
        print(f"  No consensus reached. Manual intervention required.")
        print(f"  Outputs: {[r.output for r in results.values() if r.output]}")

    else:
        print(f"\n✗ FAILURE: All implementations failed")
        for lang, result in results.items():
            print(f"  {lang}: {result.error}")

    # Output ELK-compatible log
    elk_log = log_to_elk(outcome, INPUT_DATA)
    print("\n--- ELK Log Entry ---")
    print(json.dumps(elk_log, indent=2))

    return outcome


if __name__ == "__main__":
    main()

Sample Generated Implementations

These are examples of the code that would be generated by LLM agents from the Golden Spec:

calc.c
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>

double calculate_interest(
    double principal,
    double rate,
    int time
) {
    // Pre-conditions (OCL)
    assert(principal > 0);
    assert(rate >= 0 && rate <= 100);
    assert(time > 0);

    double result = principal *
                   (rate / 100.0) *
                   time;

    // Post-conditions (OCL)
    assert(result >= 0);

    return result;
}

int main(int argc, char *argv[]) {
    if (argc != 4) return 1;

    double p = atof(argv[1]);
    double r = atof(argv[2]);
    int t = atoi(argv[3]);

    printf("%.1f\n",
           calculate_interest(p, r, t));
    return 0;
}
Calc.java
public class Calc {
    public static double
    calculateInterest(
        double principal,
        double rate,
        int time
    ) {
        // Pre-conditions (OCL)
        assert principal > 0 :
            "principal must be > 0";
        assert rate >= 0 && rate <= 100 :
            "rate must be 0-100";
        assert time > 0 :
            "time must be > 0";

        double result = principal *
                       (rate / 100.0) *
                       time;

        // Post-conditions (OCL)
        assert result >= 0 :
            "result must be >= 0";

        return result;
    }

    public static void main(
        String[] args
    ) {
        double p = Double
            .parseDouble(args[0]);
        double r = Double
            .parseDouble(args[1]);
        int t = Integer
            .parseInt(args[2]);

        System.out.println(
            calculateInterest(p, r, t));
    }
}
calc.py
#!/usr/bin/env python3
"""
Interest Calculator
Generated from OCL Spec
"""
import sys


def calculate_interest(
    principal: float,
    rate: float,
    time: int
) -> float:
    """
    Calculate simple interest.

    OCL Constraints enforced.
    """
    # Pre-conditions (OCL)
    assert principal > 0, \
        "principal must be > 0"
    assert 0 <= rate <= 100, \
        "rate must be 0-100"
    assert time > 0, \
        "time must be > 0"

    result = principal * \
             (rate / 100) * \
             time

    # Post-conditions (OCL)
    assert result >= 0, \
        "result must be >= 0"

    return result


if __name__ == "__main__":
    p = float(sys.argv[1])
    r = float(sys.argv[2])
    t = int(sys.argv[3])

    print(calculate_interest(
        p, r, t
    ))

Execution Flow Diagram

Golden Spec (OCL + Natural Language)
LLM Agents (Parallel Generation)
C Code
Java Code
Python Code
Docker Sandbox (Parallel Execution)
Consensus Engine (Majority Voting)
Accepted Output
Remediation (if needed)