Files
null-bot/database.py
2026-05-11 11:34:38 +01:00

208 lines
6.7 KiB
Python

import os
from dotenv import load_dotenv
from pocketbase import PocketBase
from schemas import EntrySchema
from datetime import datetime
# Called before the os.getenv() lookups below; presumably loads a local .env
# file into the process environment (python-dotenv) — confirm deployment setup.
load_dotenv()
# Module-wide PocketBase client shared by the collection helpers in this file.
pb = PocketBase(os.getenv('POCKETBASE_URL'))
# NOTE: this authenticates as an admin at import time, so importing the module
# performs this call immediately. The returned value is kept in
# `admin_data` but is not read anywhere else in the visible part of this file.
admin_data = pb.admins.auth_with_password(os.getenv('POCKETBASE_ADMIN_EMAIL'), os.getenv('POCKETBASE_ADMIN_PASSWORD'))
# Gates the optional "[DEBUG]" prints in convert_datetime_to_pocketbase() and
# upload_entry(); several other "[DEBUG]" prints in upload_entry() ignore it.
show_debug_msg = False
# Name of the PocketBase collection read/written by the Telegram ID helpers.
TELEGRAM_COLLECTION_NAME = 'Telegram'
def _record_value(record, key, default=None):
    """Best-effort lookup of ``key`` on a record whose shape is not known.

    Plain dicts are searched directly (exact key, then case-insensitively).
    Any other object is probed in this order, moving on whenever a source is
    absent, raises, or yields ``default``:

    1. ``record.get(key, default)``
    2. ``record.data`` (if it is a dict)
    3. ``record.model_dump()`` (if it returns a dict)
    4. ``record.to_dict()`` (if it returns a dict)
    5. ``record.__dict__``
    6. a plain attribute named ``key``

    Args:
        record: A dict or an arbitrary record-like object.
        key: Field name to look for.
        default: Value returned when nothing is found. It also acts as the
            "not found" marker between sources (compared by identity).

    Returns:
        The first value found, or ``default``.
    """

    def _from_mapping(candidate):
        # Only real dicts are searched; anything else counts as "not found".
        if not isinstance(candidate, dict):
            return default
        if key in candidate:
            return candidate[key]
        wanted = key.lower()
        # First key that matches ignoring case, in the dict's own order.
        return next(
            (item for name, item in candidate.items() if str(name).lower() == wanted),
            default,
        )

    if isinstance(record, dict):
        return _from_mapping(record)

    # Sources whose failures are swallowed so the next one can be tried.
    # Lambdas keep each access lazy: nothing is touched until hasattr passes.
    guarded_sources = (
        ('get', lambda: record.get(key, default)),
        ('data', lambda: _from_mapping(record.data)),
        ('model_dump', lambda: _from_mapping(record.model_dump())),
        ('to_dict', lambda: _from_mapping(record.to_dict())),
    )
    for attribute, fetch in guarded_sources:
        if not hasattr(record, attribute):
            continue
        try:
            found = fetch()
        except Exception:
            continue
        if found is not default:
            return found

    # The instance dict is searched without an exception guard.
    if hasattr(record, '__dict__'):
        found = _from_mapping(record.__dict__)
        if found is not default:
            return found

    return getattr(record, key) if hasattr(record, key) else default
def _normalize_telegram_id(value):
    """Coerce ``value`` to an ``int`` Telegram ID, or ``None`` if impossible.

    The value is stringified and stripped of surrounding whitespace before
    parsing, so ``42`` and ``" 42 "`` both give ``42``. ``None`` and anything
    that does not parse as an integer give ``None``.
    """
    if value is not None:
        try:
            text = str(value)
            return int(text.strip())
        except (TypeError, ValueError):
            # Not an integer: fall through to the shared "no ID" result.
            pass
    return None
def get_telegram_user_ids():
    """Return the Telegram IDs stored in the Telegram collection.

    Every record is fetched, and for each one both its ``TGID`` field and its
    ``id`` are considered; whichever of them parses as an integer is kept.

    Returns:
        A sorted list of unique integer IDs.
    """
    collected = set()
    for entry in pb.collection(TELEGRAM_COLLECTION_NAME).get_full_list():
        # 'TGID' is looked up first, then 'id'; either (or both) may contribute.
        for field_name in ('TGID', 'id'):
            parsed = _normalize_telegram_id(_record_value(entry, field_name))
            if parsed is not None:
                collected.add(parsed)
    return sorted(collected)
def add_telegram_user_id(user_id: int):
    """Create a record for ``user_id`` in the Telegram collection.

    Two payload shapes are attempted in order: ``{'TGID': user_id}`` and then
    ``{'id': str(user_id)}``. The first successful ``create`` wins.

    Returns:
        Whatever the successful ``create`` call returns.

    Raises:
        Exception: The error from the last attempt when every payload fails.
    """
    target = pb.collection(TELEGRAM_COLLECTION_NAME)
    failure = None
    for body in ({'TGID': user_id}, {'id': str(user_id)}):
        try:
            created = target.create(body)
        except Exception as err:
            # Remember the error and fall back to the next payload shape.
            failure = err
        else:
            return created
    raise failure
def convert_datetime_to_pocketbase(date_time_str):
    """
    Convert datetime string from DD-MM-YYYY HH:MM format to PocketBase datetime format.

    Accepted inputs are "DD-MM-YYYY", "DD-MM-YYYY HH:MM" and
    "DD-MM-YYYY (HH:MM)". Parentheses and surrounding or repeated whitespace
    are ignored, and a date without a time is treated as 00:00.

    PocketBase (Local) expects: YYYY-MM-DD HH:MM:SS

    Args:
        date_time_str: Text to convert. Empty values and the placeholder
            'N/A' (any letter case, optionally padded with whitespace or
            wrapped in parentheses) mean "no date".

    Returns:
        The formatted string, or None when there is no date or the input
        cannot be parsed. Parse failures are printed (with a traceback)
        rather than raised.
    """
    if not date_time_str:
        return None
    try:
        # Parentheses become spaces (not nothing) so "DD-MM-YYYY(HH:MM)" still
        # separates date from time; split/join then collapses whitespace runs.
        cleaned = " ".join(date_time_str.replace("(", " ").replace(")", " ").split())

        # Checked after cleaning so that padded or lowercase placeholders and
        # whitespace-only strings count as "no date" instead of parse errors.
        if not cleaned or cleaned.upper() == 'N/A':
            return None

        if show_debug_msg:
            print(f"[DEBUG] Converting datetime: '{date_time_str}' (type: {type(date_time_str)})")

        # A space can only remain between the date and the time part.
        input_format = "%d-%m-%Y %H:%M" if " " in cleaned else "%d-%m-%Y"
        dt = datetime.strptime(cleaned, input_format)

        # Convert to PocketBase local datetime format: YYYY-MM-DD HH:MM:SS
        pb_format = dt.strftime("%Y-%m-%d %H:%M:%S")
        if show_debug_msg:
            print(f"[DEBUG] Converted to PocketBase format: '{pb_format}'")
        return pb_format
    except Exception as e:
        # Deliberately broad: conversion is best-effort, and a bad value
        # (including a non-string input) must not abort the caller.
        print(f"[ERROR] Error converting datetime '{date_time_str}': {e}")
        import traceback
        traceback.print_exc()
        return None
def upload_entry(data, entry_type='opportunity', url=None):
    """
    Upload entry to the appropriate PocketBase collection.

    The entry's date field is converted with convert_datetime_to_pocketbase()
    before upload: events have 'date_time' converted and renamed to
    'datetime'; every other entry type is treated as an opportunity and has
    'deadline' converted in place. A value that cannot be converted is
    uploaded as None. The caller's mapping is copied, never mutated.

    Args:
        data: Dictionary containing the entry data
        entry_type: 'opportunity' or 'event'
        url: The source URL of the entry; stored under 'url' when truthy

    Returns:
        The value returned by the collection's create() call.

    Raises:
        Exception: Any error raised while converting or creating the record
            is printed with a traceback and then re-raised unchanged.
    """
    # Copy first so the caller's mapping is untouched, and read the title with
    # .get(): a missing title must not abort the upload on a log line.
    data = dict(data)
    print(f"[DEBUG] Uploading {entry_type} entry. Data: {data.get('title')}")

    # Add URL to data if provided
    if url:
        data['url'] = url
        print(f"[DEBUG] Added URL: {url}")

    # Per-type settings: target collection, the date field supplied by the
    # caller, the field name PocketBase expects, and the wording for logs.
    if entry_type == 'event':
        collection_name = 'events'
        source_field, target_field = 'date_time', 'datetime'
        debug_label, kind_label = 'Event datetime', 'event'
    else:
        collection_name = 'opportunities'
        source_field = target_field = 'deadline'
        debug_label, kind_label = 'Opportunity deadline', 'opportunity'

    try:
        if source_field in data:
            original_value = data[source_field]
            converted = convert_datetime_to_pocketbase(original_value)
            # Events are renamed ('date_time' -> 'datetime'); drop the old key.
            if target_field != source_field:
                del data[source_field]
            data[target_field] = converted
            if show_debug_msg:
                print(f"[DEBUG] {debug_label}: '{original_value}' -> '{converted}'")
        else:
            print(f"[WARNING] No '{source_field}' field found in {kind_label} data")

        print(f"[DEBUG] Creating record in '{collection_name}' collection with data: {data}")
        result = pb.collection(collection_name).create(data)
        print(f"[DEBUG] Successfully created record: {result}")
        return result
    except Exception as e:
        print(f"[ERROR] Failed to upload entry to PocketBase: {e}")
        import traceback
        traceback.print_exc()
        raise