adding poc codebase

This commit is contained in:
martianlantern
2025-08-24 09:03:38 +05:30
parent 0142870f45
commit b0f522d4d7
32 changed files with 826 additions and 1 deletions

132
README.md
View File

@@ -1 +1,131 @@
# ThinkMesh
ThinkMesh is a Python library that runs diverse reasoning paths in parallel, scores them with internal confidence signals, reallocates compute to promising branches, and fuses outcomes with verifiers and reducers. It works with local Hugging Face Transformers models, with vLLM/TGI servers, and with hosted APIs.
## Highlights
- Parallel reasoning with DeepConf-style confidence gating and budget reallocation
- Offline-first with Transformers; optional vLLM/TGI for server-side batching
- Hosted adapters for OpenAI and Anthropic
- Async execution with dynamic microbatches
- Reducers (majority/judge) and pluggable verifiers (regex/numeric/custom)
- Caching, metrics, and JSON traces
## Install
```bash
pip install thinkmesh
pip install "thinkmesh[transformers]"
pip install "thinkmesh[openai]"
pip install "thinkmesh[anthropic]"
pip install "thinkmesh[vllm]"
pip install "thinkmesh[tgi]"
```
## Quickstart: Offline DeepConf
```python
from thinkmesh import think, ThinkConfig, ModelSpec, StrategySpec
cfg = ThinkConfig(
model=ModelSpec(backend="transformers", model_name="Qwen2.5-7B-Instruct",
max_tokens=256, temperature=0.7, seed=42, extra={"device":"cuda:0"}),
strategy=StrategySpec(name="deepconf", parallel=8, max_steps=2,
deepconf={"k":5,"tau_low":-1.25,"tau_ent":2.2,"realloc_top_p":0.4}),
reducer={"name":"majority"},
budgets={"wall_clock_s":20,"tokens":4000},
)
ans = think("Show that the product of any three consecutive integers is divisible by 3.", cfg)
print(ans.content, ans.confidence)
```
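The `deepconf` block above controls a confidence gate on the short first-pass drafts: drafts whose recent token logprobs fall below `tau_low` or whose entropy exceeds `tau_ent` are pruned, and the remaining budget goes to the top `realloc_top_p` fraction. A minimal sketch of that gate using the helpers this commit ships in `thinkmesh.confidence.meters` (values are illustrative):
```python
from thinkmesh.confidence.meters import avg_logprob_last_k, entropy_last_k

lps = [-0.21, -0.05, -1.80, -0.40, -0.33]  # per-token logprobs of a partial draft
conf = avg_logprob_last_k(lps, k=5)        # mean logprob over the last k tokens
ent = entropy_last_k(lps, k=5)             # entropy of the softmaxed window
keep = conf >= -1.25 and ent <= 2.2        # pruned when conf < tau_low or ent > tau_ent
```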
## Quickstart: OpenAI Self-Consistency
```python
import os
os.environ["OPENAI_API_KEY"] = "sk-..."
from thinkmesh import think, ThinkConfig, ModelSpec, StrategySpec
cfg = ThinkConfig(
model=ModelSpec(backend="openai", model_name="gpt-4o-mini", max_tokens=256, temperature=0.6),
strategy=StrategySpec(name="self_consistency", parallel=6, max_steps=1),
reducer={"name":"majority"},
budgets={"wall_clock_s":15,"tokens":3000},
)
print(think("List three creative uses for a paperclip.", cfg).content)
```
## CLI
```bash
thinkmesh think -m Qwen2.5-7B-Instruct --backend transformers --strategy deepconf "What is 37*43?"
```
## Examples
### Debate Strategy (hosted)
```python
from thinkmesh import think, ThinkConfig, ModelSpec, StrategySpec
cfg = ThinkConfig(
model=ModelSpec(backend="openai", model_name="gpt-4o-mini", max_tokens=256, temperature=0.7),
strategy=StrategySpec(name="debate", parallel=4, max_steps=2, debate={"rounds":2}),
reducer={"name":"judge"},
budgets={"wall_clock_s":25,"tokens":5000},
)
print(think("Argue whether every even integer > 2 is the sum of two primes.", cfg).content)
```
### vLLM Local Server
```python
from thinkmesh import think, ThinkConfig, ModelSpec, StrategySpec
cfg = ThinkConfig(
model=ModelSpec(backend="vllm", model_name="Qwen2.5-7B-Instruct",
max_tokens=256, temperature=0.7, extra={"base_url":"http://localhost:8000/v1","api_key":"sk-"}),
strategy=StrategySpec(name="deepconf", parallel=8, max_steps=2, deepconf={"k":5}),
reducer={"name":"majority"},
budgets={"wall_clock_s":20,"tokens":4000},
)
print(think("Give a constructive proof for the Pigeonhole Principle on a simple case.", cfg).content)
```
### Custom Verifier
```python
from thinkmesh import think, ThinkConfig, ModelSpec, StrategySpec
cfg = ThinkConfig(
model=ModelSpec(backend="transformers", model_name="Qwen2.5-7B-Instruct", max_tokens=128),
strategy=StrategySpec(name="self_consistency", parallel=5, max_steps=1),
reducer={"name":"majority"},
verifier={"type":"regex","pattern":r"Final Answer\s*:\s*.+$"},
budgets={"wall_clock_s":10,"tokens":1500},
)
print(think("Answer with 'Final Answer: <value>' for 19*21.", cfg).content)
```
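Under the hood, a `verifier` dict is compiled into a plain callable that returns `(passed, score)`; a small sketch of invoking one directly, using `build_verifier` from `thinkmesh.reduce.verifier` as added in this commit:
```python
from thinkmesh.reduce.verifier import build_verifier

check = build_verifier({"type": "regex", "pattern": r"Final Answer\s*:\s*.+$"})
ok, score = check("Final Answer: 399")  # -> (True, 1.0)
```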
### Tree Of Thought (offline)
```python
from thinkmesh import think, ThinkConfig, ModelSpec, StrategySpec
cfg = ThinkConfig(
model=ModelSpec(backend="transformers", model_name="Qwen2.5-7B-Instruct", max_tokens=192),
strategy=StrategySpec(name="tree", parallel=6, max_steps=2, tree={"branches":3,"depth":2}),
reducer={"name":"majority"},
budgets={"wall_clock_s":20,"tokens":3500},
)
print(think("Sketch a plan to prove that sqrt(2) is irrational.", cfg).content)
```
## Traces, Metrics, Caching
Traces are emitted as JSON graphs inside the returned structure. Prometheus metrics and OpenTelemetry spans can be enabled via config extras. A local disk cache deduplicates repeated generations by hashing adapter, model, prompt, and params.
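A minimal sketch of that disk cache, using the `Cache` class from `thinkmesh.cache.store` as added in this commit (the cached payload is illustrative):
```python
from thinkmesh.cache.store import Cache

cache = Cache(enabled=True, ttl_s=86400, path=".thinkmesh_cache")
k = cache.key("transformers", "Qwen2.5-7B-Instruct", "What is 37*43?", {"max_tokens": 256})
if cache.get(k) is None:            # miss: persist the generation for later runs
    cache.set(k, {"text": "1591"})
```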
## Extending
- Implement a new backend by providing a `Thinker.generate` method that returns token text and optional token logprobs (a minimal sketch follows this list)
- Add a new strategy by wiring a function in `thinkmesh/strategies` and registering by name
- Add reducers/verifiers under `thinkmesh/reduce`
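A minimal sketch of a custom backend, assuming only the `Thinker` protocol and the `GenResult` dataclass from `thinkmesh.adapters.base` as defined in this commit (the `EchoThinker` name is made up for illustration):
```python
from typing import Any, Dict, List
from thinkmesh.adapters.base import GenResult

class EchoThinker:
    """Toy backend that echoes prompts back; shows the Thinker surface."""
    def supports_logprobs(self) -> bool:
        return False
    def max_batch_size(self) -> int:
        return 4
    async def generate(self, prompts: List[str], *, params: Dict[str, Any]) -> List[GenResult]:
        return [GenResult(text=p, tokens=None, token_logprobs=None,
                          finish_reason="stop", meta={}) for p in prompts]
```
In this POC, `load_thinker` dispatches on `ModelSpec.backend`, so a real adapter would also need a branch (or plugin hook) there.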
## License
MIT

46
pyproject.toml Normal file
View File

@@ -0,0 +1,46 @@
[build-system]
requires = ["hatchling>=1.25"]
build-backend = "hatchling.build"
[project]
name = "thinkmesh"
version = "0.1.1"
description = "Parallel thinking for LLMs with DeepConf-style confidence gating, offline/online backends, and async batching."
readme = "README.md"
requires-python = ">=3.11"
license = "MIT"
authors = [{name="ThinkMesh Authors"}]
dependencies = [
"pydantic>=2.7",
"anyio>=4.3",
"pluggy>=1.5",
"dataclasses-json>=0.6",
"typer>=0.12",
"structlog>=24.1",
"orjson>=3.10",
"prometheus-client>=0.20",
"opentelemetry-sdk>=1.24",
"diskcache>=5.6"
]
[project.optional-dependencies]
transformers = ["transformers>=4.42", "accelerate>=0.31", "torch>=2.3"]
vllm = ["vllm>=0.5", "openai>=1.40"]
tgi = ["text-generation>=0.7"]
openai = ["openai>=1.40"]
anthropic = ["anthropic>=0.30"]
obs = ["opentelemetry-exporter-otlp>=1.24"]
dev = ["pytest>=8.3", "hypothesis>=6.104", "ruff>=0.5", "mypy>=1.11"]
[project.scripts]
thinkmesh = "thinkmesh.cli.thinkmesh_cli:app"
[tool.hatch.build.targets.wheel]
packages = ["src/thinkmesh"]
[tool.ruff]
line-length = 100
[tool.mypy]
python_version = "3.11"
strict = true

View File

@@ -0,0 +1,2 @@
from .core import think, Answer, Trace
from .config import ThinkConfig, ModelSpec, StrategySpec

View File

@@ -0,0 +1,23 @@
from typing import List, Dict, Any
from .base import GenResult
from ..config import ModelSpec
class AnthropicAdapter:
def __init__(self, model: ModelSpec):
from anthropic import AsyncAnthropic
self.client = AsyncAnthropic()
self.model = model
def supports_logprobs(self) -> bool:
return False
def max_batch_size(self) -> int:
return int(self.model.extra.get("batch_size", 4))
async def generate(self, prompts: List[str], *, params: Dict[str, Any]) -> List[GenResult]:
out = []
for p in prompts:
r = await self.client.messages.create(model=self.model.model_name, max_tokens=params.get("max_tokens", self.model.max_tokens), temperature=self.model.temperature, top_p=self.model.top_p, messages=[{"role":"user","content":p}])
txt = "".join([c.text for c in r.content if getattr(c,"type","text")=="text"])
out.append(GenResult(text=txt, tokens=None, token_logprobs=None, finish_reason="stop", meta={}))
return out

View File

@@ -0,0 +1,34 @@
from typing import Protocol, List, Dict, Any, Optional
from dataclasses import dataclass
from ..config import ModelSpec
@dataclass
class GenResult:
text: str
tokens: Optional[list]
token_logprobs: Optional[list]
finish_reason: str
meta: Dict[str, Any]
class Thinker(Protocol):
async def generate(self, prompts: List[str], *, params: Dict[str, Any]) -> List[GenResult]: ...
def supports_logprobs(self) -> bool: ...
def max_batch_size(self) -> int: ...
async def load_thinker(model: ModelSpec) -> Thinker:
if model.backend == "transformers":
from .transformers_local import TransformersLocal
return await TransformersLocal.create(model)
if model.backend == "openai":
from .openai_api import OpenAIAdapter
return OpenAIAdapter(model)
if model.backend == "anthropic":
from .anthropic_api import AnthropicAdapter
return AnthropicAdapter(model)
if model.backend == "vllm":
from .vllm import VLLMAdapter
return VLLMAdapter(model)
if model.backend == "tgi":
from .tgi import TGIAdapter
return TGIAdapter(model)
raise ValueError("unknown backend")

View File

@@ -0,0 +1,29 @@
from typing import List, Dict, Any
from .base import GenResult
from ..config import ModelSpec
class OpenAIAdapter:
def __init__(self, model: ModelSpec):
from openai import AsyncOpenAI
self.client = AsyncOpenAI()
self.model = model
def supports_logprobs(self) -> bool:
return False
def max_batch_size(self) -> int:
return int(self.model.extra.get("batch_size", 4))
async def generate(self, prompts: List[str], *, params: Dict[str, Any]) -> List[GenResult]:
out = []
for p in prompts:
r = await self.client.responses.create(model=self.model.model_name, input=p, max_output_tokens=params.get("max_tokens", self.model.max_tokens), temperature=self.model.temperature, top_p=self.model.top_p)
try:
txt = r.output_text
except Exception:
try:
txt = r.output[0].content[0].text
except Exception:
txt = r.model_dump_json()
out.append(GenResult(text=txt, tokens=None, token_logprobs=None, finish_reason="stop", meta={}))
return out

View File

@@ -0,0 +1,22 @@
from typing import List, Dict, Any
from .base import GenResult
from ..config import ModelSpec
class TGIAdapter:
def __init__(self, model: ModelSpec):
self.model = model
def supports_logprobs(self) -> bool:
return False
def max_batch_size(self) -> int:
return int(self.model.extra.get("batch_size", 8))
async def generate(self, prompts: List[str], *, params: Dict[str, Any]) -> List[GenResult]:
from text_generation import AsyncClient
client = AsyncClient(self.model.extra.get("endpoint","http://localhost:8080"))
res = []
for p in prompts:
r = await client.generate(p, max_new_tokens=params.get("max_tokens", self.model.max_tokens), temperature=self.model.temperature, top_p=self.model.top_p)
res.append(GenResult(text=r.generated_text, tokens=None, token_logprobs=None, finish_reason="stop", meta={}))
return res

View File

@@ -0,0 +1,57 @@
from typing import List, Dict, Any
from .base import GenResult
from ..config import ModelSpec
class TransformersLocal:
def __init__(self, model: ModelSpec, pipe):
self.model = model
self.pipe = pipe
self.batch_size = int(model.extra.get("batch_size", 4))
@staticmethod
async def create(model: ModelSpec):
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
device = model.extra.get("device","cpu")
dtype = model.extra.get("dtype","auto")
tok = AutoTokenizer.from_pretrained(model.model_name, use_fast=True)
if tok.pad_token is None:
    tok.pad_token = tok.eos_token
tok.padding_side = "left"
if isinstance(dtype, str):
if dtype == "auto":
torch_dtype = torch.float16
else:
torch_dtype = getattr(torch, dtype)
else:
torch_dtype = dtype
mdl = AutoModelForCausalLM.from_pretrained(model.model_name, torch_dtype=torch_dtype, device_map="auto" if device!="cpu" else None)
return TransformersLocal(model, (mdl, tok, device))
def supports_logprobs(self) -> bool:
return True
def max_batch_size(self) -> int:
return self.batch_size
async def generate(self, prompts: List[str], *, params: Dict[str, Any]) -> List[GenResult]:
import torch
mdl, tok, device = self.pipe
inputs = tok(prompts, return_tensors="pt", padding=True, truncation=True).to(mdl.device)
gen_kwargs = dict(max_new_tokens=params.get("max_tokens", self.model.max_tokens), do_sample=(self.model.temperature or 0) > 0, temperature=self.model.temperature, top_p=self.model.top_p, return_dict_in_generate=True, output_scores=True)
with torch.no_grad():
out = mdl.generate(**inputs, **gen_kwargs)
scores = out.scores
seqs = out.sequences
res = []
prompt_len = inputs["input_ids"].shape[1]
for i in range(seqs.size(0)):
gen_ids = seqs[i][prompt_len:]
toks = tok.convert_ids_to_tokens(gen_ids.tolist())
lps = []
for t in range(len(gen_ids)):
step_logits = scores[t][i]
logprobs = step_logits.log_softmax(dim=-1)
lp = float(logprobs[gen_ids[t]].item())
lps.append(lp)
text = tok.decode(gen_ids, skip_special_tokens=True)
res.append(GenResult(text=text, tokens=toks, token_logprobs=lps, finish_reason="length", meta={"tokens": len(gen_ids)}))
return res

View File

@@ -0,0 +1,23 @@
from typing import List, Dict, Any
from .base import GenResult
from ..config import ModelSpec
class VLLMAdapter:
def __init__(self, model: ModelSpec):
self.model = model
def supports_logprobs(self) -> bool:
return False
def max_batch_size(self) -> int:
return int(self.model.extra.get("batch_size", 8))
async def generate(self, prompts: List[str], *, params: Dict[str, Any]) -> List[GenResult]:
from openai import AsyncOpenAI
client = AsyncOpenAI(base_url=self.model.extra.get("base_url","http://localhost:8000/v1"), api_key=self.model.extra.get("api_key","sk-"))
out = []
for p in prompts:
r = await client.completions.create(model=self.model.model_name, prompt=p, max_tokens=params.get("max_tokens", self.model.max_tokens), temperature=self.model.temperature, top_p=self.model.top_p)
txt = r.choices[0].text
out.append(GenResult(text=txt, tokens=None, token_logprobs=None, finish_reason="stop", meta={}))
return out

24
src/thinkmesh/cache/store.py vendored Normal file
View File

@@ -0,0 +1,24 @@
import hashlib
import orjson
from typing import Any, Dict
from diskcache import Cache as DC
class Cache:
def __init__(self, enabled: bool = True, ttl_s: int = 86400, path: str = ".thinkmesh_cache"):
self.enabled = enabled
self.ttl = ttl_s
self.db = DC(path) if enabled else None
def key(self, adapter: str, model: str, prompt: str, params: Dict[str, Any]) -> str:
h = hashlib.sha256(orjson.dumps([adapter, model, prompt, params])).hexdigest()
return h
def get(self, k: str):
if not self.enabled:
return None
return self.db.get(k, default=None)
def set(self, k: str, v: Any):
if not self.enabled:
return
self.db.set(k, v, expire=self.ttl)

View File

@@ -0,0 +1,25 @@
import json
import typer
from thinkmesh import think, ThinkConfig, ModelSpec, StrategySpec
app = typer.Typer()
@app.command("think")
def think_cmd(prompt: str, backend: str = typer.Option("transformers", "--backend","-b"),
model: str = typer.Option("Qwen2.5-7B-Instruct", "--model","-m"),
strategy: str = typer.Option("deepconf","--strategy","-s"),
parallel: int = typer.Option(8,"--parallel","-p"),
max_steps: int = typer.Option(2,"--max-steps"),
max_tokens: int = typer.Option(256,"--max-tokens"),
temperature: float = typer.Option(0.7,"--temperature","-t")):
cfg = ThinkConfig(
model=ModelSpec(backend=backend, model_name=model, max_tokens=max_tokens, temperature=temperature),
strategy=StrategySpec(name=strategy, parallel=parallel, max_steps=max_steps),
reducer={"name":"majority"},
budgets={"wall_clock_s":30,"tokens":8000},
)
ans = think(prompt, cfg)
print(json.dumps({"content": ans.content, "confidence": ans.confidence, "meta": ans.meta}, ensure_ascii=False))
if __name__ == "__main__":
    app()

View File

@@ -0,0 +1,27 @@
import math
import re
from typing import List
def avg_logprob_last_k(lps: List[float], k: int = 5) -> float:
if not lps:
return 0.0
k = max(1, min(k, len(lps)))
return float(sum(lps[-k:]) / k)
def entropy_last_k(lps: List[float], k: int = 5) -> float:
if not lps:
return 0.0
k = max(1, min(k, len(lps)))
xs = [math.exp(x) for x in lps[-k:]]
s = sum(xs)
ps = [x/s for x in xs]
return float(-sum(p*math.log(p+1e-12) for p in ps))
def self_rated_confidence(text: str) -> float:
m = re.search(r"<confidence>\s*([0-1](?:\.\d+)?)", text)
if not m:
return 0.5
try:
return max(0.0, min(1.0, float(m.group(1))))
except ValueError:
return 0.5

View File

@@ -0,0 +1,8 @@
from typing import List, Tuple
def select_top_p(scored: List[Tuple[object, float]], p: float) -> List[Tuple[object, float]]:
if not scored:
return []
scored = sorted(scored, key=lambda x: x[1], reverse=True)
n = max(1, int(len(scored) * p))
return scored[:n]

29
src/thinkmesh/config.py Normal file
View File

@@ -0,0 +1,29 @@
from pydantic import BaseModel, Field
from typing import Literal, Optional, Dict, Any
class ModelSpec(BaseModel):
backend: Literal["transformers","vllm","tgi","openai","anthropic"]
model_name: str
max_tokens: int = 512
temperature: float = 0.7
top_p: Optional[float] = None
seed: Optional[int] = None
extra: Dict[str, Any] = Field(default_factory=dict)
class StrategySpec(BaseModel):
name: Literal["deepconf","self_consistency","debate","tree","graph"]
parallel: int = 8
max_steps: int = 2
deepconf: Dict[str, Any] = Field(default_factory=dict)
debate: Dict[str, Any] = Field(default_factory=dict)
tree: Dict[str, Any] = Field(default_factory=dict)
graph: Dict[str, Any] = Field(default_factory=dict)
class ThinkConfig(BaseModel):
model: ModelSpec
strategy: StrategySpec
reducer: Dict[str, Any] = Field(default_factory=lambda: {"name":"majority"})
verifier: Optional[Dict[str, Any]] = None
budgets: Dict[str, Any] = Field(default_factory=lambda: {"wall_clock_s":30,"tokens":8000})
cache: Dict[str, Any] = Field(default_factory=lambda: {"enabled":True,"ttl_s":86400})
telemetry: Dict[str, Any] = Field(default_factory=lambda: {"otel":False,"metrics":True,"trace_dump":True})

19
src/thinkmesh/core.py Normal file
View File

@@ -0,0 +1,19 @@
import asyncio
from dataclasses import dataclass
from typing import Any, Dict
from .config import ThinkConfig
from .orchestrator import Orchestrator
@dataclass
class Answer:
content: str
confidence: float
meta: Dict[str, Any]
@dataclass
class Trace:
graph_json: Dict[str, Any]
logs_path: str | None
def think(task: str | Dict[str, Any], config: ThinkConfig) -> Answer:
return asyncio.run(Orchestrator(config).run(task))

View File

@@ -0,0 +1,33 @@
import time
from typing import Any, Dict
from .config import ThinkConfig
from .sched.runner import Runner
from .adapters.base import load_thinker
from .strategies.base import load_strategy
from .reduce.majority import reduce_majority
from .reduce.judge import reduce_judge
from .reduce.verifier import build_verifier
from .telemetry.logging import get_logger
from .cache.store import Cache
class Orchestrator:
def __init__(self, cfg: ThinkConfig):
self.cfg = cfg
self.logger = get_logger()
self.cache = Cache(enabled=cfg.cache.get("enabled", True), ttl_s=cfg.cache.get("ttl_s", 86400))
self.runner = Runner(self.cache, self.logger, budgets=cfg.budgets)
async def run(self, task: str | Dict[str, Any]):
thinker = await load_thinker(self.cfg.model)
strategy = load_strategy(self.cfg.strategy.name)
verifier = build_verifier(self.cfg.verifier) if self.cfg.verifier else None
start = time.time()
strat_res = await strategy(self.runner, thinker, task, self.cfg)
reducer_name = self.cfg.reducer.get("name","majority")
if reducer_name == "judge":
final = await reduce_judge(self.runner, thinker, task, self.cfg, strat_res["candidates"], self.cfg.reducer)
else:
final = reduce_majority(strat_res["candidates"])
elapsed = time.time() - start
final["answer"].meta["elapsed_s"] = elapsed
return final["answer"]

View File

@@ -0,0 +1,30 @@
from typing import Dict, Any, List
from ..core import Answer
judge_prompt = "You are a strict judge comparing two candidate answers for the SAME question. Return JSON with keys winner (\"A\"|\"B\"|\"tie\"), scoreA (0..1), scoreB (0..1), rationale."
async def reduce_judge(runner, thinker, task, cfg, candidates: List[Dict[str, Any]], rconf: Dict[str, Any]) -> Dict[str, Any]:
if len(candidates) == 1:
c = candidates[0]
ans = Answer(content=c["text"], confidence=float(c["scores"].get("conf", 0.0)), meta={})
return {"answer": ans, "candidates": candidates}
pairs = []
for i in range(0, len(candidates)-1, 2):
a = candidates[i]["text"]
b = candidates[i+1]["text"]
q = f"{judge_prompt}\nQuestion:\n{task}\nA:\n{a}\nB:\n{b}\nJSON:"
pairs.append(q)
res = await runner.judge(thinker, pairs, {"max_tokens": 128})
wins = []
for i, r in enumerate(res):
txt = r.text
w = "A" if "winner" in txt and '\"A\"' in txt else "B" if '\"B\"' in txt else "tie"
if w == "A":
wins.append(candidates[2*i])
elif w == "B":
wins.append(candidates[2*i+1])
if not wins:
wins = [max(candidates, key=lambda c: c["scores"].get("conf", 0.0))]
best = max(wins, key=lambda c: c["scores"].get("conf", 0.0))
ans = Answer(content=best["text"], confidence=float(best["scores"].get("conf", 0.0)), meta={})
return {"answer": ans, "candidates": candidates}

View File

@@ -0,0 +1,20 @@
import re
from collections import Counter
from typing import Dict, Any, List
from ..core import Answer
def normalize(x: str) -> str:
s = x.strip().lower()
m = re.search(r"[-+]?\d*\.?\d+(?:[eE][-+]?\d+)?", s)
if m:
return m.group(0)
return s
def reduce_majority(cands: List[Dict[str, Any]]) -> Dict[str, Any]:
keys = [normalize(c["text"]) for c in cands]
counts = Counter(keys)
top_key, _ = counts.most_common(1)[0]
filtered = [c for c in cands if normalize(c["text"]) == top_key]
best = max(filtered, key=lambda c: c["scores"].get("conf", 0.0))
ans = Answer(content=best["text"], confidence=float(best["scores"].get("conf", 0.0)), meta={"votes": dict(counts)})
return {"answer": ans, "candidates": cands}

View File

@@ -0,0 +1,18 @@
import re
from typing import Callable, Dict, Any, Optional
def build_verifier(cfg: Optional[Dict[str, Any]]) -> Callable[[str], tuple[bool,float]]:
if not cfg:
return lambda x: (True, 1.0)
if cfg.get("type") == "regex":
pattern = re.compile(cfg.get("pattern",".*"), re.S)
def f(x: str):
ok = bool(pattern.search(x))
return (ok, 1.0 if ok else 0.0)
return f
if cfg.get("type") == "numeric":
def f(x: str):
m = re.search(r"[-+]?\d*\.?\d+(?:[eE][-+]?\d+)?", x)
return (bool(m), 1.0 if m else 0.0)
return f
return lambda x: (True, 1.0)

View File

@@ -0,0 +1,18 @@
import time
class TokenBucket:
def __init__(self, rate: float, capacity: float):
self.rate = rate
self.capacity = capacity
self.tokens = capacity
self.updated = time.monotonic()
def consume(self, amount: float) -> bool:
now = time.monotonic()
elapsed = now - self.updated
self.updated = now
self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
if self.tokens >= amount:
self.tokens -= amount
return True
return False

View File

@@ -0,0 +1,31 @@
import time
from typing import Any, Dict, List
from ..adapters.base import Thinker, GenResult
from ..sched.rate_limit import TokenBucket
from ..telemetry.metrics import Metrics
class Runner:
def __init__(self, cache, logger, budgets: Dict[str, Any]):
self.cache = cache
self.logger = logger
self.wall_clock_s = budgets.get("wall_clock_s", 30)
self.token_budget = budgets.get("tokens", 8000)
self.metrics = Metrics()
self.bucket = TokenBucket(rate=1e9, capacity=1e9)
async def generate_batched(self, thinker: Thinker, prompts: List[str], params: Dict[str, Any]) -> List[GenResult]:
batches = []
max_bs = max(1, thinker.max_batch_size())
for i in range(0, len(prompts), max_bs):
batches.append(prompts[i:i+max_bs])
out: List[GenResult] = []
start = time.time()
for b in batches:
r = await thinker.generate(b, params=params)
out.extend(r)
if time.time() - start > self.wall_clock_s:
break
return out
async def judge(self, thinker: Thinker, prompt_pairs: List[str], params: Dict[str, Any]) -> List[GenResult]:
return await self.generate_batched(thinker, prompt_pairs, params)

View File

@@ -0,0 +1,18 @@
from .deepconf import deepconf_run
from .self_consistency import self_consistency_run
from .debate import debate_run
from .tree import tree_run
from .graph import graph_run
def load_strategy(name: str):
if name == "deepconf":
return deepconf_run
if name == "self_consistency":
return self_consistency_run
if name == "debate":
return debate_run
if name == "tree":
return tree_run
if name == "graph":
return graph_run
raise ValueError("unknown strategy")

View File

@@ -0,0 +1,24 @@
from ..sched.runner import Runner
from ..adapters.base import Thinker
from ..config import ThinkConfig
from ..confidence.meters import self_rated_confidence
def seed_prompts(task: str, k: int):
return [f"You are Debater {i+1}. Read the problem and propose a solution. Then await rebuttal.\n{task}" for i in range(k)]
async def debate_run(runner: Runner, thinker: Thinker, task, cfg: ThinkConfig):
k = int(cfg.strategy.parallel)
rounds = int(cfg.strategy.debate.get("rounds", 2))
prompts = seed_prompts(str(task), k)
res = await runner.generate_batched(thinker, prompts, {"max_tokens": max(64, cfg.model.max_tokens//4)})
texts = [r.text for r in res]
for _ in range(rounds-1):
rebuttals = []
for i, t in enumerate(texts):
others = "\n".join([f"Debater {j+1}: {texts[j]}" for j in range(len(texts)) if j!=i])
rebuttals.append(f"Your earlier argument:\n{t}\nOpponents:\n{others}\nWrite a concise rebuttal and improved final answer.")
res = await runner.generate_batched(thinker, rebuttals, {"max_tokens": max(64, cfg.model.max_tokens//3)})
texts = [r.text for r in res]
cands = [{"text": x, "scores": {"conf": float(self_rated_confidence(x))}, "steps": []} for x in texts]
trace = {"nodes": [{"id": i, "conf": c["scores"]["conf"]} for i,c in enumerate(cands)], "edges": []}
return {"candidates": cands, "trace": trace}

View File

@@ -0,0 +1,54 @@
import math
from typing import Any, Dict, List
from ..sched.runner import Runner
from ..adapters.base import Thinker
from ..config import ThinkConfig
from ..confidence.meters import avg_logprob_last_k, entropy_last_k, self_rated_confidence
def make_variants(task: str, k: int) -> List[str]:
out = []
for i in range(k):
out.append(f"{task}\nVariant #{i+1}: Continue reasoning step by step. Provide a concise final answer.")
return out
async def deepconf_run(runner: Runner, thinker: Thinker, task: str | Dict[str, Any], cfg: ThinkConfig):
k = int(cfg.strategy.deepconf.get("k", 5))
tau_low = float(cfg.strategy.deepconf.get("tau_low", -1.25))
tau_ent = float(cfg.strategy.deepconf.get("tau_ent", 2.2))
realloc_top_p = float(cfg.strategy.deepconf.get("realloc_top_p", 0.4))
parallel = int(cfg.strategy.parallel)
step1_tokens = max(32, min(128, int(cfg.model.max_tokens * 0.25)))
step2_tokens = cfg.model.max_tokens
variants = make_variants(task if isinstance(task,str) else str(task), parallel)
step1 = await runner.generate_batched(thinker, variants, {"max_tokens": step1_tokens})
scored = []
for r in step1:
if thinker.supports_logprobs() and r.token_logprobs:
conf = avg_logprob_last_k(r.token_logprobs, k=min(k,len(r.token_logprobs)))
ent = entropy_last_k(r.token_logprobs, k=min(k,len(r.token_logprobs)))
else:
conf = self_rated_confidence(r.text)
ent = 0.0
keep = True
if thinker.supports_logprobs() and r.token_logprobs:
if conf < tau_low or ent > tau_ent:
keep = False
if keep:
scored.append((r, conf))
if not scored:
scored = [(max(step1, key=lambda x: len(x.text)), 0.0)]
scored.sort(key=lambda x: x[1], reverse=True)
top_n = max(1, int(math.ceil(len(scored) * realloc_top_p)))
seeds = []
for i in range(top_n):
seeds.append(scored[i][0].text + "\nContinue and finalize the solution. Provide the final answer clearly marked.")
step2 = await runner.generate_batched(thinker, seeds, {"max_tokens": step2_tokens})
cands = []
for i, r in enumerate(step2):
if thinker.supports_logprobs() and r.token_logprobs:
conf2 = avg_logprob_last_k(r.token_logprobs, k=min(k,len(r.token_logprobs)))
else:
conf2 = self_rated_confidence(r.text)
cands.append({"text": r.text, "scores": {"conf": float(conf2)}, "steps": [{"text": seeds[i], "scores": {"conf": float(scored[min(i,len(scored)-1)][1])}}]})
trace = {"nodes": [{"id": i, "conf": c["scores"]["conf"]} for i,c in enumerate(cands)], "edges": []}
return {"candidates": cands, "trace": trace}

View File

@@ -0,0 +1,14 @@
from ..sched.runner import Runner
from ..adapters.base import Thinker
from ..config import ThinkConfig
from ..confidence.meters import self_rated_confidence
async def graph_run(runner: Runner, thinker: Thinker, task, cfg: ThinkConfig):
k = int(cfg.strategy.parallel)
prompts = [f"{task}\nPath {i+1}: explore a unique chain of thought and propose an answer." for i in range(k)]
mids = await runner.generate_batched(thinker, prompts, {"max_tokens": max(64, cfg.model.max_tokens//3)})
cont = [m.text + "\nCross-check assumptions and provide the final answer." for m in mids]
finals = await runner.generate_batched(thinker, cont, {"max_tokens": cfg.model.max_tokens})
cands = [{"text": r.text, "scores": {"conf": float(self_rated_confidence(r.text))}, "steps": []} for r in finals]
trace = {"nodes": [{"id": i, "conf": c["scores"]["conf"]} for i,c in enumerate(cands)], "edges": []}
return {"candidates": cands, "trace": trace}

View File

@@ -0,0 +1,15 @@
from ..sched.runner import Runner
from ..adapters.base import Thinker
from ..config import ThinkConfig
from ..confidence.meters import self_rated_confidence
async def self_consistency_run(runner: Runner, thinker: Thinker, task, cfg: ThinkConfig):
k = int(cfg.strategy.parallel)
prompts = [str(task) for _ in range(k)]
res = await runner.generate_batched(thinker, prompts, {"max_tokens": cfg.model.max_tokens})
cands = []
for r in res:
if r.token_logprobs:
    lps = r.token_logprobs[-5:]
    conf = sum(lps) / len(lps)
else:
    conf = self_rated_confidence(r.text)
cands.append({"text": r.text, "scores": {"conf": float(conf)}, "steps": []})
trace = {"nodes": [{"id": i, "conf": c["scores"]["conf"]} for i,c in enumerate(cands)], "edges": []}
return {"candidates": cands, "trace": trace}

View File

@@ -0,0 +1,21 @@
from ..sched.runner import Runner
from ..adapters.base import Thinker
from ..config import ThinkConfig
from ..confidence.meters import self_rated_confidence
async def tree_run(runner: Runner, thinker: Thinker, task, cfg: ThinkConfig):
branches = int(cfg.strategy.tree.get("branches", max(2, cfg.strategy.parallel//2)))
depth = int(cfg.strategy.tree.get("depth", min(2, cfg.strategy.max_steps)))
frontier = [str(task)]
for d in range(depth):
prompts = []
for f in frontier:
for b in range(branches):
prompts.append(f"{f}\nBranch {b+1}: explore a distinct line of reasoning and move toward a final answer.")
res = await runner.generate_batched(thinker, prompts, {"max_tokens": max(64, cfg.model.max_tokens//3)})
texts = [r.text for r in res]
frontier = [t + "\nConclude with a final answer." for t in texts[:branches]]
finals = await runner.generate_batched(thinker, frontier, {"max_tokens": cfg.model.max_tokens})
cands = [{"text": r.text, "scores": {"conf": float(self_rated_confidence(r.text))}, "steps": []} for r in finals]
trace = {"nodes": [{"id": i, "conf": c["scores"]["conf"]} for i,c in enumerate(cands)], "edges": []}
return {"candidates": cands, "trace": trace}

View File

@@ -0,0 +1,5 @@
import structlog
def get_logger():
structlog.configure(processors=[structlog.processors.TimeStamper(fmt="iso"), structlog.processors.JSONRenderer()])
return structlog.get_logger()

View File

@@ -0,0 +1,9 @@
from prometheus_client import Counter, Histogram
class Metrics:
def __init__(self):
self.tokens_generated = Counter("thinkmesh_tokens_generated", "Tokens generated across all branches")
self.branches_pruned = Counter("thinkmesh_branches_pruned", "Branches pruned by confidence gating")
self.reallocations = Counter("thinkmesh_reallocations", "Budget reallocations to surviving branches")
self.latency_ms = Histogram("thinkmesh_latency_ms", "End-to-end generation latency in milliseconds")
self.batch_size = Histogram("thinkmesh_batch_size", "Effective micro-batch sizes")

View File

@@ -0,0 +1,11 @@
class OTEL:
def __init__(self, enabled: bool = False):
self.enabled = enabled
def span(self, name: str):
class N:
def __enter__(self_non):
return None
def __exit__(self_non, exc_type, exc, tb):
return False
return N()

View File

@@ -0,0 +1,2 @@
def paraphrase_seed(i: int) -> str:
return ["reason carefully","consider alternatives","be concise at the end","justify steps","avoid leaps"][i%5]

View File

@@ -0,0 +1,4 @@
import orjson
def dump_trace(trace: dict) -> str:
return orjson.dumps(trace).decode()