import asyncio
import concurrent.futures
import logging
import os
import re
import sys
from contextlib import suppress
from functools import wraps

from dotenv import load_dotenv
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import (
    ApplicationBuilder,
    CallbackQueryHandler,
    CommandHandler,
    ContextTypes,
    MessageHandler,
    filters,
)

# Import your existing logic
from agent import parse_page
from database import upload_entry
from scraper import get_clean_content as _get_clean_content


# Run the scraper in a separate thread with its own event loop to avoid
# Windows Selector vs Proactor event loop conflicts between PTB and Playwright.
def _run_scraper_in_thread(url: str) -> str:
    """Run the async scraper to completion on a private event loop.

    Playwright launches subprocesses, which on Windows requires the
    Proactor event loop, while PTB runs on the Selector loop — so the
    scraper gets its own loop inside a worker thread.
    """
    # Proactor is required for subprocesses (Playwright) on Windows
    if sys.platform == 'win32':
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(_get_clean_content(url))
    finally:
        loop.close()


async def get_clean_content(url: str) -> str:
    """Fetch cleaned page content without blocking the bot's event loop."""
    # get_running_loop() is the non-deprecated way to grab the loop
    # from inside a coroutine (get_event_loop() warns since 3.10).
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, _run_scraper_in_thread, url)
    return result


load_dotenv()
logging.getLogger("httpx").setLevel(logging.WARNING)

# Configuration
TOKEN = os.getenv("TG_TOKEN")
_allowed_env = os.getenv("ALLOWED_USERS", "")
if _allowed_env:
    try:
        ALLOWED_IDS = [int(x.strip()) for x in _allowed_env.split(',') if x.strip()]
    except Exception:
        logging.warning("Failed to parse ALLOWED_USERS from .env; defaulting to empty list")
        ALLOWED_IDS = []
else:
    ALLOWED_IDS = []

if not TOKEN:
    logging.warning("TG_TOKEN not set in .env; bot will not start without a token")

# Setup Logging
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)


# --- Retry Function with Exponential Backoff ---
def retry(max_attempts=3, backoff_factor=2, initial_delay=1):
    """Decorator for retrying async functions with exponential backoff.

    Args:
        max_attempts: Maximum number of retry attempts
        backoff_factor: Multiplier for delay between retries
        initial_delay: Initial delay in seconds
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            delay = initial_delay
            last_exception = None
            for attempt in range(1, max_attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    if attempt < max_attempts:
                        logging.warning(
                            f"Attempt {attempt}/{max_attempts} failed for {func.__name__}: {str(e)}"
                        )
                        await asyncio.sleep(delay)
                        delay *= backoff_factor
                    else:
                        logging.error(f"All {max_attempts} attempts failed for {func.__name__}")
                        raise last_exception
        return wrapper
    return decorator


# --- The Queue System ---
# This ensures only ONE crawl/parse happens at a time to save RAM
task_queue = asyncio.Queue()


def is_http_url(text: str) -> bool:
    """Return True when *text* is a single http(s) URL with no whitespace."""
    return bool(re.match(r'^https?://\S+$', text.strip()))


def build_url_choice_keyboard(url: str):
    """Inline keyboard asking whether a URL is an event or an opportunity.

    NOTE(review): *url* is currently unused (the pending URL is stashed in
    user_data instead) — kept for signature compatibility.
    """
    return InlineKeyboardMarkup([
        [InlineKeyboardButton("šŸ“… Process as Event", callback_data='choose_type:event')],
        [InlineKeyboardButton("šŸ“‹ Process as Opportunity", callback_data='choose_type:opportunity')],
    ])


def build_entry_summary(data, entry_type, saved=False):
    """Format extracted entry *data* as a Markdown summary message.

    Args:
        data: dict of fields produced by ``parse_page``
        entry_type: "event" or "opportunity" — selects which fields to show
        saved: append a "Saved to PocketBase" footer when True
    """
    if entry_type == "event":
        # Tolerate either key the parser may emit for the event timestamp.
        event_datetime = data.get('date_time') or data.get('datetime')
        return (
            f"āœ… **{data.get('title', 'Unknown')}**\n"
            f"šŸ¦† Org/s: {data.get('org')}\n"
            f"šŸ“… Date & Time: {event_datetime}\n"
            f"šŸ“ Location: {data.get('location')}\n"
            f"🐊 Summary: {data.get('summary')}"
            + ("\n\nšŸ’¾ **Saved to PocketBase!**" if saved else "")
        )
    return (
        f"āœ… **{data.get('title', 'Unknown')}**\n"
        f"šŸ¦† Org/s: {data.get('org')}\n"
        f"šŸ“‹ Type: {data.get('type')}\n"
        f"🦢 Deadline: {data.get('deadline')}\n"
        f"ā˜ļø Location: {data.get('location')}\n"
        f"🐊 Summary: {data.get('summary')}"
        + ("\n\nšŸ’¾ **Saved to PocketBase!**" if saved else "")
    )


async def worker():
    """Consume the task queue forever, one crawl/parse job at a time."""
    while True:
        # Get a 'task' from the queue
        update, context, source_value, entry_type, source_kind = await task_queue.get()
        try:
            await process_link(update, context, source_value, entry_type, source_kind)
        except Exception:
            # Fix: without this, an unexpected error would propagate out of
            # the loop and silently kill the only queue consumer.
            logging.exception("Worker failed processing a queued task")
        finally:
            task_queue.task_done()


async def process_link(update, context, source_value, entry_type="opportunity", source_kind="url"):
    """Crawl (or take raw text), parse it, and present the result with buttons.

    Args:
        source_value: URL to crawl, or pasted markdown/text
        entry_type: "event" or "opportunity"
        source_kind: "url" (crawl first) or "text" (parse directly)
    """
    # Handle both message and callback query contexts
    if update.message:
        status_msg = await update.message.reply_text("ā³ Crawling & Analyzing...")
    elif update.callback_query:
        status_msg = update.callback_query.message
        await status_msg.edit_text("ā³ Crawling & Analyzing...")
    else:
        logging.error("Could not determine message context for status update")
        return

    # Store source and type for potential retry/save
    context.user_data['last_source_value'] = source_value
    context.user_data['last_source_kind'] = source_kind
    context.user_data['last_entry_type'] = entry_type

    try:
        if source_kind == "url":
            markdown = await get_clean_content(source_value)
        else:
            markdown = source_value

        extracted_data = await parse_page(markdown, entry_type)
        if not extracted_data:
            await status_msg.edit_text("āŒ Failed to extract data from that page.")
            return

        # 2. Store data temporarily in context to 'Save' later
        context.user_data['last_extracted'] = extracted_data
        context.user_data['awaiting_save_url'] = False
        context.user_data['pending_save_url'] = None

        # 3. Show Result with Buttons - format depends on entry type
        summary = build_entry_summary(extracted_data, entry_type)
        keyboard = [
            [InlineKeyboardButton("šŸ’¾ Save to DB", callback_data='save_db')],
            [InlineKeyboardButton("šŸ—‘ļø Discard", callback_data='discard')],
            [InlineKeyboardButton("šŸ”„ļø Retry", callback_data='retry')]
        ]
        reply_markup = InlineKeyboardMarkup(keyboard)
        await status_msg.edit_text(summary, reply_markup=reply_markup, parse_mode='Markdown')
    except Exception as e:
        await status_msg.edit_text(f"āš ļø Error: {str(e)}")


# --- Handlers ---
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Greet an authorized user and list the available commands."""
    if update.effective_user.id not in ALLOWED_IDS:
        return
    await update.message.reply_text(
        "Welcome! I can extract arts opportunities and events.\n\n"
        "šŸ“‹ **Commands:**\n"
        "/op - Extract an opportunity\n"
        "/ev - Extract an event\n\n"
        "You can also send a URL directly and I will ask whether to process it as an event or opportunity."
    )


async def handle_opportunity(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """/op — queue a URL or pasted text for opportunity extraction."""
    user_id = update.effective_user.id
    if user_id not in ALLOWED_IDS:
        await update.message.reply_text("Unauthorized. User ID needs to be added!")
        return
    if not context.args:
        await update.message.reply_text("Please provide a URL or paste text. Usage: /op ")
        return
    input_text = " ".join(context.args).strip()
    if not input_text:
        await update.message.reply_text("Please provide a URL or paste text. Usage: /op ")
        return
    source_kind = "url" if input_text.startswith("http") else "text"
    await update.message.reply_text("šŸ“„ Link queued for processing...")
    await task_queue.put((update, context, input_text, "opportunity", source_kind))


async def handle_event(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """/ev — queue a URL or pasted text for event extraction."""
    user_id = update.effective_user.id
    if user_id not in ALLOWED_IDS:
        await update.message.reply_text("Unauthorized. User ID needs to be added!")
        return
    if not context.args:
        await update.message.reply_text("Please provide a URL or paste text. Usage: /ev ")
        return
    input_text = " ".join(context.args).strip()
    if not input_text:
        await update.message.reply_text("Please provide a URL or paste text. Usage: /ev ")
        return
    source_kind = "url" if input_text.startswith("http") else "text"
    await update.message.reply_text("šŸ“„ Link queued for processing...")
    await task_queue.put((update, context, input_text, "event", source_kind))


async def handle_followup_text(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle free text: bare URLs get a type prompt; while awaiting a save
    URL, accept a URL or /skip and complete the save.

    Registered both as a plain-text MessageHandler and as the /skip
    CommandHandler (plain-text handlers never see commands).
    """
    if update.effective_user.id not in ALLOWED_IDS:
        return
    if not context.user_data.get('awaiting_save_url'):
        text = (update.message.text or '').strip()
        if not text or not is_http_url(text):
            return
        context.user_data['pending_url_to_process'] = text
        await update.message.reply_text(
            "What should I do with this URL?",
            reply_markup=build_url_choice_keyboard(text)
        )
        return
    text = update.message.text.strip()
    if not text:
        await update.message.reply_text("Please send a URL or type /skip to save without one.")
        return
    if text.lower() == '/skip':
        url = None
    elif text.startswith('http'):
        url = text
    else:
        await update.message.reply_text("Please send a valid URL or type /skip to save without one.")
        return
    data = context.user_data.get('last_extracted')
    entry_type = context.user_data.get('last_entry_type', 'opportunity')
    if data:
        upload_entry(data, entry_type, url)
        context.user_data['awaiting_save_url'] = False
        context.user_data['pending_save_url'] = None
        await update.message.reply_text(
            build_entry_summary(data, entry_type, saved=True), parse_mode='Markdown'
        )


async def button_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Dispatch inline-button presses: type choice, save, retry, discard."""
    query = update.callback_query
    await query.answer()
    if query.data.startswith('choose_type:'):
        pending_url = context.user_data.get('pending_url_to_process')
        if not pending_url:
            await query.edit_message_text("āŒ I couldn't find a pending URL to process.")
            return
        entry_type = query.data.split(':', 1)[1]
        context.user_data['pending_url_to_process'] = None
        await query.edit_message_text(f"šŸ“„ Queued URL for {entry_type} processing...")
        await task_queue.put((update, context, pending_url, entry_type, 'url'))
        return
    if query.data == 'save_db':
        data = context.user_data.get('last_extracted')
        entry_type = context.user_data.get('last_entry_type', 'opportunity')
        if data:
            if context.user_data.get('last_source_kind') == 'text':
                # Pasted text has no URL — ask the user for one before saving.
                context.user_data['awaiting_save_url'] = True
                context.user_data['pending_save_url'] = None
                await query.edit_message_text(
                    build_entry_summary(data, entry_type)
                    + "\n\nSend a source URL to attach it, or type /skip to save without one.",
                    parse_mode='Markdown'
                )
            else:
                url = context.user_data.get('last_source_value')
                upload_entry(data, entry_type, url)  # Pass URL to save with entry
                await query.edit_message_text(
                    build_entry_summary(data, entry_type, saved=True), parse_mode='Markdown'
                )
    elif query.data == 'retry':
        # Retry processing the last URL with the same entry type
        source_value = context.user_data.get('last_source_value')
        source_kind = context.user_data.get('last_source_kind', 'url')
        entry_type = context.user_data.get('last_entry_type', 'opportunity')
        if source_value:
            await query.edit_message_text("ā³ Retrying...")
            await task_queue.put((update, context, source_value, entry_type, source_kind))
        else:
            await query.edit_message_text("āŒ No source content to retry.")
    else:
        await query.edit_message_text("šŸ—‘ļø Discarded.")


# Main Entry Point
async def _main():
    """Build the application, register handlers, and run until interrupted."""
    application = ApplicationBuilder().token(TOKEN).build()
    application.add_handler(CommandHandler("start", start))
    application.add_handler(CommandHandler("op", handle_opportunity))
    application.add_handler(CommandHandler("ev", handle_event))
    # Fix: /skip is a command, so it is excluded by ~filters.COMMAND below
    # and must be routed to the follow-up handler explicitly.
    application.add_handler(CommandHandler("skip", handle_followup_text))
    application.add_handler(MessageHandler(filters.TEXT & (~filters.COMMAND), handle_followup_text))
    application.add_handler(CallbackQueryHandler(button_handler))
    async with application:
        await application.initialize()
        await application.start()
        worker_task = asyncio.create_task(worker())
        print("šŸ¤– Bot is running...")
        await application.updater.start_polling()
        # Keep running until interrupted
        await asyncio.Event().wait()
        # Graceful shutdown
        await application.updater.stop()
        worker_task.cancel()
        with suppress(asyncio.CancelledError):
            await worker_task
        await application.stop()


if __name__ == '__main__':
    # Fail fast with a readable message instead of an opaque builder error.
    if not TOKEN:
        raise SystemExit("TG_TOKEN is not set in .env; cannot start the bot.")
    asyncio.run(_main())