Files
openvk4webtv/main.py
2025-07-26 15:23:11 +03:00

498 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import FastAPI, Request, Form, HTTPException
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
import uuid
import httpx
import json
import threading
import os
import time
from PIL import Image
import io
from PIL import Image, ImageDraw, ImageFont
import textwrap
from urllib.parse import urlparse
# The static directories must exist before they are mounted or written to:
# StaticFiles validates its directory when it is constructed, and saving an
# image does not create missing parent folders.
os.makedirs("static/images", exist_ok=True)
os.makedirs("static/text", exist_ok=True)

app = FastAPI()
templates = Jinja2Templates(directory="templates")
app.mount("/static", StaticFiles(directory="static"), name="static")

STORAGE_FILE = "storage.json"  # JSON file read/written by the storage helpers

FONT_PATH = "arial.ttf"  # Download the font and put it in the project root
LINE_SPACING = 10  # Extra space between rendered lines
FONT_SIZE = 15  # Font size used for rendered text
TEXT_COLOR = (0, 0, 0)  # Black text
BG_COLOR = (255, 255, 255)  # White background

# Create the blank placeholder image that stands in for empty text.
print('тест на файлик ноу нейм')
if not os.path.exists("static/text/no_name.gif"):
    print('нету ураа')
    img = Image.new("RGB", (100, 20), BG_COLOR)
    draw = ImageDraw.Draw(img)
    # Drawing an empty string leaves the image blank.
    draw.text((5, 5), "", fill=TEXT_COLOR)
    img.save("static/text/no_name.gif", "GIF")
print('наверно всё')
def load_storage():
    """Load the persisted session mapping from ``STORAGE_FILE``.

    Returns:
        dict: The stored mapping, or an empty dict when the file is missing,
        is not valid JSON, or does not hold a JSON object.
    """
    try:
        with open(STORAGE_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return {}
    # Callers index the result by device UUID and assign new keys to it, so
    # any other JSON value is treated the same as a missing file.
    return data if isinstance(data, dict) else {}
def save_storage(data):
    """Persist *data* to ``STORAGE_FILE`` as JSON.

    The JSON is first written to a sibling temporary file and then moved over
    the real file, so an interrupted write cannot leave a truncated storage
    file behind (which ``load_storage`` would read back as an empty mapping).

    Args:
        data: JSON-serialisable object to store.
    """
    tmp_path = f"{STORAGE_FILE}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(data, f)
    os.replace(tmp_path, STORAGE_FILE)
async def process_avatar(image_url: str, user_id: int) -> str:
    """Download an avatar and store it as a 128x128 GIF under static/images.

    Args:
        image_url: URL of the source image.
        user_id: ID used as the base name of the stored file.

    Returns:
        Site-relative URL (with a leading slash) of the stored avatar, or of
        a gray placeholder when downloading or converting the image fails.
    """
    try:
        async with httpx.AsyncClient(follow_redirects=True) as client:
            response = await client.get(image_url)
            response.raise_for_status()

        filename = f"{user_id}.gif"
        filepath = f"static/images/{filename}"
        img = Image.open(io.BytesIO(response.content))
        img = img.resize((128, 128)).convert("RGB")
        img.save(filepath, "GIF")
        return f"/static/images/{filename}"
    except Exception as e:
        print(f"Avatar error: {str(e)}")
        # Create the default avatar on first use.
        default_path = "static/images/default.gif"
        if not os.path.exists(default_path):
            img = Image.new("RGB", (128, 128), color="gray")
            img.save(default_path, "GIF")
        # Return the same URL form as the success path so the result does
        # not depend on the path of the page that embeds it.
        return f"/{default_path}"
def cleanup_file(file_path):
    """Schedule *file_path* for deletion five seconds from now.

    The removal runs on a ``threading.Timer`` thread. The outcome is reported
    with ``print``; exceptions raised while deleting are caught and printed
    rather than propagated.
    """

    def _remove_if_present():
        try:
            if not os.path.exists(file_path):
                return
            os.remove(file_path)
            print(f"Deleted {file_path}")
        except Exception as exc:
            print(f"Error deleting {file_path}: {str(exc)}")

    # Run the deletion after a 5 second delay on a separate thread.
    threading.Timer(5.0, _remove_if_present).start()
def render_text(text: str, max_width: int = 480) -> str:
    """Render *text* into a GIF image and return its site-relative URL.

    The text is wrapped to roughly ``max_width // FONT_SIZE`` characters per
    line, drawn with the configured font and colors, saved under
    ``static/text`` and scheduled for deletion through ``cleanup_file``.

    Args:
        text: Text to render. Non-string values are converted with ``str``;
            ``None`` is treated as empty text.
        max_width: Target width used to derive the wrap column.

    Returns:
        URL of the rendered image, or of the shared ``no_name.gif``
        placeholder when the text is empty or whitespace only.
    """
    # Templates and API payloads may hand over numbers or null values.
    if text is None:
        text = ""
    elif not isinstance(text, str):
        text = str(text)

    try:
        text = translit(text, 'ru', reversed=True)
    except Exception:
        # Transliteration is best effort; on any failure the text is rendered
        # unchanged.
        # NOTE(review): `translit` is not imported in the visible part of
        # this file, so this call presumably always fails with NameError —
        # confirm whether a transliteration import is missing.
        pass

    if not text.strip():
        return "/static/text/no_name.gif"

    try:
        font = ImageFont.truetype(FONT_PATH, FONT_SIZE)
    except IOError:
        font = ImageFont.load_default()

    def get_text_size(fnt, txt):
        """Return (width, height) of *txt* from the font's bounding box."""
        bbox = fnt.getbbox(txt)
        return (bbox[2] - bbox[0], bbox[3] - bbox[1])

    # Split the text into lines; TextWrapper rejects a width below 1.
    wrapper = textwrap.TextWrapper(width=max(1, max_width // FONT_SIZE))
    lines = wrapper.wrap(text)

    # Work out the image size, including the spacing added to every line.
    line_height = get_text_size(font, "A")[1] + LINE_SPACING
    img_width = max(get_text_size(font, line)[0] for line in lines) + 10
    img_height = (line_height * len(lines)) + 10

    img = Image.new("RGB", (img_width, img_height), BG_COLOR)
    draw = ImageDraw.Draw(img)

    y = 5
    for line in lines:
        draw.text((5, y), line, font=font, fill=TEXT_COLOR)
        y += line_height

    os.makedirs("static/text", exist_ok=True)
    # Every render gets its own file name. With a name derived from the text,
    # rendering the same text twice within the cleanup delay let the first
    # deletion timer remove the image produced by the second render.
    filename = f"{uuid.uuid4().hex}.gif"
    img_path = f"static/text/{filename}"
    img.save(img_path, "GIF")

    full_path = os.path.abspath(img_path)
    cleanup_file(full_path)  # Queue the file for delayed deletion
    return f"/{img_path}"
def render_separator():
    """Render a line of 32 hyphens and return what ``render_text`` returns."""
    return render_text("-" * 32)
# Register render_text as a global in the Jinja environment used by `templates`.
templates.env.globals["render_text"] = render_text
@app.get("/", response_class=HTMLResponse)
async def login_page(request: Request):
    """Serve the ``index.html`` template for the site root."""
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
@app.post("/auth")
async def authenticate(
    request: Request,
    instance: str = Form(...),
    username: str = Form(...),
    password: str = Form(...),
):
    """Exchange credentials for an access token and start a device session.

    The token is requested from ``<instance>/token``. On success the token,
    the normalized instance URL and the user ID are stored under a fresh UUID,
    which is handed to the client in the ``webtv_uuid`` cookie.

    Returns:
        A 303 redirect to ``/profile`` on success, to ``/?error=1`` when a
        form field is empty, or to ``/?error=2`` when the token request fails
        or its response lacks the expected fields.
    """
    if not all([instance, username, password]):
        return RedirectResponse(url="/?error=1", status_code=303)

    # Assume plain HTTP when the user did not specify a scheme.
    if not instance.startswith(("http://", "https://")):
        instance = f"http://{instance}"
    instance = instance.rstrip('/')

    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"{instance}/token",
                params={
                    "username": username,
                    "password": password,
                    "grant_type": "password",
                    "client_name": "ovk4webtv"
                }
            )
            response.raise_for_status()
            token_data = response.json()
        # Read the fields inside the try block so an unexpected payload leads
        # to the error redirect instead of an unhandled exception.
        access_token = token_data["access_token"]
        user_id = token_data["user_id"]
    except Exception:
        return RedirectResponse(url="/?error=2", status_code=303)

    storage = load_storage()
    device_uuid = str(uuid.uuid4())
    storage[device_uuid] = {
        "token": access_token,
        "instance": instance,  # Stored without a trailing slash
        "user_id": user_id
    }
    save_storage(storage)

    response = RedirectResponse(url="/profile", status_code=303)
    response.set_cookie(key="webtv_uuid", value=device_uuid)
    return response
@app.get("/profile", response_class=HTMLResponse)
async def profile_page(request: Request):
    """Show the profile of the user behind the ``webtv_uuid`` session.

    Returns:
        The rendered ``profile.html`` template, a 303 redirect to ``/`` when
        there is no valid session, or a 303 redirect to ``/?error=3`` when
        the API calls fail or return an unexpected payload.
    """
    device_uuid = request.cookies.get("webtv_uuid")
    storage = load_storage()
    if not device_uuid or device_uuid not in storage:
        return RedirectResponse(url="/", status_code=303)

    user_data = storage[device_uuid]
    try:
        base_url = user_data['instance'].rstrip('/')
        async with httpx.AsyncClient() as client:
            # Fetch the basic profile information.
            profile_response = await client.get(
                f"{base_url}/method/Account.getProfileInfo",
                params={"access_token": user_data["token"]}
            )
            profile_response.raise_for_status()
            profile_data = profile_response.json()["response"]

            # Fetch the full user record, including the avatar URL.
            users_response = await client.get(
                f"{base_url}/method/users.get",
                params={
                    "access_token": user_data["token"],
                    "user_ids": profile_data["id"],
                    "fields": "photo_200"
                }
            )
            users_response.raise_for_status()
            user_info = users_response.json()["response"][0]

        # Read the fields here so a malformed payload takes the same error
        # path as a failed request.
        user_id = user_info["id"]
        # A missing photo is not fatal: process_avatar returns its
        # placeholder when the URL cannot be fetched.
        photo_url = user_info.get("photo_200") or ""
        first_name = profile_data.get("first_name") or ""
        last_name = profile_data.get("last_name") or ""
    except Exception as e:
        print(f"Profile error: {str(e)}")
        return RedirectResponse(url="/?error=3", status_code=303)

    # Use the ID from the profile rather than the user_id stored with the token.
    avatar_url = await process_avatar(photo_url, user_id)
    return templates.TemplateResponse("profile.html", {
        "request": request,
        "first_name_img": render_text(first_name),
        "last_name_img": render_text(last_name),
        "user_id": user_id,
        "avatar": avatar_url
    })
# Остальные обработчики остаются без изменений
@app.get("/token", response_class=HTMLResponse)
async def show_token(request: Request):
    """Show the access token stored for the ``webtv_uuid`` session.

    Raises:
        HTTPException: 400 when the cookie is missing or does not match a
            stored session.
    """
    device_uuid = request.cookies.get("webtv_uuid")
    # Sessions are kept in the JSON storage; the `tokens` global this handler
    # used to read is not defined in this file.
    storage = load_storage()
    if not device_uuid or device_uuid not in storage:
        raise HTTPException(status_code=400, detail="Сессия не найдена")

    return templates.TemplateResponse(
        "token.html",
        {
            "request": request,
            # NOTE(review): assumes token.html expects the raw token string —
            # confirm against the template.
            "token": storage[device_uuid]["token"],
            "uuid": device_uuid
        }
    )
async def _render_feed(request, page, method, template_name, global_feed):
    """Fetch one page of a news feed and render it with *template_name*.

    Shared implementation of the ``/feed`` and ``/feed_global`` handlers.

    Args:
        request: Incoming request; its ``webtv_uuid`` cookie selects the session.
        page: Zero-based page number; negative values are treated as 0.
        method: API method name appended to ``<instance>/method/``.
        template_name: Template used to render the posts.
        global_feed: Value passed through to the template context.

    Returns:
        The rendered template, a 303 redirect to ``/`` without a valid
        session, to ``/?error=4`` when the feed cannot be loaded, or to
        ``/?error=5`` when the author lookup fails.
    """
    device_uuid = request.cookies.get("webtv_uuid")
    storage = load_storage()
    # Authorization check.
    if not device_uuid or device_uuid not in storage:
        return RedirectResponse(url="/", status_code=303)

    user_data = storage[device_uuid]
    page = max(page, 0)  # A negative page would produce a negative offset

    try:
        # Fetch the news feed.
        async with httpx.AsyncClient() as client:
            feed_response = await client.get(
                f"{user_data['instance']}/method/{method}",
                params={
                    "access_token": user_data["token"],
                    "count": 5,
                    "offset": page * 5
                }
            )
            feed_response.raise_for_status()
            feed_data = feed_response.json()["response"]
        items = feed_data["items"]
        # Collect the user IDs inside the try block so a malformed item is
        # reported as a feed error instead of an unhandled exception.
        user_ids = {
            uid
            for item in items
            for uid in (item["from_id"], item["owner_id"])
        }
    except Exception as e:
        print(f"Feed error: {str(e)}")
        return RedirectResponse(url="/?error=4", status_code=303)

    # Look up the users behind those IDs; an empty feed needs no lookup.
    users_info = {}
    if user_ids:
        try:
            async with httpx.AsyncClient() as client:
                users_response = await client.get(
                    f"{user_data['instance']}/method/users.get",
                    params={
                        "access_token": user_data["token"],
                        "user_ids": ",".join(map(str, user_ids)),
                        "fields": "first_name,last_name"
                    }
                )
                users_response.raise_for_status()
                for user in users_response.json()["response"]:
                    users_info[user["id"]] = user
        except Exception as e:
            print(f"Users info error: {str(e)}")
            return RedirectResponse(url="/?error=5", status_code=303)

    # Build the data shown by the template.
    processed_posts = []
    for post in items:
        from_user = users_info.get(post["from_id"])
        owner_user = users_info.get(post["owner_id"])
        # Posts whose author or wall owner was not returned are skipped.
        if not from_user or not owner_user:
            continue

        post_text = post.get("text") or ""
        if post["from_id"] == post["owner_id"]:
            author_text = f"{from_user['first_name']} {from_user['last_name']} wrote:"
        else:
            author_text = (f"{from_user['first_name']} {from_user['last_name']} "
                           f"posted on {owner_user['first_name']} "
                           f"{owner_user['last_name']}'s wall:")

        processed_posts.append({
            "author": render_text(author_text),
            "text": render_text(post_text),
            "has_attachments": len(post.get("attachments", [])) > 0,
            "likes": post.get("likes", {}).get("count", 0),
            "separator": None
        })

    # Put a separator after every displayed post except the last. Doing this
    # after filtering avoids a trailing separator when the final feed items
    # were skipped above.
    for entry in processed_posts[:-1]:
        entry["separator"] = render_separator()

    return templates.TemplateResponse(template_name, {
        "request": request,
        "posts": processed_posts,
        "current_page": page,
        "has_next": "next_from" in feed_data,
        "global_feed": global_feed
    })


@app.get("/feed", response_class=HTMLResponse)
async def news_feed(
    request: Request,
    page: int = 0,
    global_feed: bool = False
):
    """Render ``feed.html`` with the user's feed, or the global feed when
    ``global_feed`` is true."""
    method = "newsfeed.getGlobal" if global_feed else "newsfeed.get"
    return await _render_feed(request, page, method, "feed.html", global_feed)


@app.get("/feed_global", response_class=HTMLResponse)
async def global_news_feed(
    request: Request,
    page: int = 0,
    global_feed: bool = False
):
    """Render ``feed_global.html`` with the global feed.

    ``global_feed`` does not affect which feed is fetched; it is only passed
    through to the template context.
    """
    return await _render_feed(
        request, page, "newsfeed.getGlobal", "feed_global.html", global_feed
    )
@app.get("/post", response_class=HTMLResponse)
async def post_page(request: Request):
    """Serve the ``post.html`` form to a client with a valid session.

    Clients without a known ``webtv_uuid`` cookie are redirected to ``/``.
    The ``error`` and ``success`` query parameters are passed on to the
    template.
    """
    session_id = request.cookies.get("webtv_uuid")
    sessions = load_storage()
    has_session = bool(session_id) and session_id in sessions
    if not has_session:
        return RedirectResponse(url="/", status_code=303)

    query = request.query_params
    context = {
        "request": request,
        "error": query.get("error"),
        "success": "success" in query,
    }
    return templates.TemplateResponse("post.html", context)
@app.post("/post")
async def create_post(request: Request):
    """Publish the submitted text through the API's ``wall.post`` method.

    Returns:
        A 303 redirect: to ``/`` without a valid session, to
        ``/post?error=empty`` for blank text, to ``/post?error=api`` when an
        API call fails, and to ``/post?success=1`` otherwise.
    """
    session_id = request.cookies.get("webtv_uuid")
    sessions = load_storage()
    if not session_id or session_id not in sessions:
        return RedirectResponse(url="/", status_code=303)
    session = sessions[session_id]

    form = await request.form()
    message = form.get("text", "").strip()
    if not message:
        return RedirectResponse(url="/post?error=empty", status_code=303)

    try:
        async with httpx.AsyncClient() as client:
            # The owner ID is taken from the profile info.
            profile_url = f"{session['instance']}/method/Account.getProfileInfo"
            profile_response = await client.get(
                profile_url,
                params={"access_token": session["token"]},
            )
            profile_response.raise_for_status()
            owner_id = profile_response.json()["response"]["id"]

            # Send the post itself.
            post_url = f"{session['instance']}/method/wall.post"
            post_params = {
                "access_token": session["token"],
                "owner_id": owner_id,
                "message": message,
            }
            post_response = await client.get(post_url, params=post_params)
            post_response.raise_for_status()
    except Exception as exc:
        print(f"Post error: {str(exc)}")
        return RedirectResponse(url="/post?error=api", status_code=303)

    return RedirectResponse(url="/post?success=1", status_code=303)
@app.get("/about", response_class=HTMLResponse)
async def about_page(request: Request):
    """Serve the ``about.html`` template."""
    context = {"request": request}
    return templates.TemplateResponse("about.html", context)