Files
null-bot/bot.py
2026-05-11 11:34:38 +01:00

411 lines
15 KiB
Python

import os
import asyncio
import sys
import concurrent.futures
import logging
import re
from contextlib import suppress
from dotenv import load_dotenv
from functools import wraps
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import ApplicationBuilder, CommandHandler, MessageHandler, filters, ContextTypes, CallbackQueryHandler
# Import your existing logic
from agent import parse_page
from database import add_telegram_user_id, get_telegram_user_ids, upload_entry
from scraper import get_clean_content as _get_clean_content
# Run the scraper on a private event loop in a worker thread to avoid Windows
# Selector vs Proactor event loop conflicts between PTB and Playwright.
def _run_scraper_in_thread(url: str) -> str:
    """Run the async scraper to completion on a fresh event loop owned by this thread.

    Intended to be called from a worker thread that has no running event loop.
    On Windows the loop is a ProactorEventLoop, which Playwright needs to spawn
    its browser subprocess. The loop is constructed directly instead of calling
    ``asyncio.set_event_loop_policy()``: the policy is process-wide, so changing
    it from a worker thread would also change it for the main thread.

    Args:
        url: Page URL handed to the scraper's ``get_clean_content``.

    Returns:
        Whatever the scraper coroutine returns (cleaned page content).
    """
    if sys.platform == 'win32':
        loop = asyncio.ProactorEventLoop()
    else:
        loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(_get_clean_content(url))
    finally:
        try:
            # Finalize any async generators the scraper left open before closing.
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            # Don't leave a closed loop registered as this thread's event loop.
            asyncio.set_event_loop(None)
            loop.close()
async def get_clean_content(url: str) -> str:
    """Scrape ``url`` on a dedicated worker thread and return the cleaned content.

    Each call gets its own single-thread executor, so the scraper's private
    event loop never shares a thread with another call.

    Args:
        url: Page URL to scrape.

    Returns:
        The result of ``_run_scraper_in_thread(url)``.
    """
    loop = asyncio.get_running_loop()
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    try:
        return await loop.run_in_executor(pool, _run_scraper_in_thread, url)
    finally:
        # wait=False: if this coroutine is cancelled mid-scrape, a blocking
        # shutdown (what `with ThreadPoolExecutor()` does) would freeze the event
        # loop until the scrape finished. The idle thread exits on its own.
        pool.shutdown(wait=False)
load_dotenv()

# Setup Logging. This must run before any logging.warning() below: module-level
# logging calls install a default root handler when none exists, which would
# turn this basicConfig() call into a silent no-op.
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
logging.getLogger("httpx").setLevel(logging.WARNING)

# Configuration
TOKEN = os.getenv("TG_TOKEN")

# Comma-separated Telegram user IDs that are always authorized (checked before
# the database access list). Any malformed entry discards the whole list.
_allowed_env = os.getenv("ALLOWED_USERS", "")
try:
    ALLOWED_IDS = {int(x.strip()) for x in _allowed_env.split(',') if x.strip()}
except ValueError:
    logging.warning("Failed to parse ALLOWED_USERS from .env; defaulting to empty list")
    ALLOWED_IDS = set()

if not TOKEN:
    logging.warning("TG_TOKEN not set in .env; bot will not start without a token")
# --- Retry Function with Exponential Backoff ---
def retry(max_attempts=3, backoff_factor=2, initial_delay=1):
    """Decorator factory: retry an async function with exponential backoff.

    The wrapped coroutine function is awaited up to ``max_attempts`` times.
    After each failed attempt except the last, it sleeps ``delay`` seconds.
    The delay starts at ``initial_delay`` and is multiplied by
    ``backoff_factor`` after every retry. The final failure is logged and
    re-raised unchanged. Only ``Exception`` subclasses are retried; on Python
    3.8+ ``asyncio.CancelledError`` is not one, so cancellation propagates
    immediately.

    Args:
        max_attempts: Total number of attempts; must be at least 1.
        backoff_factor: Multiplier applied to the delay after each retry.
        initial_delay: Delay in seconds before the second attempt.

    Raises:
        ValueError: If ``max_attempts`` is less than 1. Previously this
            surfaced later as a confusing ``TypeError`` from ``raise None``.
    """
    if max_attempts < 1:
        raise ValueError(f"max_attempts must be >= 1, got {max_attempts}")

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            delay = initial_delay
            for attempt in range(1, max_attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except Exception as exc:
                    if attempt == max_attempts:
                        logging.error("All %d attempts failed for %s", max_attempts, func.__name__)
                        raise
                    logging.warning(
                        "Attempt %d/%d failed for %s: %s",
                        attempt, max_attempts, func.__name__, exc,
                    )
                await asyncio.sleep(delay)
                delay *= backoff_factor
        return wrapper
    return decorator
# --- The Queue System ---
# Pending extraction jobs, unbounded. A single worker() task drains the queue,
# so only one crawl/parse runs at a time (keeps RAM use down). Each item is a
# tuple: (update, context, source_value, entry_type, source_kind).
task_queue: asyncio.Queue = asyncio.Queue(maxsize=0)
def is_authorized_user(user_id: int) -> bool:
    """Return whether ``user_id`` may use the bot.

    IDs already in the in-memory ``ALLOWED_IDS`` set (seeded from
    ALLOWED_USERS and extended by /add) are accepted without touching the
    database. Otherwise the stored Telegram access list is consulted. Any
    error while reading it is logged and treated as "not authorized".
    """
    if user_id in ALLOWED_IDS:
        return True
    try:
        stored_ids = set(get_telegram_user_ids())
    except Exception as exc:
        # Fail closed: an unreachable access list must never grant access.
        logging.warning("Failed to fetch Telegram access list: %s", exc)
        return False
    return user_id in stored_ids
def is_http_url(text: str) -> bool:
    """Return True when ``text``, ignoring surrounding whitespace, is one http(s) URL.

    The whole stripped string must be ``http://`` or ``https://`` (lowercase)
    followed by at least one non-whitespace character. No other words are
    allowed around it.
    """
    candidate = text.strip()
    return re.fullmatch(r'https?://\S+', candidate) is not None
def build_url_choice_keyboard(url: str):
    """Return an inline keyboard asking whether a URL is an event or an opportunity.

    ``url`` is currently unused: each button carries only the chosen type as
    ``choose_type:<type>`` callback data, not the URL itself.
    """
    options = [
        ("📅 Process as Event", 'choose_type:event'),
        ("📋 Process as Opportunity", 'choose_type:opportunity'),
    ]
    rows = [[InlineKeyboardButton(label, callback_data=payload)] for label, payload in options]
    return InlineKeyboardMarkup(rows)
def _md_escape(value) -> str:
    """Backslash-escape Telegram legacy-Markdown control characters (_ * ` [) in ``str(value)``."""
    return re.sub(r'([_*`\[])', r'\\\1', str(value))


def _md_bold(value) -> str:
    """Render ``str(value)`` as a legacy-Markdown bold entity.

    Escaping is not allowed inside an entity, so a literal '*' is emitted by
    closing the bold entity, writing an escaped '*', and reopening it.
    """
    return '*' + str(value).replace('*', '*\\**') + '*'


def build_entry_summary(data, entry_type, saved=False):
    """Format an extracted entry as a Telegram message (parse_mode='Markdown').

    Field values come from extraction and are escaped, so characters such as
    '_' or '*' in them can no longer make Telegram reject the message.

    Args:
        data: Mapping of extracted fields. Missing fields render as ``None``;
            a missing title renders as ``Unknown``.
        entry_type: ``"event"`` shows date/time and location. Anything else
            is treated as an opportunity (type, deadline, location).
        saved: Append a "Saved to PocketBase!" footer.

    Returns:
        The message text, lines joined with newlines.
    """
    if entry_type == "event":
        # Either key may carry the event's date/time.
        event_datetime = data.get('date_time') or data.get('datetime')
        details = [
            f"📅 Date & Time: {_md_escape(event_datetime)}",
            f"📍 Location: {_md_escape(data.get('location'))}",
        ]
    else:
        details = [
            f"📋 Type: {_md_escape(data.get('type'))}",
            f"🦢 Deadline: {_md_escape(data.get('deadline'))}",
            f"☁️ Location: {_md_escape(data.get('location'))}",
        ]
    lines = [
        # Legacy Markdown bold is *text*; **text** only yields empty entities.
        f"✅ {_md_bold(data.get('title', 'Unknown'))}",
        f"🦆 Org/s: {_md_escape(data.get('org'))}",
        *details,
        f"🐊 Summary: {_md_escape(data.get('summary'))}",
    ]
    text = "\n".join(lines)
    if saved:
        text += "\n\n💾 *Saved to PocketBase!*"
    return text
async def worker():
    """Process queued extraction jobs forever, one at a time.

    Each queue item is ``(update, context, source_value, entry_type,
    source_kind)`` and is handed to ``process_link``. This task is the queue's
    only consumer. If an exception escaped, every later job would wait
    forever, so failures are logged and the loop continues. Cancellation
    (``asyncio.CancelledError``, not an ``Exception`` subclass on Python 3.8+)
    still ends the loop.
    """
    while True:
        update, context, source_value, entry_type, source_kind = await task_queue.get()
        try:
            await process_link(update, context, source_value, entry_type, source_kind)
        except Exception:
            logging.exception("Worker failed to process %s job (%s source)", entry_type, source_kind)
        finally:
            task_queue.task_done()
async def process_link(update, context, source_value, entry_type="opportunity", source_kind="url"):
    """Run one queued extraction job and show the result with Save/Discard/Retry buttons.

    Args:
        update: The update that queued the job. For a command message, a new
            status message is sent. For a callback query, the pressed
            button's message is edited in place.
        context: Handler context. On success, the result and its source are
            stored in ``context.user_data`` for the Save/Retry buttons.
        source_value: A URL when ``source_kind == "url"``, otherwise raw text.
        entry_type: ``"event"`` or ``"opportunity"``; passed to ``parse_page``.
        source_kind: ``"url"`` crawls ``source_value`` first; any other value
            parses ``source_value`` as-is.

    Crawl/parse errors are logged and reported in the status message.
    """
    status_text = "⏳ Crawling & Analyzing..."
    query = update.callback_query
    if update.message:
        status_msg = await update.message.reply_text(status_text)
    elif query and query.message:
        status_msg = query.message
        await status_msg.edit_text(status_text)
    else:
        logging.error("Could not determine message context for status update")
        return

    try:
        if source_kind == "url":
            markdown = await get_clean_content(source_value)
        else:
            markdown = source_value
        extracted_data = await parse_page(markdown, entry_type)
        if not extracted_data:
            await status_msg.edit_text("❌ Failed to extract data from that page.")
            return

        # Store the result and its source together, and only after a
        # successful extraction. Otherwise Save/Retry could pair one job's
        # data with another job's URL or entry type.
        user_data = context.user_data
        user_data['last_extracted'] = extracted_data
        user_data['last_source_value'] = source_value
        user_data['last_source_kind'] = source_kind
        user_data['last_entry_type'] = entry_type
        user_data['awaiting_save_url'] = False
        user_data['pending_save_url'] = None

        summary = build_entry_summary(extracted_data, entry_type)
        reply_markup = InlineKeyboardMarkup([
            [InlineKeyboardButton("💾 Save to DB", callback_data='save_db')],
            [InlineKeyboardButton("🗑️ Discard", callback_data='discard')],
            [InlineKeyboardButton("🔄️ Retry", callback_data='retry')],
        ])
        await status_msg.edit_text(summary, reply_markup=reply_markup, parse_mode='Markdown')
    except Exception as e:
        logging.exception("Failed to process %s job (%s source)", entry_type, source_kind)
        await status_msg.edit_text(f"⚠️ Error: {e}")
# --- Handlers ---
_START_MESSAGE = (
    "Welcome! I can extract arts opportunities and events.\n\n"
    "📋 **Commands:**\n"
    "/add <id> - Allow a Telegram user ID access\n"
    "/op <url> - Extract an opportunity\n"
    "/ev <url> - Extract an event\n\n"
    "You can also send a URL directly and I will ask whether to process it as an event or opportunity."
)


async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Reply to /start with an overview of the bot's commands.

    Unauthorized users get no reply at all. NOTE(review): the text is sent
    without a parse_mode, so the ``**`` around "Commands:" shows literally.
    """
    authorized = is_authorized_user(update.effective_user.id)
    if authorized:
        await update.message.reply_text(_START_MESSAGE)
async def handle_add_user(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle ``/add <telegram_user_id>``: grant another Telegram user access.

    Only already-authorized users may add others. The ID is written with
    ``add_telegram_user_id`` unless the stored access list already contains
    it. In both cases it is also cached in ``ALLOWED_IDS``. Database errors
    are logged and reported back to the requester.
    """
    message = update.message
    if not is_authorized_user(update.effective_user.id):
        await message.reply_text("Unauthorized. User ID needs to be added!")
        return

    args = context.args
    if not args:
        await message.reply_text("Usage: /add <telegram_user_id>")
        return

    try:
        new_id = int(args[0].strip())
    except ValueError:
        await message.reply_text("Please provide a valid numeric Telegram user ID. Usage: /add <telegram_user_id>")
        return

    already = f"{new_id} already has access."
    if new_id in ALLOWED_IDS:
        await message.reply_text(already)
        return

    try:
        if new_id in set(get_telegram_user_ids()):
            reply = already
        else:
            add_telegram_user_id(new_id)
            reply = f"Added {new_id} to Telegram access."
        ALLOWED_IDS.add(new_id)
        await message.reply_text(reply)
    except Exception as exc:
        logging.exception("Failed to add Telegram user ID %s", new_id)
        await message.reply_text(f"Failed to add {new_id}: {exc}")
async def _queue_command_job(update, context, entry_type, command):
    """Shared body of /op and /ev: authorize, validate the argument, enqueue a job.

    The joined command arguments count as a URL only when they form a single
    http(s) URL (``is_http_url``). Anything else, including text that merely
    starts with "http", is parsed as pasted text.

    Args:
        update: The command update.
        context: Handler context; ``context.args`` holds the command arguments.
        entry_type: ``"opportunity"`` or ``"event"``.
        command: Command name without the slash, used in the usage hint.
    """
    if not is_authorized_user(update.effective_user.id):
        await update.message.reply_text("Unauthorized. User ID needs to be added!")
        return
    input_text = " ".join(context.args or []).strip()
    if not input_text:
        await update.message.reply_text(f"Please provide a URL or paste text. Usage: /{command} <url or text>")
        return
    source_kind = "url" if is_http_url(input_text) else "text"
    await update.message.reply_text("📥 Link queued for processing...")
    await task_queue.put((update, context, input_text, entry_type, source_kind))


async def handle_opportunity(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle ``/op <url or text>``: queue an opportunity extraction."""
    await _queue_command_job(update, context, "opportunity", "op")


async def handle_event(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle ``/ev <url or text>``: queue an event extraction."""
    await _queue_command_job(update, context, "event", "ev")
async def handle_followup_text(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle free text from authorized users, and ``/skip`` when routed here.

    Two modes, chosen by ``context.user_data['awaiting_save_url']``:

    * Not awaiting: a message that is a single http(s) URL is stored as
      ``pending_url_to_process`` and answered with event/opportunity buttons.
      Anything else is ignored.
    * Awaiting (set when Save is pressed on a text-sourced entry): the message
      must be a source URL or ``/skip``. The last extracted entry is then
      uploaded with that URL, or with none.
    """
    if not is_authorized_user(update.effective_user.id):
        return
    message = update.message
    text = (message.text or '').strip()

    if not context.user_data.get('awaiting_save_url'):
        if not text or not is_http_url(text):
            return
        context.user_data['pending_url_to_process'] = text
        await message.reply_text(
            "What should I do with this URL?",
            reply_markup=build_url_choice_keyboard(text)
        )
        return

    if not text:
        await message.reply_text("Please send a URL or type /skip to save without one.")
        return
    # In group chats Telegram delivers commands as "/skip@BotName".
    if text.lower().split('@', 1)[0] in ('/skip', 'skip'):
        url = None
    elif is_http_url(text):
        url = text
    else:
        await message.reply_text("Please send a valid URL or type /skip to save without one.")
        return

    data = context.user_data.get('last_extracted')
    entry_type = context.user_data.get('last_entry_type', 'opportunity')
    if not data:
        # Nothing left to save: leave URL-prompt mode instead of trapping the user in it.
        context.user_data['awaiting_save_url'] = False
        context.user_data['pending_save_url'] = None
        await message.reply_text("❌ No extracted data to save.")
        return
    try:
        upload_entry(data, entry_type, url)
    except Exception as exc:
        logging.exception("Failed to save %s entry", entry_type)
        # Stay in URL-prompt mode so the user can simply try again.
        await message.reply_text(f"⚠️ Failed to save: {exc}\nSend the URL again or type /skip to retry.")
        return
    context.user_data['awaiting_save_url'] = False
    context.user_data['pending_save_url'] = None
    # Forget the saved result so a stale Save button cannot upload it twice.
    context.user_data['last_extracted'] = None
    await message.reply_text(build_entry_summary(data, entry_type, saved=True), parse_mode='Markdown')
async def button_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle inline-keyboard presses from authorized users.

    Callback data:
      * ``choose_type:<type>``: queue the URL stored in ``pending_url_to_process``.
      * ``save_db``: save ``last_extracted`` (see ``_save_last_extracted``).
      * ``retry``: re-queue the last source with the same entry type.
      * anything else (``discard``): mark the message as discarded.
    """
    query = update.callback_query
    await query.answer()
    if not is_authorized_user(query.from_user.id):
        await query.edit_message_text("Unauthorized. User ID needs to be added!")
        return
    action = query.data or ''
    user_data = context.user_data

    if action.startswith('choose_type:'):
        pending_url = user_data.get('pending_url_to_process')
        if not pending_url:
            await query.edit_message_text("❌ I couldn't find a pending URL to process.")
            return
        entry_type = action.split(':', 1)[1]
        user_data['pending_url_to_process'] = None
        await query.edit_message_text(f"📥 Queued URL for {entry_type} processing...")
        await task_queue.put((update, context, pending_url, entry_type, 'url'))
    elif action == 'save_db':
        await _save_last_extracted(query, user_data)
    elif action == 'retry':
        source_value = user_data.get('last_source_value')
        if not source_value:
            await query.edit_message_text("❌ No source content to retry.")
            return
        source_kind = user_data.get('last_source_kind', 'url')
        entry_type = user_data.get('last_entry_type', 'opportunity')
        await query.edit_message_text("⏳ Retrying...")
        await task_queue.put((update, context, source_value, entry_type, source_kind))
    else:
        await query.edit_message_text("🗑️ Discarded.")


async def _save_last_extracted(query, user_data):
    """Handle the Save button for the most recent extraction in ``user_data``.

    Text-sourced entries are not uploaded yet. The user is asked for a source
    URL (or /skip), and ``handle_followup_text`` completes the save.
    URL-sourced entries are uploaded immediately with their source URL.
    """
    data = user_data.get('last_extracted')
    entry_type = user_data.get('last_entry_type', 'opportunity')
    if not data:
        await query.edit_message_text("❌ No extracted data to save.")
        return
    if user_data.get('last_source_kind') == 'text':
        user_data['awaiting_save_url'] = True
        user_data['pending_save_url'] = None
        await query.edit_message_text(
            build_entry_summary(data, entry_type)
            + "\n\nSend a source URL to attach it, or type /skip to save without one.",
            parse_mode='Markdown'
        )
        return
    try:
        upload_entry(data, entry_type, user_data.get('last_source_value'))
    except Exception as exc:
        logging.exception("Failed to save %s entry", entry_type)
        # Plain text (the error may contain Markdown characters). Keep the
        # existing keyboard so the user can press Save again.
        await query.edit_message_text(
            f"⚠️ Failed to save: {exc}",
            reply_markup=getattr(query.message, 'reply_markup', None),
        )
        return
    # Forget the saved result so a stale Save button cannot upload it twice.
    user_data['last_extracted'] = None
    await query.edit_message_text(build_entry_summary(data, entry_type, saved=True), parse_mode='Markdown')
# Main Entry Point
async def _main():
    """Build the bot, run polling and the queue worker until cancelled, then shut down.

    Cleanup runs in ``finally``, so it also happens when this task is
    cancelled (e.g. Ctrl+C under ``asyncio.run``). Without it, the
    ``async with`` exit would try to shut down an application that is still
    running.
    """
    application = ApplicationBuilder().token(TOKEN).build()
    # PTB v20+ Command/Message handlers also fire on *edited* messages, where
    # update.message is None. The handlers here read update.message, so only
    # new messages are accepted.
    new_messages = filters.UpdateType.MESSAGE
    application.add_handler(CommandHandler("start", start, filters=new_messages))
    application.add_handler(CommandHandler("add", handle_add_user, filters=new_messages))
    application.add_handler(CommandHandler("op", handle_opportunity, filters=new_messages))
    application.add_handler(CommandHandler("ev", handle_event, filters=new_messages))
    # "/skip" is a bot command, so the TEXT & ~COMMAND handler never sees it.
    # Route it to the follow-up handler that implements "save without a URL".
    application.add_handler(CommandHandler("skip", handle_followup_text, filters=new_messages))
    application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND & new_messages, handle_followup_text))
    application.add_handler(CallbackQueryHandler(button_handler))

    async with application:  # initializes on enter, shuts down on exit
        await application.start()
        worker_task = asyncio.create_task(worker())
        try:
            print("🤖 Bot is running...")
            await application.updater.start_polling()
            # Keep running until interrupted
            await asyncio.Event().wait()
        finally:
            # Graceful shutdown, also on cancellation
            if application.updater.running:
                await application.updater.stop()
            worker_task.cancel()
            with suppress(asyncio.CancelledError):
                await worker_task
            if application.running:
                await application.stop()
if __name__ == '__main__':
    # On Windows the main (PTB) loop is forced to the Selector implementation.
    # Playwright gets its own Proactor loop in the scraper's worker thread.
    # WindowsSelectorEventLoopPolicy only exists on Windows, so the platform
    # guard is required or the bot crashes on Linux/macOS.
    if sys.platform == 'win32':
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    asyncio.run(_main())