# AI-Trend-Scout/src/bot/handlers.py
#
# 180 lines
# 6.8 KiB
# Python

import uuid
from datetime import datetime
import html
from typing import Optional, Callable, Dict, Any, Awaitable
from aiogram import Router, BaseMiddleware, F
from aiogram.filters import CommandStart, Command, CommandObject
from aiogram.types import Message, TelegramObject, InlineKeyboardButton, InlineKeyboardMarkup, CallbackQuery
from aiogram.utils.keyboard import InlineKeyboardBuilder
from aiogram.utils.formatting import as_list, as_marked_section, Bold, TextLink
from src.processor.dto import EnrichedNewsItemDTO
from src.processor.base import ILLMProvider
from src.storage.base import IVectorStore
class AccessMiddleware(BaseMiddleware):
    """Middleware that only lets events from one allowed chat reach handlers.

    ``Message`` events are matched on ``event.chat.id``; ``CallbackQuery``
    events are matched on the chat of the message that carries the pressed
    button. Any other event type is passed through unchanged.
    """

    def __init__(self, allowed_chat_id: str):
        """Store the allowed chat id.

        Args:
            allowed_chat_id: Identifier of the only chat allowed to use the
                bot. It is normalised to a stripped string so that an ``int``
                (or a padded string from configuration) still compares equal
                to ``str(chat.id)``.
        """
        self.allowed_chat_id = str(allowed_chat_id).strip()

    async def __call__(
        self,
        handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
        event: TelegramObject,
        data: Dict[str, Any]
    ) -> Any:
        """Reject events from foreign chats, otherwise call ``handler``.

        Args:
            handler: Next callable in the middleware chain.
            event: Incoming Telegram event.
            data: Context data forwarded to ``handler``.

        Returns:
            The result of ``handler`` for permitted events, ``None`` when the
            event was rejected.
        """
        if isinstance(event, Message):
            if str(event.chat.id) != self.allowed_chat_id:
                await event.answer("Access Denied")
                return None
        elif isinstance(event, CallbackQuery):
            # A callback without an originating message (e.g. inline mode)
            # cannot be attributed to a chat, so it is rejected as well.
            origin = event.message
            if origin is None or str(origin.chat.id) != self.allowed_chat_id:
                await event.answer("Access Denied", show_alert=True)
                return None
        return await handler(event, data)
def _build_results_keyboard(items) -> InlineKeyboardMarkup:
    """Build an inline keyboard with one "detail" button per news item.

    Each item must expose ``url``, ``title`` and ``relevance_score``.
    """
    builder = InlineKeyboardBuilder()
    for item in items:
        # Deterministic id derived from the item URL; presumably the storage
        # keys items the same way -- TODO confirm against IVectorStore.
        item_id = str(uuid.uuid5(uuid.NAMESPACE_URL, item.url))
        builder.row(InlineKeyboardButton(
            text=f"[{item.relevance_score}/10] {item.title}",
            callback_data=f"detail:{item_id}"
        ))
    return builder.as_markup()


def _format_item_details(item) -> str:
    """Render one news item as an HTML message body (all fields escaped)."""
    title = html.escape(item.title)
    source = html.escape(item.source)
    summary = html.escape(item.summary_ru)
    category = html.escape(item.category)
    anomalies_text = ", ".join(
        html.escape(anomaly) for anomaly in (item.anomalies_detected or [])
    )
    # html.escape also escapes quotes, so the URL is safe inside href='...'.
    url = html.escape(item.url)

    parts = [
        f"🌟 <b>{title}</b>\n\n",
        f"<b>Source:</b> {source}\n",
        f"<b>Category:</b> {category}\n",
        f"<b>Relevance Score:</b> {item.relevance_score}/10\n",
        f"<b>Summary:</b> {summary}\n",
    ]
    if anomalies_text:
        parts.append(f"<b>Anomalies Detected:</b> {anomalies_text}\n\n")
    else:
        parts.append("\n")
    parts.append(f"<a href='{url}'>Read more</a>")
    return "".join(parts)


def _format_stats(stats: Dict[str, Any]) -> str:
    """Render the storage statistics mapping as an HTML message body.

    Keys starting with ``category_`` are listed as a per-category breakdown;
    ``total_count`` (default 0) is shown as the total.
    """
    prefix = "category_"
    total = stats.get("total_count", 0)
    breakdown = []
    for key, count in stats.items():
        if key.startswith(prefix):
            # Strip only the leading prefix (str.replace would also remove
            # later occurrences) and escape the name for HTML parse mode.
            cat_name = html.escape(key[len(prefix):])
            breakdown.append(f"- {cat_name}: {count}")

    response = f"📊 <b>Database Statistics</b>\n\nTotal items: {total}\n"
    if breakdown:
        response += "\n<b>Breakdown by category:</b>\n" + "\n".join(breakdown)
    return response


def get_router(storage: IVectorStore, processor: ILLMProvider, allowed_chat_id: str) -> Router:
    """Create the bot's main router with all command and callback handlers.

    Args:
        storage: Store used for ``search``, ``get_by_id`` and ``get_stats``.
        processor: LLM provider whose ``get_info()`` backs ``/params``.
        allowed_chat_id: Chat id passed to ``AccessMiddleware``.

    Returns:
        A configured ``Router`` named ``main_router``.
    """
    router = Router(name="main_router")
    access = AccessMiddleware(allowed_chat_id)
    router.message.middleware(access)
    # Route button presses through the same middleware as messages.
    router.callback_query.middleware(access)

    @router.message(CommandStart())
    async def command_start_handler(message: Message) -> None:
        """
        This handler receives messages with `/start` command
        """
        user_name = html.escape(message.from_user.full_name) if message.from_user else 'user'
        await message.answer(f"Welcome to Trend-Scout AI, {user_name}!")

    @router.message(Command("help"))
    async def command_help_handler(message: Message) -> None:
        """
        This handler receives messages with `/help` command
        """
        help_text = (
            "Available commands:\n"
            "/start - Start the bot\n"
            "/help - Show this help message\n"
            "/latest [category] - Show the latest enriched news trends\n"
            "/search query - Search for news\n"
            "/stats - Show database statistics\n"
            "/params - Show LLM processor parameters\n"
        )
        await message.answer(help_text)

    @router.message(Command("params"))
    async def command_params_handler(message: Message) -> None:
        """
        This handler receives messages with `/params` command
        """
        info = processor.get_info()

        def field(key: str) -> str:
            # Missing or None values are shown as "Unknown"; non-string
            # values are stringified so html.escape cannot raise on them.
            value = info.get(key)
            return "Unknown" if value is None else html.escape(str(value))

        response = "🤖 <b>LLM Processor Parameters</b>\n\n"
        response += f"<b>Model:</b> {field('model')}\n"
        response += f"<b>Base URL:</b> {field('base_url')}\n"
        response += f"<b>Prompt Summary:</b> {field('prompt_summary')}"
        await message.answer(response, parse_mode="HTML")

    @router.message(Command("latest"))
    async def command_latest_handler(message: Message, command: CommandObject) -> None:
        """
        This handler receives messages with `/latest` command
        """
        # The optional argument is forwarded as the search query; an empty
        # string is used when no category was given.
        category = command.args or ""
        items = await storage.search(query=category, limit=10)
        if not items:
            await message.answer("No results found.")
            return
        await message.answer("Latest news:", reply_markup=_build_results_keyboard(items))

    @router.message(Command("search"))
    async def command_search_handler(message: Message, command: CommandObject) -> None:
        """
        This handler receives messages with `/search` command
        """
        query = command.args
        if not query:
            await message.answer("Please provide a search query. Usage: /search query")
            return
        items = await storage.search(query=query, limit=10)
        if not items:
            await message.answer("No results found.")
            return
        await message.answer("Search results:", reply_markup=_build_results_keyboard(items))

    @router.callback_query(F.data.startswith("detail:"))
    async def detail_callback_handler(callback: CallbackQuery) -> None:
        """
        This handler receives callback queries for news details
        """
        # Everything after the first ":" is the item id.
        item_id = (callback.data or "").partition(":")[2]
        item = await storage.get_by_id(item_id)
        if not item:
            await callback.answer("Item not found.", show_alert=True)
            return
        if callback.message is None:
            # No originating message means there is no chat to reply into;
            # just acknowledge the press so the client stops its spinner.
            await callback.answer()
            return
        await callback.message.answer(
            _format_item_details(item),
            parse_mode="HTML",
            disable_web_page_preview=False,
        )
        await callback.answer()

    @router.message(Command("stats"))
    async def command_stats_handler(message: Message) -> None:
        """
        This handler receives messages with `/stats` command
        """
        stats = await storage.get_stats()
        await message.answer(_format_stats(stats), parse_mode="HTML")

    return router