import json
import re
from datetime import datetime, timezone
from typing import List

import aiohttp

from .base import ICrawler
from .dto import NewsItemDTO


class SkolkovoCrawler(ICrawler):
    """Crawler for the Skolkovo (sk.ru) news feed rendered by Next.js.

    The page embeds its state as JSON inside a ``<script id="__NEXT_DATA__">``
    tag; news items are read from that blob rather than from the visible HTML.
    """

    # BUG FIX: the original pattern was empty (r''), which always matches at
    # position 0 and has no capture group, so match.group(1) raised IndexError
    # on every call (and IndexError was not caught below). Reconstructed as the
    # standard Next.js data-script pattern — confirm against a live sk.ru page.
    _NEXT_DATA_RE = re.compile(
        r'<script\s+id="__NEXT_DATA__"[^>]*>(.*?)</script>',
        re.DOTALL,
    )
    # Simple tag stripper for item descriptions; compiled once instead of
    # per item inside the parse loop.
    _TAG_RE = re.compile(r'<[^>]+>')

    def __init__(self, url: str, source: str = "Skolkovo"):
        self.url = url
        self.source = source

    async def fetch_latest(self) -> List[NewsItemDTO]:
        """Fetch ``self.url`` and return the parsed news items.

        Best-effort: any network error, timeout, or non-200 status yields
        an empty list rather than an exception.
        """
        async with aiohttp.ClientSession() as session:
            try:
                async with session.get(
                    self.url, timeout=aiohttp.ClientTimeout(total=30)
                ) as response:
                    if response.status != 200:
                        return []
                    html = await response.text()
                    return self.parse_nextjs(html)
            except Exception:
                # Deliberate broad catch: a failed crawl is an empty batch.
                return []

    def parse_nextjs(self, html: str) -> List[NewsItemDTO]:
        """Extract news items from the embedded ``__NEXT_DATA__`` JSON.

        Returns an empty list when the script tag is missing, the JSON is
        malformed, or the expected key path is absent.
        """
        match = self._NEXT_DATA_RE.search(html)
        if not match:
            return []
        try:
            data = json.loads(match.group(1))
            news_data = data["props"]["pageProps"]["initialProps"]["homeStore"]["news"]
            items_list = news_data.get("items", [])
        except (KeyError, TypeError, json.JSONDecodeError):
            return []

        news_items = []
        for item in items_list:
            title = item.get("title", "")

            # Slug is used for URL; fall back to the feed URL when absent.
            slug = item.get("slug", "")
            url = f"https://sk.ru/news/{slug}/" if slug else self.url

            # Clean up simple HTML if present, then collapse whitespace.
            content_text = item.get("description", "")
            content_text = self._TAG_RE.sub(' ', content_text)
            content_text = ' '.join(content_text.split())

            # Prefer the publish time, fall back to creation time, then "now".
            ts_str = item.get("published_at") or item.get("created_at")
            timestamp = datetime.now(timezone.utc)
            if ts_str:
                try:
                    # "Z" suffix is not accepted by fromisoformat before 3.11.
                    timestamp = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
                except ValueError:
                    pass

            news_items.append(NewsItemDTO(
                title=title,
                url=url,
                content_text=content_text,
                source=self.source,
                timestamp=timestamp,
            ))
        return news_items