Files
telegram-scam-baiter/app/tg_helpers.py
Cameron Grant 6ce7117cc4
All checks were successful
continuous-integration/drone/push Build is passing
Correctly handles multiple messages.
2025-10-06 16:12:38 -07:00

102 lines
4.1 KiB
Python

import asyncio, random
from telethon.tl.types import User
from telethon.errors import FloodWaitError
from . import config, history
def _norm(s: str) -> str:
return " ".join((s or "").split()).strip().lower()
def _display_name(ent) -> str:
name = getattr(ent, "first_name", None)
last = getattr(ent, "last_name", None)
if name or last: return " ".join([n for n in [name, last] if n])
return getattr(ent, "name", "") or ""
def sender_matches_target(sender: User, target_entity) -> bool:
    """Return True when ``sender`` is the configured target.

    A match is any one of:
      * ``sender.id`` equals the id of the already-resolved ``target_entity``;
      * ``sender.username`` equals ``config.TARGET_USERNAME``, compared
        case-insensitively;
      * ``sender.id`` equals ``config.TARGET_USER_ID``.

    Args:
        sender: The user who sent the message being examined.
        target_entity: The resolved target entity, or a falsy value when the
            target could not be resolved.
    """
    if target_entity and sender.id == getattr(target_entity, "id", None):
        return True

    # The configured username is also handed to client.get_entity(), which
    # accepts an "@name" spelling, while usernames on User objects carry no
    # "@". Normalize so both spellings of the config value match.
    wanted = (config.TARGET_USERNAME or "").strip().lstrip("@").lower()
    actual = (sender.username or "").lower()
    if wanted and actual and actual == wanted:
        return True

    if config.TARGET_USER_ID and sender.id == config.TARGET_USER_ID:
        return True
    return False
async def _lookup_entity(client, ref, label: str):
    """Return ``client.get_entity(ref)``, or None (after a warning) if it raises."""
    try:
        return await client.get_entity(ref)
    except Exception as e:
        # Deliberately best-effort: a failed lookup only means the caller
        # moves on to its next strategy, but say why it failed.
        print(f"Warning: could not resolve target by {label}: {e}")
        return None


async def _match_dialog_by_name(client, wanted: str):
    """Return the user dialog entity whose display name best matches ``wanted``.

    ``wanted`` must already be normalized with ``_norm`` and non-empty.
    Bots and the account's own entry are skipped. An exact name match beats
    a substring match; among matches of the same kind, the dialog whose
    latest message has the most recent date wins. Returns None when no
    dialog matches.
    """
    best, best_exact, best_date = None, False, None
    async for dialog in client.iter_dialogs():
        ent = dialog.entity
        if not isinstance(ent, User):
            continue
        if getattr(ent, "bot", False) or getattr(ent, "is_self", False):
            continue
        actual = _norm(_display_name(ent))
        # Also rejects entities with an empty name, since a non-empty
        # ``wanted`` is never a substring of "".
        if wanted not in actual:
            continue
        exact = actual == wanted
        dt = getattr(dialog.message, "date", None)
        newer = bool(dt) and (not best_date or dt > best_date)
        if best is None or (exact and not best_exact) or (exact == best_exact and newer):
            best, best_exact, best_date = ent, exact, dt
    return best


async def resolve_target_entity(client):
    """Resolve the configured target to an entity, trying several strategies.

    Strategies are tried in order and the first one that yields an entity
    wins:
      1. ``config.TARGET_USERNAME`` via ``client.get_entity``;
      2. ``config.TARGET_USER_ID`` via ``client.get_entity``;
      3. the integer id stored in ``config.TARGET_CACHE_FILE``, if it exists;
      4. a scan of the account's dialogs for ``config.TARGET_DISPLAY_NAME``.

    A failure in strategies 1-3 prints a warning and falls through to the
    next strategy. Errors raised while iterating dialogs are not caught.

    Returns:
        The resolved entity, or None when every strategy fails.
    """
    if config.TARGET_USERNAME:
        ent = await _lookup_entity(client, config.TARGET_USERNAME, "username")
        if ent is not None:
            return ent

    if config.TARGET_USER_ID:
        ent = await _lookup_entity(client, config.TARGET_USER_ID, "user id")
        if ent is not None:
            return ent

    if config.TARGET_CACHE_FILE.exists():
        try:
            cached = int(config.TARGET_CACHE_FILE.read_text().strip())
        except (OSError, ValueError) as e:
            print(f"Warning: ignoring unreadable target cache file: {e}")
        else:
            ent = await _lookup_entity(client, cached, "cached id")
            if ent is not None:
                return ent

    if config.TARGET_DISPLAY_NAME:
        # Normalize once up front; a name that normalizes to "" can never
        # match, so do not walk the dialog list for it.
        wanted = _norm(config.TARGET_DISPLAY_NAME)
        if wanted:
            best = await _match_dialog_by_name(client, wanted)
            if best:
                return best
    return None
def cache_target_id(entity) -> None:
    """Persist ``entity.id`` to ``config.TARGET_CACHE_FILE`` (best-effort).

    Nothing is written when ``entity`` has no usable id, so an existing
    cache file is never overwritten with an empty or ``"None"`` value that
    could not be parsed back as an integer. Write failures are reported
    with a printed warning and otherwise ignored.
    """
    entity_id = getattr(entity, "id", None)
    if entity_id is None or entity_id == "":
        return
    try:
        config.TARGET_CACHE_FILE.write_text(str(entity_id))
    except Exception as e:
        # Caching is an optimization only; never let it break the caller.
        print(f"Warning: failed to write target cache file: {e}")
async def human_delay():
    """Sleep for a random whole number of seconds.

    The duration is drawn with ``random.randint`` from
    ``config.MIN_DELAY_SEC`` to ``config.MAX_DELAY_SEC``, both inclusive.
    """
    shortest, longest = config.MIN_DELAY_SEC, config.MAX_DELAY_SEC
    pause = random.randint(shortest, longest)
    await asyncio.sleep(pause)
def _estimate_typing_seconds(text: str) -> float:
    """Estimate how long typing ``text`` would take, in seconds.

    The text is counted as one word per five characters (at least one word)
    and divided by ``config.TYPING_WPM`` (treated as at least 1). The result
    is clamped to ``config.TYPING_MIN_SEC`` .. ``config.TYPING_MAX_SEC``.
    Empty text yields ``config.TYPING_MIN_SEC``.
    """
    if not text:
        return config.TYPING_MIN_SEC
    word_count = max(1, len(text) / 5.0)
    words_per_minute = max(1, config.TYPING_WPM)
    raw_seconds = (word_count / words_per_minute) * 60.0
    capped = min(config.TYPING_MAX_SEC, raw_seconds)
    return max(config.TYPING_MIN_SEC, capped)
async def simulate_typing(client, entity, seconds: float):
    """Hold a 'typing' chat action on ``entity`` for about ``seconds`` seconds.

    Does nothing when ``seconds`` is zero or negative. The wait is split
    into sleeps of at most 3.5 seconds inside ``client.action``. Any
    exception is reported with a printed warning instead of being raised.
    """
    if seconds <= 0:
        return
    step = 3.5
    elapsed = 0.0
    try:
        async with client.action(entity, 'typing'):
            while elapsed < seconds:
                # Sleep a full step, or just the remainder on the last pass.
                nap = min(step, seconds - elapsed)
                await asyncio.sleep(nap)
                elapsed += nap
    except Exception as e:
        print(f"Typing simulation warning: {e}")
async def catchup_and_regen(client, entity, draft_text: str, regen_fn):
    """Regenerate the draft while the newest message in the chat is incoming.

    Up to three rounds are run. Each round fetches the newest message for
    ``entity`` and stops when there is none, when it is the same message
    seen in the previous round, when it is outgoing, or when its text is
    empty after stripping. Otherwise ``regen_fn()`` is awaited and its
    result becomes the new draft.

    Args:
        client: Object providing an awaitable ``get_messages(entity, limit=1)``.
        entity: Chat to inspect.
        draft_text: Draft returned unchanged if no regeneration happens.
        regen_fn: Zero-argument coroutine function producing a new draft.

    Returns:
        The most recent draft.
    """
    max_rounds = 3
    previous_id = None
    rounds_done = 0
    while rounds_done < max_rounds:
        rounds_done += 1
        fetched = await client.get_messages(entity, limit=1)
        if not fetched:
            return draft_text
        newest = fetched[0]
        # Same message as last round: nothing new has arrived since the
        # draft was regenerated.
        if previous_id is not None and newest.id == previous_id:
            return draft_text
        previous_id = newest.id
        incoming_text = (newest.message or "").strip()
        if newest.out or not incoming_text:
            return draft_text
        draft_text = await regen_fn()
    return draft_text
# Send `text` to `entity` after a human-like pause, refreshing the draft
# before it goes out.
#
# Parameters:
#   client   - object whose send_message(entity, text) is awaited here; it is
#              also passed on to catchup_and_regen and simulate_typing.
#   entity   - chat the message is sent to.
#   text     - initial draft; replaced by whatever catchup_and_regen returns.
#   regen_fn - passed through to catchup_and_regen to produce a fresh draft.
#
# NOTE(review): leading indentation is missing in this copy of the file, so
# it cannot be determined from here whether the second catchup_and_regen call
# belongs to the `if config.TYPING_SIM_ENABLED:` branch or runs
# unconditionally - confirm against the repository before restructuring.
async def send_with_catchup(client, entity, text: str, regen_fn):
# Wait a randomized interval before doing anything else.
await human_delay()
# Refresh the draft in case new messages arrived during the delay.
text = await catchup_and_regen(client, entity, text, regen_fn)
if config.TYPING_SIM_ENABLED:
# Typing duration is estimated from the current draft text.
await simulate_typing(client, entity, _estimate_typing_seconds(text))
# Refresh the draft again before sending.
text = await catchup_and_regen(client, entity, text, regen_fn)
try:
await client.send_message(entity, text)
except FloodWaitError as e:
# Wait the number of seconds carried by the error plus a 3 second margin,
# then call send_message once more; that second call is not guarded here.
await asyncio.sleep(e.seconds + 3)
await client.send_message(entity, text)