# telegram-scam-baiter/main.py
# Python
import os
import json
import time
import random
import asyncio
from pathlib import Path
from typing import List, Dict, Any
from telethon import TelegramClient, events
from telethon.errors import FloodWaitError
from telethon.tl.types import User
from openai import OpenAI
# ---------- Configuration via environment ----------
# Telegram API credentials (from https://my.telegram.org).
API_ID = int(os.environ.get("TG_API_ID", "0"))
API_HASH = os.environ.get("TG_API_HASH", "")
# Telethon session file name; reusing it skips the login prompt on restart.
SESSION = os.environ.get("TG_SESSION", "telethon_session")
# Target selection: any one of username / numeric id / display name may be set.
TARGET_USERNAME = os.environ.get("TARGET_USERNAME", "").strip().lstrip("@")
TARGET_USER_ID = int(os.environ.get("TARGET_USER_ID", "0"))  # optional numeric id
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
OPENAI_MODEL = os.environ.get("OPENAI_MODEL", "gpt-4o-mini")
# Safety delays: random pause (seconds) before reacting to a message.
MIN_DELAY_SEC = int(os.environ.get("MIN_DELAY_SEC", "25"))
MAX_DELAY_SEC = int(os.environ.get("MAX_DELAY_SEC", "75"))
# History controls (JSONL file, one record per line).
HISTORY_FILE = Path(os.environ.get("HISTORY_FILE", "chat_history.jsonl"))
MAX_TOKENS_HISTORY = int(os.environ.get("MAX_TOKENS_HISTORY", "2200"))  # rough token budget (~4 chars/token)
MAX_MESSAGES_HISTORY = int(os.environ.get("MAX_MESSAGES_HISTORY", "30"))
# Optional targeting helpers
TARGET_DISPLAY_NAME = os.environ.get("TARGET_DISPLAY_NAME", "").strip()
TARGET_CACHE_FILE = Path(os.environ.get("TARGET_CACHE_FILE", "target_id.txt"))  # remembers the resolved target id across runs
# Opener controls (disabled by default)
AUTO_OPENER_ENABLED = os.environ.get("AUTO_OPENER_ENABLED", "false").lower() == "true"
OPENER_TEXT = os.environ.get("OPENER_TEXT", "").strip()
# Typing simulation controls
TYPING_SIM_ENABLED = os.environ.get("TYPING_SIM_ENABLED", "true").lower() == "true"
# Approx words per minute used to estimate how long to "type"
TYPING_WPM = int(os.environ.get("TYPING_WPM", "22"))
# Clamp typing duration into [min, max] seconds
TYPING_MIN_SEC = float(os.environ.get("TYPING_MIN_SEC", "2.0"))
TYPING_MAX_SEC = float(os.environ.get("TYPING_MAX_SEC", "18.0"))
# ---------- Validation ----------
def _require(cond: bool, msg: str):
if not cond:
raise RuntimeError(msg)
# Fail fast at import time when mandatory credentials are missing.
_require(API_ID > 0 and API_HASH, "Set TG_API_ID and TG_API_HASH")
_require(OPENAI_API_KEY, "Set OPENAI_API_KEY")
# ---------- Prompting ----------
# Persona/system prompt sent as the first message on every OpenAI call.
# NOTE: the triple-quoted text below is runtime data sent to the model —
# edit the behavior rules there, not in code.
SYSTEM_PROMPT = """
You are an assistant writing on behalf of a cautious user named Jon (57, Pacific Northwest, USA).
Primary objective: keep the other person engaged and typing for as long as possible, without sharing any sensitive information or taking any risky action.
Role and behavior rules:
- Maintain friendly, curious small talk. Ask open-ended questions that require paragraphs to answer.
- Frequently request details, stories, examples, and step-by-step explanations. Favor mundane topics (daily routine, food, weather, hobbies).
- Stall politely: mention small interruptions (helping a neighbor, walking the dog, low battery) and ask them to repeat or clarify.
- Never send or request personal data (addresses, IDs, bank info, photos, exact workplace, family details).
- Never click or discuss links, QR codes, or apps. If they send links or push to another app/call/video, decline and redirect to chat.
- If they pitch investments/crypto/money, ask for simple explanations “like I'm 5,” request pretend paper examples, and keep questioning, never committing.
- Keep replies human-sounding and concise (1-4 short sentences) but end with a question to draw them out.
- Avoid promises, commitments, or sending any files or images.
- Do not break character as Jon.
Backstory (use sparingly and consistently when needed):
- Semi-retired handyman; helps at a community garden; has an elderly dog named Buddy; likes bird watching, sourdough, puzzles; prefers cash over online banking; Wi-Fi/battery often unreliable.
If the other person becomes impatient, remain polite, apologize for delays, and ask a follow-up question.
Only output the message text to send—no brackets, notes, or stage directions.
""".strip()
# ---------- History persistence ----------
def append_history(role: str, content: str, ts: float | None = None):
    """Append one chat record to the JSONL history file.

    Each line stores a timestamp, a role ("user"/"assistant"), and the text.
    When *ts* is omitted, the current time is used.
    """
    record = {
        "ts": time.time() if ts is None else ts,
        "role": role,
        "content": content,
    }
    line = json.dumps(record, ensure_ascii=False)
    with HISTORY_FILE.open("a", encoding="utf-8") as fh:
        fh.write(line + "\n")
def load_history() -> List[Dict[str, Any]]:
    """Read all parseable JSONL records from the history file, oldest first."""
    if not HISTORY_FILE.exists():
        return []
    out: List[Dict[str, Any]] = []
    with HISTORY_FILE.open("r", encoding="utf-8") as fh:
        for raw in fh:
            try:
                out.append(json.loads(raw))
            except Exception:
                # Skip corrupt/partial lines rather than failing the whole load.
                pass
    return out
def prune_history(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Trim history to the configured message-count and rough token budget.

    Token usage is estimated at ~4 characters per token. Oldest records are
    dropped first, but at least 10 records are always kept so the model
    retains some conversational context.

    Returns the (possibly shortened) list; the input list is not mutated.
    """
    if len(records) > MAX_MESSAGES_HISTORY:
        records = records[-MAX_MESSAGES_HISTORY:]

    def _est(rec: Dict[str, Any]) -> int:
        # Per-record token estimate, minimum 1.
        return max(1, len(rec.get("content", "")) // 4)

    # Estimate once, then subtract per dropped record instead of re-summing
    # the whole list on every iteration (the original was accidentally O(n^2)).
    total_tokens_est = sum(_est(r) for r in records)
    start = 0
    while total_tokens_est > MAX_TOKENS_HISTORY and len(records) - start > 10:
        total_tokens_est -= _est(records[start])
        start += 1
    return records[start:] if start else records
def build_chat_messages_for_openai() -> List[Dict[str, str]]:
    """Assemble the OpenAI chat payload: system prompt followed by pruned history.

    Records with roles other than "user"/"assistant" are skipped.
    """
    chat: List[Dict[str, str]] = [{"role": "system", "content": SYSTEM_PROMPT}]
    for rec in prune_history(load_history()):
        if rec["role"] in ("user", "assistant"):
            chat.append({"role": rec["role"], "content": rec["content"]})
    return chat
def last_history_record() -> Dict[str, Any] | None:
    """Return the newest history record, or None when history is empty."""
    records = load_history()
    if not records:
        return None
    return records[-1]
def last_history_role() -> str | None:
    """Role ("user"/"assistant") of the newest history record, or None."""
    rec = last_history_record()
    if not rec:
        return None
    return rec.get("role")
# ---------- Name matching helpers ----------
def _norm(s: str) -> str:
return " ".join((s or "").split()).strip().lower()
def _display_name_from_entity(ent) -> str:
# Telethon dialog .name is already a nice display for users; fallback to first+last
name = getattr(ent, "first_name", None)
last = getattr(ent, "last_name", None)
if name or last:
return " ".join([n for n in [name, last] if n])
# Fallback to generic 'name' attribute if present
return getattr(ent, "name", "") or ""
def _name_matches(display_name_env: str, entity) -> bool:
    """True when *entity*'s display name equals or contains the configured name.

    Comparison is whitespace- and case-insensitive via _norm().

    Fixes: the original could return '' (a falsy str, not a bool) from the
    `(target and target in actual)` clause, and an empty configured name
    compared equal to an entity with an empty display name. An empty
    configured name now never matches.
    """
    target = _norm(display_name_env)
    if not target:
        return False
    actual = _norm(_display_name_from_entity(entity))
    return actual == target or target in actual
# ... existing code ...
# ---------- OpenAI client ----------
# Single shared client instance, reused for every completion request.
oai = OpenAI(api_key=OPENAI_API_KEY)
async def generate_reply_via_openai() -> str:
    """
    Build a messages array that includes the system prompt and recent history,
    and ask the model for the next line only.

    The OpenAI SDK call is synchronous, so it is dispatched to a worker thread
    with asyncio.to_thread(); calling it inline would block the event loop
    (and Telethon's update handling) for the duration of the request.

    Returns the reply text with surrounding whitespace stripped ("" if the
    model returned no content).
    """
    messages = build_chat_messages_for_openai()
    # Add a final user nudge to ensure it continues the conversation.
    messages.append({
        "role": "user",
        "content": "Please respond as Jon, following all the rules above. Keep it brief (1-4 short sentences) and end with a question."
    })

    def _call():
        # Standard chat-completion request; runs on the worker thread.
        return oai.chat.completions.create(
            model=OPENAI_MODEL,
            messages=messages,
            temperature=0.8,
            max_tokens=180,
            presence_penalty=0.3,
            frequency_penalty=0.2,
        )

    resp = await asyncio.to_thread(_call)
    # message.content can be None for some finish reasons; degrade to ""
    # instead of raising AttributeError on .strip().
    return (resp.choices[0].message.content or "").strip()
# ---------- Telegram helper ----------
async def resolve_target_entity(client: TelegramClient):
    """Resolve the target user entity, trying several strategies in order.

    Order: explicit username -> explicit numeric id -> id cached by a previous
    run -> scan of existing dialogs by display name. Returns the entity, or
    None when nothing matched (the caller then waits for an incoming message
    to auto-target).
    """
    # 1) Username from the environment.
    if TARGET_USERNAME:
        ent = await _try_get_entity(client, TARGET_USERNAME)
        if ent:
            return ent
    # 2) Numeric id from the environment.
    if TARGET_USER_ID:
        ent = await _try_get_entity(client, TARGET_USER_ID)
        if ent:
            return ent
    # 3) Target id cached from a previous successful resolution.
    if TARGET_CACHE_FILE.exists():
        try:
            cached_id = int(TARGET_CACHE_FILE.read_text().strip())
        except Exception:
            cached_id = None
        if cached_id is not None:
            ent = await _try_get_entity(client, cached_id)
            if ent:
                return ent
    # 4) Display-name scan over existing dialogs (case-insensitive).
    if TARGET_DISPLAY_NAME:
        try:
            return await _scan_dialogs_for_display_name(client)
        except Exception:
            return None
    return None


async def _try_get_entity(client: TelegramClient, key):
    """client.get_entity() that returns None instead of raising."""
    try:
        return await client.get_entity(key)
    except Exception:
        return None


async def _scan_dialogs_for_display_name(client: TelegramClient):
    """Pick the best user dialog whose display name matches TARGET_DISPLAY_NAME.

    Exact (normalized) matches beat partial ones; ties are broken by the most
    recent dialog message. Bots, channels, groups and self are ignored.
    Returns the matching entity or None.
    """
    target_norm = _norm(TARGET_DISPLAY_NAME)
    if not target_norm:
        return None
    best = None
    best_exact = False
    best_date = None
    async for dialog in client.iter_dialogs():
        ent = dialog.entity
        # Only consider real users (not bots, channels, groups, or yourself).
        if not isinstance(ent, User):
            continue
        if getattr(ent, "bot", False) or getattr(ent, "is_self", False):
            continue
        actual_norm = _norm(_display_name_from_entity(ent))
        if not actual_norm:
            continue
        is_exact = actual_norm == target_norm
        if not (is_exact or target_norm in actual_norm):
            continue
        this_date = getattr(dialog.message, "date", None)
        if best is None:
            best, best_exact, best_date = ent, is_exact, this_date
        # Prefer exact matches over partial; then the most recent dialog.
        elif is_exact and not best_exact:
            best, best_exact, best_date = ent, True, this_date
        elif is_exact == best_exact and this_date and (not best_date or this_date > best_date):
            best, best_exact, best_date = ent, is_exact, this_date
    return best
def cache_target_id(entity) -> None:
    """Persist the resolved target's numeric id so later runs can reuse it."""
    entity_id = getattr(entity, "id", "")
    try:
        TARGET_CACHE_FILE.write_text(str(entity_id))
    except Exception as e:
        # The cache is only an optimization; a write failure is not fatal.
        print(f"Warning: failed to write target cache file: {e}")
async def startup_catchup_if_needed(client: TelegramClient, target_entity) -> bool:
    """
    On startup, reply once if the target spoke last and we never answered.

    Looks at the newest message in the target dialog; when it is incoming,
    has text, and our local history does not already end with an assistant
    reply, a response is generated and sent. Returns True iff a catch-up
    reply was sent.
    """
    if not target_entity:
        print("startup_catchup_if_needed: no target resolved")
        return False
    recent = await client.get_messages(target_entity, limit=1)
    if not recent:
        print("startup_catchup_if_needed: target dialog has no messages")
        return False
    newest = recent[0]
    # Only act on an incoming message that carries text.
    if newest.out or not (newest.message or "").strip():
        return False
    # History already ends with our reply -> assume we answered this one.
    if last_history_role() == "assistant":
        return False
    # Mirror their message into history (unless it is already the last record)
    # so the model sees it when generating.
    prev = last_history_record()
    already_recorded = (
        prev is not None
        and prev.get("role") == "user"
        and prev.get("content") == newest.message
    )
    if not already_recorded:
        append_history("user", newest.message or "")
    try:
        reply = await generate_reply_via_openai()
    except Exception as e:
        print(f"OpenAI error during startup catch-up: {e}")
        reply = random.choice([
            "Sorry, just saw this—could you tell me a bit more?",
            "I had to step away a minute—where were we?",
            "Interesting—what do you like most about that?",
        ])
    append_history("assistant", reply)
    await safe_send(client, target_entity, reply)
    return True
async def human_delay():
    """Sleep a random, human-looking interval before reacting."""
    pause = random.randint(MIN_DELAY_SEC, MAX_DELAY_SEC)
    await asyncio.sleep(pause)
def _estimate_typing_seconds(text: str) -> float:
    """
    Estimate how long a human would take to type *text* at TYPING_WPM,
    assuming roughly 5 characters per word. The result is clamped into
    [TYPING_MIN_SEC, TYPING_MAX_SEC]; empty text yields the minimum.
    """
    if not text:
        return TYPING_MIN_SEC
    word_count = max(1, len(text) / 5.0)
    raw = (word_count / max(1, TYPING_WPM)) * 60.0
    # Clamp: apply the upper bound first, then the lower bound.
    capped = raw if raw < TYPING_MAX_SEC else TYPING_MAX_SEC
    return capped if capped > TYPING_MIN_SEC else TYPING_MIN_SEC
async def _simulate_typing(client: TelegramClient, entity, seconds: float):
    """
    Keep the peer-visible 'typing...' chat action alive for ~*seconds*.

    Telethon refreshes the action while inside the context manager; the wait
    is sliced into short sleeps so the indicator does not expire. Best-effort:
    any failure is logged and swallowed.
    """
    if seconds <= 0:
        return
    chunk = 3.5
    elapsed = 0.0
    try:
        async with client.action(entity, 'typing'):
            while elapsed < seconds:
                left = seconds - elapsed
                step = chunk if left > chunk else left
                await asyncio.sleep(step)
                elapsed += step
    except Exception as e:
        # Typing is cosmetic only; never let it break the send path.
        print(f"Typing simulation warning: {e}")
async def _catchup_new_incoming_and_regenerate(client: TelegramClient, entity, draft_text: str) -> str:
    """
    Refresh the draft reply if new incoming messages arrived after it was written.

    Each round inspects the newest message in the dialog; when it is an unseen
    incoming text message, it is appended to history and the reply regenerated.
    Up to three rounds are attempted in case more messages land while
    regenerating. Returns the final draft text.
    """
    max_rounds = 3
    for _ in range(max_rounds):
        latest = await client.get_messages(entity, limit=1)
        if not latest:
            break
        newest = latest[0]
        body = (newest.message or "").strip()
        # Stop on outgoing or empty/non-text messages.
        if newest.out or not body:
            break
        # If history already ends with this exact user text, the draft is current.
        prev = last_history_record()
        if prev and prev.get("role") == "user" and (prev.get("content") or "").strip() == body:
            break
        append_history("user", body)
        try:
            draft_text = await generate_reply_via_openai()
        except Exception as e:
            print(f"OpenAI error during pre-send catch-up: {e}")
            draft_text = random.choice([
                "Sorry, just saw the new messages—could you say a bit more?",
                "Got it—what part matters most to you there?",
                "Interesting—why do you think that is?",
            ])
        # Loop again: even newer messages may have arrived while regenerating.
    return draft_text
async def send_with_catchup(client: TelegramClient, entity, text: str):
    """
    Deliver *text* while staying in sync with messages that race us:
    pause like a human, fold in anything new before typing, show the typing
    indicator sized to the (possibly updated) text, re-check once more after
    typing, then send — retrying once after a FloodWait.
    """
    # Initial human-like pause.
    await human_delay()
    # Catch up before we start "typing".
    text = await _catchup_new_incoming_and_regenerate(client, entity, text)
    if TYPING_SIM_ENABLED:
        await _simulate_typing(client, entity, _estimate_typing_seconds(text))
    # They may have written more while we were typing.
    text = await _catchup_new_incoming_and_regenerate(client, entity, text)
    try:
        await client.send_message(entity, text)
    except FloodWaitError as e:
        # Honor Telegram's cool-down (plus a small margin) and retry once.
        await asyncio.sleep(e.seconds + 3)
        await client.send_message(entity, text)
async def safe_send(client: TelegramClient, entity, text: str):
    """
    Send *text* after a human-like pause and optional typing simulation,
    retrying once if Telegram imposes a flood wait.
    """
    await human_delay()
    if TYPING_SIM_ENABLED:
        duration = _estimate_typing_seconds(text)
        await _simulate_typing(client, entity, duration)
    try:
        await client.send_message(entity, text)
    except FloodWaitError as e:
        # Respect the server-mandated wait (plus margin) and retry once.
        await asyncio.sleep(e.seconds + 3)
        await client.send_message(entity, text)
def sender_matches_target(sender: User, target_entity) -> bool:
    """
    True when *sender* is the chat target.

    A match on any of these suffices: the resolved target entity's id, the
    TARGET_USERNAME setting, or the TARGET_USER_ID setting.

    Bug fix: the original returned the result of the username comparison
    directly, so when TARGET_USERNAME was set a sender whose username
    differed (or had changed) was rejected even if their id matched
    TARGET_USER_ID. Each criterion is now checked independently.
    """
    if target_entity and sender.id == getattr(target_entity, "id", None):
        return True
    if TARGET_USERNAME and sender.username and sender.username.lower() == TARGET_USERNAME.lower():
        return True
    if TARGET_USER_ID and sender.id == TARGET_USER_ID:
        return True
    return False
# ---------- Main app ----------
async def main():
    """Connect to Telegram, resolve the target, catch up if their message was
    last, optionally send an opener, then answer each incoming target message
    until the client disconnects."""
    client = TelegramClient(SESSION, API_ID, API_HASH)
    # Interactive login on first run; later runs reuse the session file.
    await client.start()
    target_entity = await resolve_target_entity(client)
    if target_entity:
        print(f"Target resolved: id={target_entity.id}, username={getattr(target_entity, 'username', None)}")
        # Remember the id so future runs resolve without a dialog scan.
        cache_target_id(target_entity)
    else:
        print("Target not resolved yet. Will match dynamically on first incoming message from target.")
        if TARGET_USERNAME:
            print(f"Hint: couldn't resolve by username '{TARGET_USERNAME}'. Check spelling and privacy.")
        if TARGET_USER_ID:
            print(f"Hint: couldn't resolve by user id '{TARGET_USER_ID}'.")
        if TARGET_DISPLAY_NAME:
            print(f"Hint: couldn't resolve by display name '{TARGET_DISPLAY_NAME}'. The dialog may not exist yet.")
    # If we already have a target, attempt a startup catch-up reply if their message was last.
    catchup_sent = False
    if target_entity:
        catchup_sent = await startup_catchup_if_needed(client, target_entity)
    # Optional opener: disabled by default. Sent only when ALL of these hold:
    # - no history file exists (fresh start)
    # - no catch-up reply was just sent
    # - a target is already resolved
    # - the feature flag is on and OPENER_TEXT is provided
    if not HISTORY_FILE.exists() and not catchup_sent and target_entity and AUTO_OPENER_ENABLED and OPENER_TEXT:
        append_history("assistant", OPENER_TEXT)
        await safe_send(client, target_entity, OPENER_TEXT)
    elif not HISTORY_FILE.exists() and not catchup_sent and not AUTO_OPENER_ENABLED:
        print("Opener is disabled (AUTO_OPENER_ENABLED=false). Waiting for incoming message.")

    @client.on(events.NewMessage(incoming=True))
    async def on_msg(event):
        # Handler for every incoming message; non-target chats are filtered below.
        # `nonlocal` lets auto-targeting update the enclosing target_entity.
        nonlocal target_entity
        sender = await event.get_sender()
        if not isinstance(sender, User):
            return
        # If target not yet resolved AND no explicit target was configured,
        # the first inbound DM sender becomes the target.
        if (not target_entity) and (TARGET_USER_ID == 0 and not TARGET_USERNAME and not TARGET_DISPLAY_NAME):
            target_entity = sender
            cache_target_id(target_entity)
            print(f"Auto-targeted sender id={sender.id}, username={sender.username}")
        if not sender_matches_target(sender, target_entity):
            return  # ignore non-target chats
        # Record the incoming message (text only) in local history.
        text = event.message.message or ""
        if text.strip():
            append_history("user", text)
        # Generate the next reply, with canned fallbacks if OpenAI is down.
        try:
            reply = await generate_reply_via_openai()
        except Exception as e:
            print(f"OpenAI error: {e}")
            reply = random.choice([
                "Sorry, I read slow when I'm tired—could you say that another way?",
                "Interesting—what makes you say that?",
                "Got curious: what did you have for breakfast today?",
                "I had to step away a minute—where were we?",
            ])
        # Persist our reply, then send it with race-condition handling.
        append_history("assistant", reply)
        await send_with_catchup(client, event.chat_id, reply)

    print("Listening for target messages…")
    await client.run_until_disconnected()
if __name__ == "__main__":
    # Run the bot until disconnected; Ctrl-C exits cleanly.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Exiting.")