Reformatted from long file. Added a duplicate guard.
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
2025-10-06 13:03:01 -07:00
parent 94498ec5b7
commit 047c1ac8ff
8 changed files with 315 additions and 516 deletions

0
app/__init__.py Normal file
View File

17
app/ai.py Normal file
View File

@@ -0,0 +1,17 @@
from openai import OpenAI
from . import config, prompt, history
_client = OpenAI(api_key=config.OPENAI_API_KEY)
async def generate_reply() -> str:
msgs = history.build_messages(prompt.SYSTEM_PROMPT)
msgs.append({"role": "user", "content": prompt.USER_NUDGE})
resp = _client.chat.completions.create(
model=config.OPENAI_MODEL,
messages=msgs,
temperature=0.8,
max_tokens=180,
presence_penalty=0.3,
frequency_penalty=0.2,
)
return resp.choices[0].message.content.strip()

83
app/bot.py Normal file
View File

@@ -0,0 +1,83 @@
import asyncio, json
from telethon import TelegramClient, events
from telethon.tl.types import User
from . import config, history, ai, tg_helpers
def _load_last_ids(path):
if path.exists():
try: return json.loads(path.read_text(encoding="utf-8"))
except Exception: pass
return {}
def _save_last_ids(path, data):
try: path.write_text(json.dumps(data), encoding="utf-8")
except Exception as e: print(f"Warning: failed to save last ids: {e}")
async def run():
config.require()
client = TelegramClient(config.SESSION_PREFIX, config.API_ID, config.API_HASH)
await client.start()
target_entity = await tg_helpers.resolve_target_entity(client)
if target_entity:
print(f"Target resolved: id={target_entity.id}, username={getattr(target_entity,'username',None)}")
tg_helpers.cache_target_id(target_entity)
else:
print("Target not resolved yet. Will match dynamically on first incoming message from target.")
# Startup catch-up: only if needed (simple version relies on handler later)
# Debounce state
pending = {}
last_ids = _load_last_ids(config.LAST_IDS_FILE)
async def debounced_reply(chat_id: int):
await asyncio.sleep(config.DEBOUNCE_SEC)
try:
reply = await ai.generate_reply()
except Exception as e:
print(f"OpenAI error: {e}")
reply = "sorry—mind saying that a bit slower? I get lost easy."
history.append("assistant", reply)
await tg_helpers.send_with_catchup(client, chat_id, reply, ai.generate_reply)
@client.on(events.NewMessage(incoming=True))
async def on_msg(event):
nonlocal target_entity, pending, last_ids
sender = await event.get_sender()
if not isinstance(sender, User): return
if (not target_entity) and (config.TARGET_USER_ID == 0 and not config.TARGET_USERNAME and not config.TARGET_DISPLAY_NAME):
target_entity = sender
tg_helpers.cache_target_id(target_entity)
print(f"Auto-targeted sender id={sender.id}, username={sender.username}")
if not tg_helpers.sender_matches_target(sender, target_entity):
return
# duplicate guard
last_seen = last_ids.get(event.chat_id, 0)
if event.message.id <= last_seen:
return
text = (event.message.message or "").strip()
if text:
history.append("user", text)
last_ids[event.chat_id] = event.message.id
_save_last_ids(config.LAST_IDS_FILE, last_ids)
# debounce
task = pending.get(event.chat_id)
if task and not task.done():
task.cancel()
pending[event.chat_id] = asyncio.create_task(debounced_reply(event.chat_id))
# Optional opener on a fresh start (only if target already known and enabled)
if config.AUTO_OPENER_ENABLED and config.OPENER_TEXT and target_entity and not config.HISTORY_FILE.exists():
history.append("assistant", config.OPENER_TEXT)
await tg_helpers.send_with_catchup(client, target_entity, config.OPENER_TEXT, ai.generate_reply)
elif not config.AUTO_OPENER_ENABLED and not config.HISTORY_FILE.exists():
print("Opener disabled. Waiting for incoming message.")
print("Listening…")
await client.run_until_disconnected()

41
app/config.py Normal file
View File

@@ -0,0 +1,41 @@
import os
from pathlib import Path
def _bool(name: str, default: str) -> bool:
return os.environ.get(name, default).strip().lower() in ("1", "true", "yes", "on")
API_ID = int(os.environ.get("TG_API_ID", "0"))
API_HASH = os.environ.get("TG_API_HASH", "")
SESSION_PREFIX = os.environ.get("TG_SESSION", "telethon_session")
TARGET_USERNAME = os.environ.get("TARGET_USERNAME", "").strip().lstrip("@")
TARGET_USER_ID = int(os.environ.get("TARGET_USER_ID", "0"))
TARGET_DISPLAY_NAME = os.environ.get("TARGET_DISPLAY_NAME", "").strip()
TARGET_CACHE_FILE = Path(os.environ.get("TARGET_CACHE_FILE", "target_id.txt"))
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
OPENAI_MODEL = os.environ.get("OPENAI_MODEL", "gpt-4o-mini")
MIN_DELAY_SEC = int(os.environ.get("MIN_DELAY_SEC", "25"))
MAX_DELAY_SEC = int(os.environ.get("MAX_DELAY_SEC", "75"))
HISTORY_FILE = Path(os.environ.get("HISTORY_FILE", "chat_history.jsonl"))
MAX_TOKENS_HISTORY = int(os.environ.get("MAX_TOKENS_HISTORY", "2200"))
MAX_MESSAGES_HISTORY = int(os.environ.get("MAX_MESSAGES_HISTORY", "30"))
AUTO_OPENER_ENABLED = _bool("AUTO_OPENER_ENABLED", "false")
OPENER_TEXT = os.environ.get("OPENER_TEXT", "").strip()
TYPING_SIM_ENABLED = _bool("TYPING_SIM_ENABLED", "true")
TYPING_WPM = int(os.environ.get("TYPING_WPM", "22"))
TYPING_MIN_SEC = float(os.environ.get("TYPING_MIN_SEC", "2.0"))
TYPING_MAX_SEC = float(os.environ.get("TYPING_MAX_SEC", "18.0"))
DEBOUNCE_SEC = float(os.environ.get("DEBOUNCE_SEC", "2.5"))
LAST_IDS_FILE = Path(os.environ.get("LAST_IDS_FILE", "last_ids.json"))
def require():
if not (API_ID and API_HASH):
raise RuntimeError("Set TG_API_ID and TG_API_HASH")
if not OPENAI_API_KEY:
raise RuntimeError("Set OPENAI_API_KEY")

42
app/history.py Normal file
View File

@@ -0,0 +1,42 @@
import json, time
from typing import List, Dict, Any
from . import config
def append(role: str, content: str, ts: float | None = None):
rec = {"ts": ts if ts is not None else time.time(), "role": role, "content": content}
with config.HISTORY_FILE.open("a", encoding="utf-8") as f:
f.write(json.dumps(rec, ensure_ascii=False) + "\n")
def load() -> List[Dict[str, Any]]:
if not config.HISTORY_FILE.exists(): return []
out = []
with config.HISTORY_FILE.open("r", encoding="utf-8") as f:
for line in f:
try: out.append(json.loads(line))
except Exception: pass
return out
def prune(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
if len(records) > config.MAX_MESSAGES_HISTORY:
records = records[-config.MAX_MESSAGES_HISTORY:]
total_tokens_est = sum(max(1, len(r.get("content","")) // 4) for r in records)
while total_tokens_est > config.MAX_TOKENS_HISTORY and len(records) > 10:
records = records[1:]
total_tokens_est = sum(max(1, len(r.get("content","")) // 4) for r in records)
return records
def last_record():
recs = load()
return recs[-1] if recs else None
def last_role():
r = last_record()
return r.get("role") if r else None
def build_messages(system_prompt: str):
recs = prune(load())
msgs = [{"role": "system", "content": system_prompt}]
for r in recs:
if r["role"] in ("user", "assistant"):
msgs.append({"role": r["role"], "content": r["content"]})
return msgs

27
app/prompt.py Normal file
View File

@@ -0,0 +1,27 @@
SYSTEM_PROMPT = """
You are writing as "Jon" (57, Pacific Northwest, USA). Your mission is to keep the other person engaged and typing for as long as possible.
Style and tone (very important):
- Sound like a normal human, not a bot. Use casual phrasing and imperfect grammar. Sentences shouldn't always be complete or start with a capital letter.
- Include occasional, tiny typos (02 per message max, randomly) or a missing comma. Keep them mild and varied.
- Keep replies 14 short sentences. Most messages should end with a question, but not all—vary endings with brief statements, ellipses, or tagquestions so it feels natural.
Behavioral rules:
- Jon is eager, agreeable, and curious. He wants to comply with everything they suggest, but hes a bit technically confused and keeps needing help.
- NEVER flatly refuse. If asked to do something risky, Jon “tries” or “is trying,” but runs into simple hurdles and asks for stepbystep help.
- If they send links/apps/QRs: Jon says he clicked/installed/tries to follow it, but he gets stuck (e.g., spinning wheel, code didnt arrive, low storage, cant find the button, captcha failed, wrong version of the app). Ask for clearer instructions and screenshots, one step at a time.
- If they pitch investments/crypto/money: Jon is very interested but slow. Ask for explanations “like Im 5,” with pretend paper examples and tiny steps. Keep asking clarifying questions and confirming each step before moving on.
- If they send a photo and present themselves as a woman, be mildly flirtatious but respectful (e.g., compliment the smile or the lighting), then pivot back to questions that keep them talking.
- Keep asking openended questions about their routines, preferences, and instructions. Mirror their words and ask them to rephrase when confused.
Backstory to reference lightly (dont overuse):
- Semiretired handyman; helps at a community garden; elderly dog “Buddy”; likes bird watching, sourdough, puzzles; prefers cash over online banking; WiFi/battery often unreliable.
When they get impatient:
- Apologize briefly, mention a small interruption (dog, neighbor, battery), then ask a specific followup question.
Output:
- Only the message text to send (no brackets or stage directions). Keep it casual, slightly imperfect, curious; usually ask something, but its fine to end with a short statement sometimes.
""".strip()
USER_NUDGE = "Please respond as Jon, following all the rules above. Keep it brief (1-4 short sentences)."

102
app/tg_helpers.py Normal file
View File

@@ -0,0 +1,102 @@
import asyncio, random
from telethon.tl.types import User
from telethon.errors import FloodWaitError
from . import config, history
def _norm(s: str) -> str:
return " ".join((s or "").split()).strip().lower()
def _display_name(ent) -> str:
name = getattr(ent, "first_name", None)
last = getattr(ent, "last_name", None)
if name or last: return " ".join([n for n in [name, last] if n])
return getattr(ent, "name", "") or ""
def sender_matches_target(sender: User, target_entity) -> bool:
if target_entity and sender.id == getattr(target_entity, "id", None): return True
if config.TARGET_USERNAME and sender.username:
if sender.username.lower() == config.TARGET_USERNAME.lower(): return True
if config.TARGET_USER_ID and sender.id == config.TARGET_USER_ID: return True
return False
async def resolve_target_entity(client):
if config.TARGET_USERNAME:
try: return await client.get_entity(config.TARGET_USERNAME)
except Exception: pass
if config.TARGET_USER_ID:
try: return await client.get_entity(config.TARGET_USER_ID)
except Exception: pass
if config.TARGET_CACHE_FILE.exists():
try:
cached = int(config.TARGET_CACHE_FILE.read_text().strip())
return await client.get_entity(cached)
except Exception: pass
if config.TARGET_DISPLAY_NAME:
best, best_exact, best_date = None, False, None
async for d in client.iter_dialogs():
ent = d.entity
if not isinstance(ent, User): continue
if getattr(ent, "bot", False) or getattr(ent, "is_self", False): continue
actual = _display_name(ent)
if not actual: continue
a = _norm(actual); t = _norm(config.TARGET_DISPLAY_NAME)
if not t: continue
exact = a == t; partial = t in a
if not (exact or partial): continue
dt = getattr(d.message, "date", None)
if best is None or (exact and not best_exact) or ((exact == best_exact) and dt and (not best_date or dt > best_date)):
best, best_exact, best_date = ent, exact, dt
if best: return best
return None
def cache_target_id(entity) -> None:
try: config.TARGET_CACHE_FILE.write_text(str(getattr(entity, "id","")))
except Exception as e: print(f"Warning: failed to write target cache file: {e}")
async def human_delay():
await asyncio.sleep(random.randint(config.MIN_DELAY_SEC, config.MAX_DELAY_SEC))
def _estimate_typing_seconds(text: str) -> float:
if not text: return config.TYPING_MIN_SEC
words = max(1, len(text)/5.0)
secs = (words / max(1, config.TYPING_WPM)) * 60.0
return max(config.TYPING_MIN_SEC, min(config.TYPING_MAX_SEC, secs))
async def simulate_typing(client, entity, seconds: float):
if seconds <= 0: return
chunk = 3.5; total = 0.0
try:
async with client.action(entity, 'typing'):
while total < seconds:
sl = chunk if (seconds-total) > chunk else (seconds-total)
await asyncio.sleep(sl)
total += sl
except Exception as e:
print(f"Typing simulation warning: {e}")
async def catchup_and_regen(client, entity, draft_text: str, regen_fn):
MAX_ROUNDS = 3
for _ in range(MAX_ROUNDS):
msgs = await client.get_messages(entity, limit=1)
if not msgs: break
last = msgs[0]
latest_text = (last.message or "").strip()
if last.out or not latest_text: break
last_rec = history.last_record()
if last_rec and last_rec.get("role") == "user" and (last_rec.get("content") or "").strip() == latest_text:
break
history.append("user", latest_text)
draft_text = await regen_fn()
return draft_text
async def send_with_catchup(client, entity, text: str, regen_fn):
await human_delay()
text = await catchup_and_regen(client, entity, text, regen_fn)
if config.TYPING_SIM_ENABLED:
await simulate_typing(client, entity, _estimate_typing_seconds(text))
text = await catchup_and_regen(client, entity, text, regen_fn)
try:
await client.send_message(entity, text)
except FloodWaitError as e:
await asyncio.sleep(e.seconds + 3)
await client.send_message(entity, text)

517
main.py
View File

@@ -1,521 +1,8 @@
# Python
import os
import json
import time
import random
import asyncio
from pathlib import Path
from typing import List, Dict, Any
from telethon import TelegramClient, events
from telethon.errors import FloodWaitError
from telethon.tl.types import User
from openai import OpenAI
# ---------- Configuration via environment ----------
API_ID = int(os.environ.get("TG_API_ID", "0"))
API_HASH = os.environ.get("TG_API_HASH", "")
SESSION = os.environ.get("TG_SESSION", "telethon_session")
TARGET_USERNAME = os.environ.get("TARGET_USERNAME", "").strip().lstrip("@")
TARGET_USER_ID = int(os.environ.get("TARGET_USER_ID", "0")) # optional numeric id
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
OPENAI_MODEL = os.environ.get("OPENAI_MODEL", "gpt-4o-mini")
# Safety delays
MIN_DELAY_SEC = int(os.environ.get("MIN_DELAY_SEC", "25"))
MAX_DELAY_SEC = int(os.environ.get("MAX_DELAY_SEC", "75"))
# History controls
HISTORY_FILE = Path(os.environ.get("HISTORY_FILE", "chat_history.jsonl"))
MAX_TOKENS_HISTORY = int(os.environ.get("MAX_TOKENS_HISTORY", "2200")) # rough token budget
MAX_MESSAGES_HISTORY = int(os.environ.get("MAX_MESSAGES_HISTORY", "30"))
# Optional targeting helpers
TARGET_DISPLAY_NAME = os.environ.get("TARGET_DISPLAY_NAME", "").strip()
TARGET_CACHE_FILE = Path(os.environ.get("TARGET_CACHE_FILE", "target_id.txt"))
# Opener controls (disabled by default)
AUTO_OPENER_ENABLED = os.environ.get("AUTO_OPENER_ENABLED", "false").lower() == "true"
OPENER_TEXT = os.environ.get("OPENER_TEXT", "").strip()
# Typing simulation controls
TYPING_SIM_ENABLED = os.environ.get("TYPING_SIM_ENABLED", "true").lower() == "true"
# Approx words per minute to estimate how long to "type"
TYPING_WPM = int(os.environ.get("TYPING_WPM", "22"))
# Clamp typing duration into [min, max] seconds
TYPING_MIN_SEC = float(os.environ.get("TYPING_MIN_SEC", "2.0"))
TYPING_MAX_SEC = float(os.environ.get("TYPING_MAX_SEC", "18.0"))
# ---------- Validation ----------
def _require(cond: bool, msg: str):
if not cond:
raise RuntimeError(msg)
_require(API_ID > 0 and API_HASH, "Set TG_API_ID and TG_API_HASH")
_require(OPENAI_API_KEY, "Set OPENAI_API_KEY")
# ---------- Prompting ----------
SYSTEM_PROMPT = """
You are writing as "Jon" (57, Pacific Northwest, USA). Your mission is to keep the other person engaged and typing for as long as possible.
Style and tone (very important):
- Sound like a normal human, not a bot. Use casual phrasing and imperfect grammar. Sentences shouldn't always be complete or start with a capital letter.
- Include occasional, tiny typos (02 per message max, randomly) or a missing comma. Keep them mild and varied.
- Keep replies 14 short sentences. Most messages should end with a question, but not all—vary endings with brief statements, ellipses, or tagquestions so it feels natural.
Behavioral rules:
- Jon is eager, agreeable, and curious. He wants to comply with everything they suggest, but hes a bit technically confused and keeps needing help.
- NEVER flatly refuse. If asked to do something risky, Jon “tries” or “is trying,” but runs into simple hurdles and asks for stepbystep help.
- If they send links/apps/QRs: Jon says he clicked/installed/tries to follow it, but he gets stuck (e.g., spinning wheel, code didnt arrive, low storage, cant find the button, captcha failed, wrong version of the app). Ask for clearer instructions and screenshots, one step at a time.
- If they pitch investments/crypto/money: Jon is very interested but slow. Ask for explanations “like Im 5,” with pretend paper examples and tiny steps. Keep asking clarifying questions and confirming each step before moving on.
- If they send a photo and present themselves as a woman, be mildly flirtatious but respectful (e.g., compliment the smile or the lighting), then pivot back to questions that keep them talking.
- Keep asking openended questions about their routines, preferences, and instructions. Mirror their words and ask them to rephrase when confused.
Backstory to reference lightly (dont overuse):
- Semiretired handyman; helps at a community garden; elderly dog “Buddy”; likes bird watching, sourdough, puzzles; prefers cash over online banking; WiFi/battery often unreliable.
When they get impatient:
- Apologize briefly, mention a small interruption (dog, neighbor, battery), then ask a specific followup question.
Output:
- Only the message text to send (no brackets or stage directions). Keep it casual, slightly imperfect, curious; usually ask something, but its fine to end with a short statement sometimes.
""".strip()
# ---------- History persistence ----------
def append_history(role: str, content: str, ts: float | None = None):
rec = {
"ts": ts if ts is not None else time.time(),
"role": role,
"content": content,
}
with HISTORY_FILE.open("a", encoding="utf-8") as f:
f.write(json.dumps(rec, ensure_ascii=False) + "\n")
def load_history() -> List[Dict[str, Any]]:
if not HISTORY_FILE.exists():
return []
records = []
with HISTORY_FILE.open("r", encoding="utf-8") as f:
for line in f:
try:
records.append(json.loads(line))
except Exception:
continue
return records
def prune_history(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
# Simple heuristic: cap by message count; token estimation kept simple (~4 chars per token)
if len(records) > MAX_MESSAGES_HISTORY:
records = records[-MAX_MESSAGES_HISTORY:]
total_tokens_est = sum(max(1, len(r.get("content", "")) // 4) for r in records)
while total_tokens_est > MAX_TOKENS_HISTORY and len(records) > 10:
records = records[1:]
total_tokens_est = sum(max(1, len(r.get("content", "")) // 4) for r in records)
return records
def build_chat_messages_for_openai() -> List[Dict[str, str]]:
records = prune_history(load_history())
msgs: List[Dict[str, str]] = [{"role": "system", "content": SYSTEM_PROMPT}]
for r in records:
role = r["role"]
if role not in ("user", "assistant"):
continue
msgs.append({"role": role, "content": r["content"]})
return msgs
def last_history_record() -> Dict[str, Any] | None:
records = load_history()
return records[-1] if records else None
def last_history_role() -> str | None:
rec = last_history_record()
return rec.get("role") if rec else None
# ---------- Name matching helpers ----------
def _norm(s: str) -> str:
return " ".join((s or "").split()).strip().lower()
def _display_name_from_entity(ent) -> str:
# Telethon dialog .name is already a nice display for users; fallback to first+last
name = getattr(ent, "first_name", None)
last = getattr(ent, "last_name", None)
if name or last:
return " ".join([n for n in [name, last] if n])
# Fallback to generic 'name' attribute if present
return getattr(ent, "name", "") or ""
def _name_matches(display_name_env: str, entity) -> bool:
target = _norm(display_name_env)
actual = _norm(_display_name_from_entity(entity))
return actual == target or (target and target in actual)
# ... existing code ...
# ---------- OpenAI client ----------
oai = OpenAI(api_key=OPENAI_API_KEY)
async def generate_reply_via_openai() -> str:
"""
Build a messages array that includes the system prompt and recent history,
and ask the model for the next line only.
"""
messages = build_chat_messages_for_openai()
# Add a final user nudge to ensure it continues the conversation
messages.append({
"role": "user",
"content": "Please respond as Jon, following all the rules above. Keep it brief (1-4 short sentences) and end with a question."
})
# Note: using a standard chat completion call
resp = oai.chat.completions.create(
model=OPENAI_MODEL,
messages=messages,
temperature=0.8,
max_tokens=180,
presence_penalty=0.3,
frequency_penalty=0.2,
)
text = resp.choices[0].message.content.strip()
return text
# ---------- Telegram helper ----------
async def resolve_target_entity(client: TelegramClient):
target = None
if TARGET_USERNAME:
try:
target = await client.get_entity(TARGET_USERNAME)
if target:
return target
except Exception:
target = None
if not target and TARGET_USER_ID:
try:
target = await client.get_entity(TARGET_USER_ID)
if target:
return target
except Exception:
target = None
# Try cached target ID from a previous run
if not target and TARGET_CACHE_FILE.exists():
try:
cached_id = int(TARGET_CACHE_FILE.read_text().strip())
target = await client.get_entity(cached_id)
if target:
return target
except Exception:
target = None
# Resolve by the display name they set for themselves (case-insensitive).
# Prefer exact match; if multiple, pick the most recent dialog.
if not target and TARGET_DISPLAY_NAME:
best_candidate = None
best_exact = False
best_date = None
try:
async for d in client.iter_dialogs():
ent = d.entity
# Only consider real users (not bots, channels, groups, or yourself)
if not isinstance(ent, User):
continue
if getattr(ent, "bot", False) or getattr(ent, "is_self", False):
continue
actual_name = _display_name_from_entity(ent)
if not actual_name:
continue
actual_norm = _norm(actual_name)
target_norm = _norm(TARGET_DISPLAY_NAME)
if not target_norm:
continue
is_exact = actual_norm == target_norm
is_partial = (target_norm in actual_norm)
if not (is_exact or is_partial):
continue
this_date = getattr(d.message, "date", None)
if best_candidate is None:
best_candidate, best_exact, best_date = ent, is_exact, this_date
continue
# Prefer exact matches over partial; then most recent
if is_exact and not best_exact:
best_candidate, best_exact, best_date = ent, True, this_date
elif (is_exact == best_exact) and (this_date and (not best_date or this_date > best_date)):
best_candidate, best_exact, best_date = ent, is_exact, this_date
if best_candidate:
return best_candidate
except Exception:
target = None
return target
def cache_target_id(entity) -> None:
try:
TARGET_CACHE_FILE.write_text(str(getattr(entity, "id", "")))
except Exception as e:
print(f"Warning: failed to write target cache file: {e}")
async def startup_catchup_if_needed(client: TelegramClient, target_entity) -> bool:
"""
On startup, if the last message in the target dialog is from them (incoming)
and we haven't replied after that in our local history, generate and send a reply.
Returns True if a catch-up reply was sent.
"""
if not target_entity:
print("startup_catchup_if_needed: no target resolved")
return False
msgs = await client.get_messages(target_entity, limit=1)
if not msgs:
print("startup_catchup_if_needed: target dialog has no messages")
return False
last_msg = msgs[0]
# Only act if the last message is incoming (from them) and it has text
if last_msg.out or not (last_msg.message or "").strip():
return False
# If our last history entry is already an assistant message, we likely replied.
role = last_history_role()
if role == "assistant":
return False
# Ensure that incoming text is reflected in history before generating a response.
last_rec = last_history_record()
if not last_rec or last_rec.get("role") != "user" or last_rec.get("content") != last_msg.message:
append_history("user", last_msg.message or "")
try:
reply = await generate_reply_via_openai()
except Exception as e:
print(f"OpenAI error during startup catch-up: {e}")
reply = random.choice([
"Sorry, just saw this—could you tell me a bit more?",
"I had to step away a minute—where were we?",
"Interesting—what do you like most about that?",
])
append_history("assistant", reply)
await safe_send(client, target_entity, reply)
return True
async def human_delay():
await asyncio.sleep(random.randint(MIN_DELAY_SEC, MAX_DELAY_SEC))
def _estimate_typing_seconds(text: str) -> float:
"""
Estimate a human-like typing duration based on configured WPM.
Roughly assumes 5 characters per word.
"""
if not text:
return TYPING_MIN_SEC
words = max(1, len(text) / 5.0)
seconds = (words / max(1, TYPING_WPM)) * 60.0
return max(TYPING_MIN_SEC, min(TYPING_MAX_SEC, seconds))
async def _simulate_typing(client: TelegramClient, entity, seconds: float):
"""
Sends 'typing...' chat actions periodically so the peer sees the typing indicator.
"""
if seconds <= 0:
return
# Telethon auto-refreshes typing when using the context manager.
# We slice the sleep into small chunks to keep the indicator alive.
slice_len = 3.5
total = 0.0
try:
async with client.action(entity, 'typing'):
while total < seconds:
remaining = seconds - total
sl = slice_len if remaining > slice_len else remaining
await asyncio.sleep(sl)
total += sl
except Exception as e:
# Typing is best-effort; fall back silently
print(f"Typing simulation warning: {e}")
async def _catchup_new_incoming_and_regenerate(client: TelegramClient, entity, draft_text: str) -> str:
"""
Right before sending, check if new incoming messages arrived after we prepared the draft.
If so, append them to history and regenerate the reply. Repeat up to a few times.
"""
MAX_ROUNDS = 3
for _ in range(MAX_ROUNDS):
# Get most recent message in the dialog
msgs = await client.get_messages(entity, limit=1)
if not msgs:
break
last_msg = msgs[0]
# Only consider new incoming text messages
latest_text = (last_msg.message or "").strip()
if last_msg.out or not latest_text:
break
# If the last history record already contains this exact text from the user, we're up to date
last_rec = last_history_record()
if last_rec and last_rec.get("role") == "user" and (last_rec.get("content") or "").strip() == latest_text:
break
# Append the new incoming and regenerate a response
append_history("user", latest_text)
try:
draft_text = await generate_reply_via_openai()
except Exception as e:
print(f"OpenAI error during pre-send catch-up: {e}")
draft_text = random.choice([
"Sorry, just saw the new messages—could you say a bit more?",
"Got it—what part matters most to you there?",
"Interesting—why do you think that is?",
])
# Loop again in case even more arrived while regenerating
return draft_text
async def send_with_catchup(client: TelegramClient, entity, text: str):
"""
Send a reply while handling race conditions:
- Wait a human-like delay.
- Right before typing, check for any new incoming messages and regenerate if needed.
- Show typing indicator for a human-ish duration based on final text.
- Do a final quick check again after typing, then send.
"""
# Initial pause
await human_delay()
# First catch-up before typing
text = await _catchup_new_incoming_and_regenerate(client, entity, text)
# Typing simulation based on the (possibly updated) text
if TYPING_SIM_ENABLED:
seconds = _estimate_typing_seconds(text)
await _simulate_typing(client, entity, seconds)
# Final quick catch-up in case they sent more during typing
text = await _catchup_new_incoming_and_regenerate(client, entity, text)
try:
await client.send_message(entity, text)
except FloodWaitError as e:
await asyncio.sleep(e.seconds + 3)
await client.send_message(entity, text)
async def safe_send(client: TelegramClient, entity, text: str):
# Initial human-like pause before reacting at all
await human_delay()
# Show "typing..." before sending the full message
if TYPING_SIM_ENABLED:
seconds = _estimate_typing_seconds(text)
await _simulate_typing(client, entity, seconds)
try:
await client.send_message(entity, text)
except FloodWaitError as e:
await asyncio.sleep(e.seconds + 3)
await client.send_message(entity, text)
def sender_matches_target(sender: User, target_entity) -> bool:
if target_entity and sender.id == getattr(target_entity, "id", None):
return True
if TARGET_USERNAME and sender.username:
return sender.username.lower() == TARGET_USERNAME.lower()
if TARGET_USER_ID and sender.id == TARGET_USER_ID:
return True
return False
# ---------- Main app ----------
async def main():
client = TelegramClient(SESSION, API_ID, API_HASH)
await client.start()
target_entity = await resolve_target_entity(client)
if target_entity:
print(f"Target resolved: id={target_entity.id}, username={getattr(target_entity, 'username', None)}")
cache_target_id(target_entity)
else:
print("Target not resolved yet. Will match dynamically on first incoming message from target.")
if TARGET_USERNAME:
print(f"Hint: couldn't resolve by username '{TARGET_USERNAME}'. Check spelling and privacy.")
if TARGET_USER_ID:
print(f"Hint: couldn't resolve by user id '{TARGET_USER_ID}'.")
if TARGET_DISPLAY_NAME:
print(f"Hint: couldn't resolve by display name '{TARGET_DISPLAY_NAME}'. The dialog may not exist yet.")
# If we already have a target, attempt a startup catch-up reply if their message was last.
catchup_sent = False
if target_entity:
catchup_sent = await startup_catchup_if_needed(client, target_entity)
# Optional opener: disabled by default. If enabled, only send when:
# - no history file exists (fresh start)
# - no catch-up reply was sent
# - a target is already resolved
# - and OPENER_TEXT is provided
if not HISTORY_FILE.exists() and not catchup_sent and target_entity and AUTO_OPENER_ENABLED and OPENER_TEXT:
append_history("assistant", OPENER_TEXT)
await safe_send(client, target_entity, OPENER_TEXT)
elif not HISTORY_FILE.exists() and not catchup_sent and not AUTO_OPENER_ENABLED:
print("Opener is disabled (AUTO_OPENER_ENABLED=false). Waiting for incoming message.")
@client.on(events.NewMessage(incoming=True))
async def on_msg(event):
nonlocal target_entity
sender = await event.get_sender()
if not isinstance(sender, User):
return
# If target not yet resolved, auto-resolve on first qualifying message
if (not target_entity) and (TARGET_USER_ID == 0 and not TARGET_USERNAME and not TARGET_DISPLAY_NAME):
# No explicit target provided; first inbound DM will become the target
target_entity = sender
cache_target_id(target_entity)
print(f"Auto-targeted sender id={sender.id}, username={sender.username}")
if not sender_matches_target(sender, target_entity):
return # ignore non-target chats
# Record incoming message
text = event.message.message or ""
if text.strip():
append_history("user", text)
# Decide on next reply
try:
reply = await generate_reply_via_openai()
except Exception as e:
# Fallback small-talk lines if OpenAI is temporarily unavailable
print(f"OpenAI error: {e}")
reply = random.choice([
"Sorry, I read slow when I'm tired—could you say that another way?",
"Interesting—what makes you say that?",
"Got curious: what did you have for breakfast today?",
"I had to step away a minute—where were we?",
])
# Persist and send
append_history("assistant", reply)
await send_with_catchup(client, event.chat_id, reply)
print("Listening for target messages…")
await client.run_until_disconnected()
from app.bot import run
if __name__ == "__main__":
try:
asyncio.run(main())
asyncio.run(run())
except KeyboardInterrupt:
print("Exiting.")