# forked from CameronJGrant/telegram-scam-baiter
|
|
import os
|
|
import json
|
|
import time
|
|
import random
|
|
import asyncio
|
|
from pathlib import Path
|
|
from typing import List, Dict, Any
|
|
|
|
from telethon import TelegramClient, events
|
|
from telethon.errors import FloodWaitError
|
|
from telethon.tl.types import User
|
|
|
|
from openai import OpenAI
|
|
|
|
# ---------- Configuration via environment ----------

# Telegram API credentials (from https://my.telegram.org). API_ID defaults
# to 0, which fails the validation check further below.
API_ID = int(os.environ.get("TG_API_ID", "0"))
API_HASH = os.environ.get("TG_API_HASH", "")
# Telethon session file name (without extension).
SESSION = os.environ.get("TG_SESSION", "telethon_session")

# Target selection: username (leading '@' stripped) and/or numeric user id.
TARGET_USERNAME = os.environ.get("TARGET_USERNAME", "").strip().lstrip("@")
TARGET_USER_ID = int(os.environ.get("TARGET_USER_ID", "0"))  # optional numeric id; 0 = unset

# OpenAI credentials and model name.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
OPENAI_MODEL = os.environ.get("OPENAI_MODEL", "gpt-4o-mini")

# Safety delays: random whole-second pause applied before every outgoing
# message (see human_delay). MIN should not exceed MAX.
MIN_DELAY_SEC = int(os.environ.get("MIN_DELAY_SEC", "25"))
MAX_DELAY_SEC = int(os.environ.get("MAX_DELAY_SEC", "75"))

# History controls: JSONL file plus count/token budgets used by prune_history.
HISTORY_FILE = Path(os.environ.get("HISTORY_FILE", "chat_history.jsonl"))
MAX_TOKENS_HISTORY = int(os.environ.get("MAX_TOKENS_HISTORY", "2200"))  # rough token budget (~4 chars/token)
MAX_MESSAGES_HISTORY = int(os.environ.get("MAX_MESSAGES_HISTORY", "30"))

# Optional targeting helpers: resolve by display name; cache the resolved
# numeric id between runs so later startups skip resolution.
TARGET_DISPLAY_NAME = os.environ.get("TARGET_DISPLAY_NAME", "").strip()
TARGET_CACHE_FILE = Path(os.environ.get("TARGET_CACHE_FILE", "target_id.txt"))

# Opener controls (disabled by default); opener is only sent on a fresh start.
AUTO_OPENER_ENABLED = os.environ.get("AUTO_OPENER_ENABLED", "false").lower() == "true"
OPENER_TEXT = os.environ.get("OPENER_TEXT", "").strip()

# Typing simulation controls
TYPING_SIM_ENABLED = os.environ.get("TYPING_SIM_ENABLED", "true").lower() == "true"
# Approx words per minute to estimate how long to "type"
TYPING_WPM = int(os.environ.get("TYPING_WPM", "22"))
# Clamp typing duration into [min, max] seconds
TYPING_MIN_SEC = float(os.environ.get("TYPING_MIN_SEC", "2.0"))
TYPING_MAX_SEC = float(os.environ.get("TYPING_MAX_SEC", "18.0"))
|
|
|
|
# ---------- Validation ----------
|
|
def _require(cond: bool, msg: str):
|
|
if not cond:
|
|
raise RuntimeError(msg)
|
|
|
|
# Fail fast at import time when mandatory credentials are missing.
_require(API_ID > 0 and API_HASH, "Set TG_API_ID and TG_API_HASH")
_require(OPENAI_API_KEY, "Set OPENAI_API_KEY")
|
|
|
|
# ---------- Prompting ----------
|
|
SYSTEM_PROMPT = """
|
|
You are an assistant writing on behalf of a cautious user named Jon (57, Pacific Northwest, USA).
|
|
Primary objective: keep the other person engaged and typing for as long as possible, without sharing any sensitive information or taking any risky action.
|
|
|
|
Role and behavior rules:
|
|
- Maintain friendly, curious small talk. Ask open-ended questions that require paragraphs to answer.
|
|
- Frequently request details, stories, examples, and step-by-step explanations. Favor mundane topics (daily routine, food, weather, hobbies).
|
|
- Stall politely: mention small interruptions (helping a neighbor, walking the dog, low battery) and ask them to repeat or clarify.
|
|
- Never send or request personal data (addresses, IDs, bank info, photos, exact workplace, family details).
|
|
- Never click or discuss links, QR codes, or apps. If they send links or push to another app/call/video, decline and redirect to chat.
|
|
- If they pitch investments/crypto/money, ask for simple explanations “like I'm 5,” request pretend paper examples, and keep questioning, never committing.
|
|
- Keep replies human-sounding and concise (1-4 short sentences) but end with a question to draw them out.
|
|
- Avoid promises, commitments, or sending any files or images.
|
|
- Do not break character as Jon.
|
|
|
|
Backstory (use sparingly and consistently when needed):
|
|
- Semi-retired handyman; helps at a community garden; has an elderly dog named Buddy; likes bird watching, sourdough, puzzles; prefers cash over online banking; Wi-Fi/battery often unreliable.
|
|
|
|
If the other person becomes impatient, remain polite, apologize for delays, and ask a follow-up question.
|
|
Only output the message text to send—no brackets, notes, or stage directions.
|
|
""".strip()
|
|
|
|
# ---------- History persistence ----------
|
|
def append_history(role: str, content: str, ts: float | None = None):
    """Append one chat record to the JSONL history file.

    Args:
        role: Either "user" (their message) or "assistant" (ours).
        content: The message text.
        ts: Unix timestamp for the record; defaults to the current time.
    """
    record = {
        "ts": time.time() if ts is None else ts,
        "role": role,
        "content": content,
    }
    with HISTORY_FILE.open("a", encoding="utf-8") as fh:
        fh.write(json.dumps(record, ensure_ascii=False) + "\n")
|
|
|
|
|
|
def load_history() -> List[Dict[str, Any]]:
    """Read all parseable records from the JSONL history file, oldest first.

    Returns an empty list when the file does not exist; lines that fail to
    parse (e.g. truncated by a crash) are skipped silently.
    """
    if not HISTORY_FILE.exists():
        return []
    loaded: List[Dict[str, Any]] = []
    with HISTORY_FILE.open("r", encoding="utf-8") as fh:
        for raw_line in fh:
            try:
                loaded.append(json.loads(raw_line))
            except Exception:
                # Tolerate corrupt/partial lines rather than aborting the run.
                pass
    return loaded
|
|
|
|
|
|
def prune_history(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Trim history to the configured message-count and token budgets.

    Tokens are estimated at roughly 4 characters per token. The oldest
    records are dropped first, but at least 10 records are always kept.

    Args:
        records: Full history, oldest first.

    Returns:
        The (possibly shortened) tail of *records*.
    """
    if len(records) > MAX_MESSAGES_HISTORY:
        records = records[-MAX_MESSAGES_HISTORY:]

    # Per-record token estimates computed once. The previous version
    # re-summed the whole list after every drop (accidental O(n^2));
    # subtracting the dropped record's estimate is equivalent and O(n).
    estimates = [max(1, len(r.get("content", "")) // 4) for r in records]
    total = sum(estimates)
    drop = 0
    while total > MAX_TOKENS_HISTORY and len(records) - drop > 10:
        total -= estimates[drop]
        drop += 1
    return records[drop:] if drop else records
|
|
|
|
|
|
def build_chat_messages_for_openai() -> List[Dict[str, str]]:
    """Assemble the OpenAI messages array: system prompt plus pruned history.

    Records whose role is neither "user" nor "assistant" are dropped.
    """
    history = prune_history(load_history())
    chat: List[Dict[str, str]] = [{"role": "system", "content": SYSTEM_PROMPT}]
    chat.extend(
        {"role": rec["role"], "content": rec["content"]}
        for rec in history
        if rec["role"] in ("user", "assistant")
    )
    return chat
|
|
|
|
def last_history_record() -> Dict[str, Any] | None:
    """Return the newest history record, or None when history is empty."""
    all_records = load_history()
    if not all_records:
        return None
    return all_records[-1]
|
|
|
|
def last_history_role() -> str | None:
    """Role ("user"/"assistant") of the newest history record, else None."""
    newest = last_history_record()
    if not newest:
        return None
    return newest.get("role")
|
|
|
|
# ---------- Name matching helpers ----------
|
|
def _norm(s: str) -> str:
|
|
return " ".join((s or "").split()).strip().lower()
|
|
|
|
def _display_name_from_entity(ent) -> str:
|
|
# Telethon dialog .name is already a nice display for users; fallback to first+last
|
|
name = getattr(ent, "first_name", None)
|
|
last = getattr(ent, "last_name", None)
|
|
if name or last:
|
|
return " ".join([n for n in [name, last] if n])
|
|
# Fallback to generic 'name' attribute if present
|
|
return getattr(ent, "name", "") or ""
|
|
|
|
def _name_matches(display_name_env: str, entity) -> bool:
    """True when the configured display name equals — or is contained in —
    the entity's normalized display name (case/whitespace-insensitive)."""
    wanted = _norm(display_name_env)
    shown = _norm(_display_name_from_entity(entity))
    if shown == wanted:
        return True
    return wanted and wanted in shown
|
|
# (end of name-matching helpers)
|
|
|
|
# ---------- OpenAI client ----------
|
|
oai = OpenAI(api_key=OPENAI_API_KEY)
|
|
|
|
async def generate_reply_via_openai() -> str:
    """Ask the model for the next line of the conversation.

    Builds a messages array from the system prompt and recent history,
    appends a final user nudge so the model keeps the persona and brevity
    constraints in force, and returns the generated text.

    Returns:
        The reply text (empty string if the model returns no content).

    Raises:
        Propagates any exception from the OpenAI client; callers fall back
        to canned lines on failure.
    """
    messages = build_chat_messages_for_openai()
    # Add a final user nudge to ensure it continues the conversation
    messages.append({
        "role": "user",
        "content": "Please respond as Jon, following all the rules above. Keep it brief (1-4 short sentences) and end with a question.",
    })
    # The OpenAI SDK call is synchronous; run it in a worker thread so it
    # does not block the Telethon event loop while waiting on the network.
    resp = await asyncio.to_thread(
        oai.chat.completions.create,
        model=OPENAI_MODEL,
        messages=messages,
        temperature=0.8,
        max_tokens=180,
        presence_penalty=0.3,
        frequency_penalty=0.2,
    )
    # message.content can be None (e.g. refusals); normalize before strip().
    content = resp.choices[0].message.content or ""
    return content.strip()
|
|
|
|
|
|
# ---------- Telegram helper ----------
|
|
async def _get_entity_or_none(client: TelegramClient, ref):
    """Best-effort client.get_entity(): returns None instead of raising."""
    try:
        return await client.get_entity(ref)
    except Exception:
        return None


async def _resolve_by_display_name(client: TelegramClient):
    """Scan dialogs for a real user matching TARGET_DISPLAY_NAME.

    Matching is case/whitespace-insensitive; exact matches beat partial
    (substring) matches, and ties go to the dialog with the most recent
    message. Returns None when nothing matches or the scan fails.
    """
    # Loop-invariant: normalize the configured name once, not per dialog.
    wanted = _norm(TARGET_DISPLAY_NAME)
    if not wanted:
        return None

    best = None
    best_exact = False
    best_date = None
    try:
        async for dialog in client.iter_dialogs():
            ent = dialog.entity
            # Only consider real users (not bots, channels, groups, or yourself)
            if not isinstance(ent, User):
                continue
            if getattr(ent, "bot", False) or getattr(ent, "is_self", False):
                continue

            shown = _norm(_display_name_from_entity(ent))
            if not shown:
                continue

            exact = shown == wanted
            if not (exact or wanted in shown):
                continue

            msg_date = getattr(dialog.message, "date", None)
            if best is None:
                best, best_exact, best_date = ent, exact, msg_date
            elif exact and not best_exact:
                # Exact match always beats a partial one.
                best, best_exact, best_date = ent, True, msg_date
            elif exact == best_exact and msg_date and (not best_date or msg_date > best_date):
                # Same match quality: prefer the more recently active dialog.
                best, best_exact, best_date = ent, exact, msg_date
    except Exception:
        # Mirror original behavior: a failed scan resolves nothing.
        return None
    return best


async def resolve_target_entity(client: TelegramClient):
    """Resolve the target entity to talk to.

    Tries, in order: username, numeric user id, a cached id from a
    previous run, then a display-name scan of recent dialogs.
    Returns None when nothing resolves.
    """
    if TARGET_USERNAME:
        target = await _get_entity_or_none(client, TARGET_USERNAME)
        if target:
            return target

    if TARGET_USER_ID:
        target = await _get_entity_or_none(client, TARGET_USER_ID)
        if target:
            return target

    # Try cached target ID from a previous run
    if TARGET_CACHE_FILE.exists():
        try:
            cached_id = int(TARGET_CACHE_FILE.read_text().strip())
        except Exception:
            cached_id = None
        if cached_id is not None:
            target = await _get_entity_or_none(client, cached_id)
            if target:
                return target

    # Resolve by the display name they set for themselves (case-insensitive).
    if TARGET_DISPLAY_NAME:
        return await _resolve_by_display_name(client)

    return None
|
|
|
|
def cache_target_id(entity) -> None:
    """Persist the resolved target's numeric id so later runs can skip resolution.

    Best-effort: a write failure is logged as a warning, never fatal.
    """
    try:
        entity_id = getattr(entity, "id", "")
        TARGET_CACHE_FILE.write_text(str(entity_id))
    except Exception as exc:
        print(f"Warning: failed to write target cache file: {exc}")
|
|
|
|
|
|
async def startup_catchup_if_needed(client: TelegramClient, target_entity) -> bool:
    """
    On startup, if the last message in the target dialog is from them (incoming)
    and we haven't replied after that in our local history, generate and send a reply.
    Returns True if a catch-up reply was sent.
    """
    # Nothing to do until a target has been resolved.
    if not target_entity:
        print("startup_catchup_if_needed: no target resolved")
        return False

    # Fetch only the newest message of the target dialog.
    msgs = await client.get_messages(target_entity, limit=1)
    if not msgs:
        print("startup_catchup_if_needed: target dialog has no messages")
        return False

    last_msg = msgs[0]
    # Only act if the last message is incoming (from them) and it has text
    if last_msg.out or not (last_msg.message or "").strip():
        return False

    # If our last history entry is already an assistant message, we likely replied.
    role = last_history_role()
    if role == "assistant":
        return False

    # Ensure that incoming text is reflected in history before generating a response.
    last_rec = last_history_record()
    if not last_rec or last_rec.get("role") != "user" or last_rec.get("content") != last_msg.message:
        append_history("user", last_msg.message or "")

    try:
        reply = await generate_reply_via_openai()
    except Exception as e:
        # Fall back to a canned stalling line when the API call fails.
        print(f"OpenAI error during startup catch-up: {e}")
        reply = random.choice([
            "Sorry, just saw this—could you tell me a bit more?",
            "I had to step away a minute—where were we?",
            "Interesting—what do you like most about that?",
        ])

    # Record the reply first, then send it with human-like pacing.
    append_history("assistant", reply)
    await safe_send(client, target_entity, reply)
    return True
|
|
|
|
async def human_delay():
    """Sleep a random number of whole seconds in [MIN_DELAY_SEC, MAX_DELAY_SEC]."""
    pause = random.randint(MIN_DELAY_SEC, MAX_DELAY_SEC)
    await asyncio.sleep(pause)
|
|
|
|
def _estimate_typing_seconds(text: str) -> float:
    """Estimate a human-like typing duration for *text*.

    Assumes roughly 5 characters per word at TYPING_WPM words per minute,
    clamped into [TYPING_MIN_SEC, TYPING_MAX_SEC]. Empty text gets the
    minimum duration.
    """
    if not text:
        return TYPING_MIN_SEC
    word_count = max(1, len(text) / 5.0)
    raw = (word_count / max(1, TYPING_WPM)) * 60.0
    return min(TYPING_MAX_SEC, max(TYPING_MIN_SEC, raw))
|
|
|
|
async def _simulate_typing(client: TelegramClient, entity, seconds: float):
    """Keep the 'typing…' chat action visible for roughly *seconds*.

    Telethon refreshes the action while inside the context manager; we sleep
    in short slices so the indicator stays alive. Best-effort: failures are
    logged and otherwise ignored.
    """
    if seconds <= 0:
        return
    step = 3.5  # refresh interval: short naps keep the indicator alive
    elapsed = 0.0
    try:
        async with client.action(entity, 'typing'):
            while elapsed < seconds:
                nap = min(step, seconds - elapsed)
                await asyncio.sleep(nap)
                elapsed += nap
    except Exception as exc:
        # Purely cosmetic feature; never let it break sending.
        print(f"Typing simulation warning: {exc}")
|
|
|
|
async def safe_send(client: TelegramClient, entity, text: str):
    """Send *text* to *entity* with human-like pacing.

    Pauses a random interval first, optionally shows a typing indicator
    sized to the message, then sends — retrying once after a FloodWait.
    """
    # Human-like pause before reacting at all.
    await human_delay()

    # Show "typing..." before the message lands.
    if TYPING_SIM_ENABLED:
        await _simulate_typing(client, entity, _estimate_typing_seconds(text))

    try:
        await client.send_message(entity, text)
    except FloodWaitError as flood:
        # Respect Telegram's cooldown (plus a small margin), then retry once.
        await asyncio.sleep(flood.seconds + 3)
        await client.send_message(entity, text)
|
|
|
|
|
|
def sender_matches_target(sender: User, target_entity) -> bool:
    """Decide whether an incoming message's sender is the configured target.

    Checks, in order: the resolved entity's id, then the configured
    username, then the configured numeric id.

    NOTE(review): when TARGET_USERNAME is set and the sender has a
    username, that comparison is authoritative — the numeric-id fallback
    below it is not reached. This preserves the original precedence.
    """
    resolved_id = getattr(target_entity, "id", None) if target_entity else None
    if resolved_id is not None and sender.id == resolved_id:
        return True
    if TARGET_USERNAME and sender.username:
        return TARGET_USERNAME.lower() == sender.username.lower()
    return bool(TARGET_USER_ID and sender.id == TARGET_USER_ID)
|
|
|
|
|
|
# ---------- Main app ----------
|
|
async def main():
    """Connect to Telegram, resolve the target, catch up if needed, and run
    the incoming-message reply loop until disconnected."""
    client = TelegramClient(SESSION, API_ID, API_HASH)
    await client.start()

    # Resolve the target up front when possible; cache its id for next run.
    target_entity = await resolve_target_entity(client)
    if target_entity:
        print(f"Target resolved: id={target_entity.id}, username={getattr(target_entity, 'username', None)}")
        cache_target_id(target_entity)
    else:
        print("Target not resolved yet. Will match dynamically on first incoming message from target.")
        if TARGET_USERNAME:
            print(f"Hint: couldn't resolve by username '{TARGET_USERNAME}'. Check spelling and privacy.")
        if TARGET_USER_ID:
            print(f"Hint: couldn't resolve by user id '{TARGET_USER_ID}'.")
        if TARGET_DISPLAY_NAME:
            print(f"Hint: couldn't resolve by display name '{TARGET_DISPLAY_NAME}'. The dialog may not exist yet.")

    # If we already have a target, attempt a startup catch-up reply if their message was last.
    catchup_sent = False
    if target_entity:
        catchup_sent = await startup_catchup_if_needed(client, target_entity)

    # Optional opener: disabled by default. If enabled, only send when:
    # - no history file exists (fresh start)
    # - no catch-up reply was sent
    # - a target is already resolved
    # - and OPENER_TEXT is provided
    if not HISTORY_FILE.exists() and not catchup_sent and target_entity and AUTO_OPENER_ENABLED and OPENER_TEXT:
        append_history("assistant", OPENER_TEXT)
        await safe_send(client, target_entity, OPENER_TEXT)
    elif not HISTORY_FILE.exists() and not catchup_sent and not AUTO_OPENER_ENABLED:
        print("Opener is disabled (AUTO_OPENER_ENABLED=false). Waiting for incoming message.")

    @client.on(events.NewMessage(incoming=True))
    async def on_msg(event):
        # Fires for every incoming message; filters down to the target chat.
        nonlocal target_entity
        sender = await event.get_sender()
        # Only react to real users (skip channels, groups, service senders).
        if not isinstance(sender, User):
            return

        # If target not yet resolved, auto-resolve on first qualifying message
        if (not target_entity) and (TARGET_USER_ID == 0 and not TARGET_USERNAME and not TARGET_DISPLAY_NAME):
            # No explicit target provided; first inbound DM will become the target
            target_entity = sender
            cache_target_id(target_entity)
            print(f"Auto-targeted sender id={sender.id}, username={sender.username}")

        if not sender_matches_target(sender, target_entity):
            return  # ignore non-target chats

        # Record incoming message (only when it has non-whitespace text).
        text = event.message.message or ""
        if text.strip():
            append_history("user", text)

        # Decide on next reply
        try:
            reply = await generate_reply_via_openai()
        except Exception as e:
            # Fallback small-talk lines if OpenAI is temporarily unavailable
            print(f"OpenAI error: {e}")
            reply = random.choice([
                "Sorry, I read slow when I'm tired—could you say that another way?",
                "Interesting—what makes you say that?",
                "Got curious: what did you have for breakfast today?",
                "I had to step away a minute—where were we?",
            ])

        # Persist and send
        append_history("assistant", reply)
        await safe_send(client, event.chat_id, reply)

    print("Listening for target messages…")
    await client.run_until_disconnected()
|
|
|
|
|
|
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Graceful Ctrl-C shutdown.
        print("Exiting.")