dfdsfsafdsfsdfasdsf
This commit is contained in:
498
main.py
Normal file
498
main.py
Normal file
@@ -0,0 +1,498 @@
|
||||
from fastapi import FastAPI, Request, Form, HTTPException
|
||||
from fastapi.responses import HTMLResponse, RedirectResponse
|
||||
from fastapi.templating import Jinja2Templates
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
import uuid
|
||||
import httpx
|
||||
import json
|
||||
import threading
|
||||
import os
|
||||
import time
|
||||
from PIL import Image
|
||||
import io
|
||||
from PIL import Image, ImageDraw, ImageFont
|
||||
import textwrap
|
||||
from urllib.parse import urlparse
|
||||
|
||||
app = FastAPI()
|
||||
templates = Jinja2Templates(directory="templates")
|
||||
app.mount("/static", StaticFiles(directory="static"), name="static")
|
||||
|
||||
STORAGE_FILE = "storage.json"
|
||||
os.makedirs("static/images", exist_ok=True)
|
||||
|
||||
FONT_PATH = "arial.ttf" # Загрузите шрифт и положите в корень
|
||||
LINE_SPACING = 10 # Больше пространства между строками
|
||||
FONT_SIZE = 15 # Увеличить размер шрифта
|
||||
TEXT_COLOR = (0, 0, 0) # Чёрный текст
|
||||
BG_COLOR = (255, 255, 255) # Белый фон
|
||||
|
||||
# Создаём дефолтный текст если шрифт не найден
|
||||
print('тест на файлик ноу нейм')
|
||||
if not os.path.exists("static/text/no_name.gif"):
|
||||
print('нету ураа')
|
||||
img = Image.new("RGB", (100, 20), BG_COLOR)
|
||||
draw = ImageDraw.Draw(img)
|
||||
draw.text((5, 5), "", fill=TEXT_COLOR)
|
||||
img.save("static/text/no_name.gif", "GIF")
|
||||
print('наверно всё')
|
||||
|
||||
def load_storage():
|
||||
try:
|
||||
with open(STORAGE_FILE, "r") as f:
|
||||
return json.load(f)
|
||||
except (FileNotFoundError, json.JSONDecodeError):
|
||||
return {}
|
||||
|
||||
def save_storage(data):
|
||||
with open(STORAGE_FILE, "w") as f:
|
||||
json.dump(data, f)
|
||||
|
||||
async def process_avatar(image_url: str, user_id: int):
|
||||
try:
|
||||
async with httpx.AsyncClient(follow_redirects=True) as client:
|
||||
response = await client.get(image_url)
|
||||
response.raise_for_status()
|
||||
|
||||
# Извлекаем реальный ID пользователя из URL инстанса
|
||||
parsed_url = urlparse(image_url)
|
||||
filename = f"{user_id}.gif"
|
||||
filepath = f"static/images/{filename}"
|
||||
|
||||
img = Image.open(io.BytesIO(response.content))
|
||||
img = img.resize((128, 128)).convert("RGB")
|
||||
img.save(filepath, "GIF")
|
||||
|
||||
return f"/static/images/{filename}"
|
||||
except Exception as e:
|
||||
print(f"Avatar error: {str(e)}")
|
||||
# Создаём дефолтную аватарку если её нет
|
||||
default_path = "static/images/default.gif"
|
||||
if not os.path.exists(default_path):
|
||||
img = Image.new("RGB", (128, 128), color="gray")
|
||||
img.save(default_path, "GIF")
|
||||
return default_path
|
||||
|
||||
def cleanup_file(file_path):
|
||||
def delete_file():
|
||||
try:
|
||||
if os.path.exists(file_path):
|
||||
os.remove(file_path)
|
||||
print(f"Deleted {file_path}")
|
||||
except Exception as e:
|
||||
print(f"Error deleting {file_path}: {str(e)}")
|
||||
|
||||
# Запускаем удаление через 5 секунд в отдельном потоке
|
||||
timer = threading.Timer(5.0, delete_file)
|
||||
timer.start()
|
||||
|
||||
def render_text(text: str, max_width: int = 480) -> str:
|
||||
if isinstance(text, int): # Добавляем обработку чисел
|
||||
text = str(text)
|
||||
try:
|
||||
text = translit(text, 'ru', reversed=True)
|
||||
except:
|
||||
pass
|
||||
|
||||
if not text.strip():
|
||||
return "/static/text/no_name.gif"
|
||||
|
||||
try:
|
||||
font = ImageFont.truetype(FONT_PATH, FONT_SIZE)
|
||||
except IOError:
|
||||
font = ImageFont.load_default()
|
||||
|
||||
# Новый метод расчёта размеров
|
||||
def get_text_size(fnt, txt):
|
||||
bbox = fnt.getbbox(txt)
|
||||
return (bbox[2] - bbox[0], bbox[3] - bbox[1])
|
||||
|
||||
# Разбиваем текст на строки
|
||||
wrapper = textwrap.TextWrapper(width=max_width // (FONT_SIZE))
|
||||
lines = wrapper.wrap(text)
|
||||
|
||||
# Рассчитываем размер изображения
|
||||
line_height = get_text_size(font, "A")[1] + LINE_SPACING # Добавляем spacing к высоте линии
|
||||
img_width = max(get_text_size(font, line)[0] for line in lines) + 10
|
||||
img_height = (line_height * len(lines)) + 10 # Учитываем суммарный spacing
|
||||
|
||||
# Создаём изображение
|
||||
img = Image.new("RGB", (img_width, img_height), BG_COLOR)
|
||||
draw = ImageDraw.Draw(img)
|
||||
|
||||
# Рендерим текст
|
||||
y = 5
|
||||
for line in lines:
|
||||
draw.text((5, y), line, font=font, fill=TEXT_COLOR)
|
||||
y += line_height
|
||||
|
||||
# Сохраняем в static/text
|
||||
os.makedirs("static/text", exist_ok=True)
|
||||
filename = f"{hash(text)}.gif"
|
||||
img_path = f"static/text/{filename}"
|
||||
img.save(img_path, "GIF")
|
||||
|
||||
full_path = os.path.abspath(img_path)
|
||||
cleanup_file(full_path) # Добавляем в очередь на удаление
|
||||
|
||||
return f"/{img_path}"
|
||||
|
||||
def render_separator():
|
||||
separator = "-" * 32 # 40 дефисов
|
||||
return render_text(separator)
|
||||
|
||||
templates.env.globals["render_text"] = render_text
|
||||
|
||||
@app.get("/", response_class=HTMLResponse)
|
||||
async def login_page(request: Request):
|
||||
return templates.TemplateResponse("index.html", {"request": request})
|
||||
|
||||
@app.post("/auth")
|
||||
async def authenticate(
|
||||
request: Request,
|
||||
instance: str = Form(...),
|
||||
username: str = Form(...),
|
||||
password: str = Form(...),
|
||||
):
|
||||
if not all([instance, username, password]):
|
||||
return RedirectResponse(url="/?error=1", status_code=303)
|
||||
|
||||
instance = instance if instance.startswith(("http://", "https://")) else f"http://{instance}"
|
||||
|
||||
try:
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.get(
|
||||
f"{instance.rstrip('/')}/token",
|
||||
params={
|
||||
"username": username,
|
||||
"password": password,
|
||||
"grant_type": "password",
|
||||
"client_name": "ovk4webtv"
|
||||
}
|
||||
)
|
||||
response.raise_for_status()
|
||||
token_data = response.json()
|
||||
except Exception as e:
|
||||
return RedirectResponse(url="/?error=2", status_code=303)
|
||||
|
||||
storage = load_storage()
|
||||
device_uuid = str(uuid.uuid4())
|
||||
|
||||
storage[device_uuid] = {
|
||||
"token": token_data["access_token"],
|
||||
"instance": instance.rstrip('/'), # Сохраняем нормализованный URL
|
||||
"user_id": token_data["user_id"]
|
||||
}
|
||||
|
||||
save_storage(storage)
|
||||
|
||||
response = RedirectResponse(url="/profile", status_code=303)
|
||||
response.set_cookie(key="webtv_uuid", value=device_uuid)
|
||||
return response
|
||||
|
||||
@app.get("/profile", response_class=HTMLResponse)
|
||||
async def profile_page(request: Request):
|
||||
device_uuid = request.cookies.get("webtv_uuid")
|
||||
storage = load_storage()
|
||||
|
||||
if not device_uuid or device_uuid not in storage:
|
||||
return RedirectResponse(url="/", status_code=303)
|
||||
|
||||
user_data = storage[device_uuid]
|
||||
|
||||
try:
|
||||
async with httpx.AsyncClient() as client:
|
||||
# Получаем базовую информацию профиля
|
||||
profile_response = await client.get(
|
||||
f"{user_data['instance'].rstrip('/')}/method/Account.getProfileInfo",
|
||||
params={"access_token": user_data["token"]}
|
||||
)
|
||||
profile_response.raise_for_status()
|
||||
profile_data = profile_response.json()["response"]
|
||||
|
||||
# Получаем полную информацию о пользователе
|
||||
users_response = await client.get(
|
||||
f"{user_data['instance'].rstrip('/')}/method/users.get",
|
||||
params={
|
||||
"access_token": user_data["token"],
|
||||
"user_ids": profile_data["id"],
|
||||
"fields": "photo_200"
|
||||
}
|
||||
)
|
||||
users_response.raise_for_status()
|
||||
user_info = users_response.json()["response"][0]
|
||||
except Exception as e:
|
||||
print(f"Profile error: {str(e)}")
|
||||
return RedirectResponse(url="/?error=3", status_code=303)
|
||||
|
||||
# Используем ID из профиля вместо user_id из токена
|
||||
avatar_url = await process_avatar(user_info["photo_200"], user_info["id"])
|
||||
|
||||
return templates.TemplateResponse("profile.html", {
|
||||
"request": request,
|
||||
"first_name_img": render_text(profile_data.get("first_name", "")),
|
||||
"last_name_img": render_text(profile_data.get("last_name", "")),
|
||||
"user_id": user_info["id"],
|
||||
"avatar": avatar_url
|
||||
})
|
||||
|
||||
# Остальные обработчики остаются без изменений
|
||||
|
||||
@app.get("/token", response_class=HTMLResponse)
|
||||
async def show_token(request: Request):
|
||||
device_uuid = request.cookies.get("webtv_uuid")
|
||||
if not device_uuid or device_uuid not in tokens:
|
||||
raise HTTPException(status_code=400, detail="Сессия не найдена")
|
||||
|
||||
return templates.TemplateResponse(
|
||||
"token.html",
|
||||
{
|
||||
"request": request,
|
||||
"token": tokens[device_uuid],
|
||||
"uuid": device_uuid
|
||||
}
|
||||
)
|
||||
|
||||
@app.get("/feed", response_class=HTMLResponse)
|
||||
async def news_feed(
|
||||
request: Request,
|
||||
page: int = 0,
|
||||
global_feed: bool = False
|
||||
):
|
||||
device_uuid = request.cookies.get("webtv_uuid")
|
||||
storage = load_storage()
|
||||
|
||||
# Проверка авторизации
|
||||
if not device_uuid or device_uuid not in storage:
|
||||
return RedirectResponse(url="/", status_code=303)
|
||||
|
||||
user_data = storage[device_uuid]
|
||||
|
||||
try:
|
||||
# Получаем ленту новостей
|
||||
method = "newsfeed.getGlobal" if global_feed else "newsfeed.get"
|
||||
async with httpx.AsyncClient() as client:
|
||||
feed_response = await client.get(
|
||||
f"{user_data['instance']}/method/{method}",
|
||||
params={
|
||||
"access_token": user_data["token"],
|
||||
"count": 5,
|
||||
"offset": page * 5
|
||||
}
|
||||
)
|
||||
feed_response.raise_for_status()
|
||||
feed_data = feed_response.json()["response"]
|
||||
except Exception as e:
|
||||
print(f"Feed error: {str(e)}")
|
||||
return RedirectResponse(url="/?error=4", status_code=303)
|
||||
|
||||
# Собираем ID пользователей
|
||||
user_ids = set()
|
||||
for item in feed_data["items"]:
|
||||
user_ids.add(item["from_id"])
|
||||
user_ids.add(item["owner_id"])
|
||||
|
||||
# Получаем информацию о пользователях
|
||||
users_info = {}
|
||||
try:
|
||||
async with httpx.AsyncClient() as client:
|
||||
users_response = await client.get(
|
||||
f"{user_data['instance']}/method/users.get",
|
||||
params={
|
||||
"access_token": user_data["token"],
|
||||
"user_ids": ",".join(map(str, user_ids)),
|
||||
"fields": "first_name,last_name"
|
||||
}
|
||||
)
|
||||
users_response.raise_for_status()
|
||||
for user in users_response.json()["response"]:
|
||||
users_info[user["id"]] = user
|
||||
except Exception as e:
|
||||
print(f"Users info error: {str(e)}")
|
||||
return RedirectResponse(url="/?error=5", status_code=303)
|
||||
|
||||
# Формируем данные для отображения
|
||||
processed_posts = []
|
||||
for idx, post in enumerate(feed_data["items"]): # Добавлен enumerate
|
||||
from_user = users_info.get(post["from_id"])
|
||||
owner_user = users_info.get(post["owner_id"])
|
||||
|
||||
if not from_user or not owner_user:
|
||||
continue
|
||||
|
||||
post_text = post.get("text", "")
|
||||
|
||||
if post["from_id"] == post["owner_id"]:
|
||||
author_text = f"{from_user['first_name']} {from_user['last_name']} wrote:"
|
||||
else:
|
||||
author_text = (f"{from_user['first_name']} {from_user['last_name']} "
|
||||
f"posted on {owner_user['first_name']} "
|
||||
f"{owner_user['last_name']}'s wall:")
|
||||
|
||||
processed_posts.append({
|
||||
"author": render_text(author_text),
|
||||
"text": render_text(post_text),
|
||||
"has_attachments": len(post.get("attachments", [])) > 0,
|
||||
"likes": post.get("likes", {}).get("count", 0), # Добавляем количество лайков
|
||||
"separator": render_separator() if idx < len(feed_data["items"]) - 1 else None
|
||||
})
|
||||
|
||||
return templates.TemplateResponse("feed.html", {
|
||||
"request": request,
|
||||
"posts": processed_posts,
|
||||
"current_page": page,
|
||||
"has_next": "next_from" in feed_data,
|
||||
"global_feed": global_feed
|
||||
})
|
||||
|
||||
@app.get("/feed_global", response_class=HTMLResponse)
|
||||
async def global_news_feed(
|
||||
request: Request,
|
||||
page: int = 0,
|
||||
global_feed: bool = False
|
||||
):
|
||||
device_uuid = request.cookies.get("webtv_uuid")
|
||||
storage = load_storage()
|
||||
|
||||
# Проверка авторизации
|
||||
if not device_uuid or device_uuid not in storage:
|
||||
return RedirectResponse(url="/", status_code=303)
|
||||
|
||||
user_data = storage[device_uuid]
|
||||
|
||||
try:
|
||||
# Получаем ленту новостей
|
||||
method = "newsfeed.getGlobal"
|
||||
async with httpx.AsyncClient() as client:
|
||||
feed_response = await client.get(
|
||||
f"{user_data['instance']}/method/{method}",
|
||||
params={
|
||||
"access_token": user_data["token"],
|
||||
"count": 5,
|
||||
"offset": page * 5
|
||||
}
|
||||
)
|
||||
feed_response.raise_for_status()
|
||||
feed_data = feed_response.json()["response"]
|
||||
except Exception as e:
|
||||
print(f"Feed error: {str(e)}")
|
||||
return RedirectResponse(url="/?error=4", status_code=303)
|
||||
|
||||
# Собираем ID пользователей
|
||||
user_ids = set()
|
||||
for item in feed_data["items"]:
|
||||
user_ids.add(item["from_id"])
|
||||
user_ids.add(item["owner_id"])
|
||||
|
||||
# Получаем информацию о пользователях
|
||||
users_info = {}
|
||||
try:
|
||||
async with httpx.AsyncClient() as client:
|
||||
users_response = await client.get(
|
||||
f"{user_data['instance']}/method/users.get",
|
||||
params={
|
||||
"access_token": user_data["token"],
|
||||
"user_ids": ",".join(map(str, user_ids)),
|
||||
"fields": "first_name,last_name"
|
||||
}
|
||||
)
|
||||
users_response.raise_for_status()
|
||||
for user in users_response.json()["response"]:
|
||||
users_info[user["id"]] = user
|
||||
except Exception as e:
|
||||
print(f"Users info error: {str(e)}")
|
||||
return RedirectResponse(url="/?error=5", status_code=303)
|
||||
|
||||
# Формируем данные для отображения
|
||||
processed_posts = []
|
||||
for idx, post in enumerate(feed_data["items"]): # Добавлен enumerate
|
||||
from_user = users_info.get(post["from_id"])
|
||||
owner_user = users_info.get(post["owner_id"])
|
||||
|
||||
if not from_user or not owner_user:
|
||||
continue
|
||||
|
||||
post_text = post.get("text", "")
|
||||
|
||||
if post["from_id"] == post["owner_id"]:
|
||||
author_text = f"{from_user['first_name']} {from_user['last_name']} wrote:"
|
||||
else:
|
||||
author_text = (f"{from_user['first_name']} {from_user['last_name']} "
|
||||
f"posted on {owner_user['first_name']} "
|
||||
f"{owner_user['last_name']}'s wall:")
|
||||
|
||||
processed_posts.append({
|
||||
"author": render_text(author_text),
|
||||
"text": render_text(post_text),
|
||||
"has_attachments": len(post.get("attachments", [])) > 0,
|
||||
"likes": post.get("likes", {}).get("count", 0), # Добавляем количество лайков
|
||||
"separator": render_separator() if idx < len(feed_data["items"]) - 1 else None
|
||||
})
|
||||
|
||||
return templates.TemplateResponse("feed_global.html", {
|
||||
"request": request,
|
||||
"posts": processed_posts,
|
||||
"current_page": page,
|
||||
"has_next": "next_from" in feed_data,
|
||||
"global_feed": global_feed
|
||||
})
|
||||
|
||||
@app.get("/post", response_class=HTMLResponse)
|
||||
async def post_page(request: Request):
|
||||
device_uuid = request.cookies.get("webtv_uuid")
|
||||
storage = load_storage()
|
||||
|
||||
if not device_uuid or device_uuid not in storage:
|
||||
return RedirectResponse(url="/", status_code=303)
|
||||
|
||||
return templates.TemplateResponse("post.html", {
|
||||
"request": request,
|
||||
"error": request.query_params.get("error"),
|
||||
"success": "success" in request.query_params
|
||||
})
|
||||
|
||||
@app.post("/post")
|
||||
async def create_post(request: Request):
|
||||
device_uuid = request.cookies.get("webtv_uuid")
|
||||
storage = load_storage()
|
||||
|
||||
if not device_uuid or device_uuid not in storage:
|
||||
return RedirectResponse(url="/", status_code=303)
|
||||
|
||||
user_data = storage[device_uuid]
|
||||
form_data = await request.form()
|
||||
post_text = form_data.get("text", "").strip()
|
||||
|
||||
if not post_text:
|
||||
return RedirectResponse(url="/post?error=empty", status_code=303)
|
||||
|
||||
try:
|
||||
# Get owner_id from profile info
|
||||
async with httpx.AsyncClient() as client:
|
||||
profile_response = await client.get(
|
||||
f"{user_data['instance']}/method/Account.getProfileInfo",
|
||||
params={"access_token": user_data["token"]}
|
||||
)
|
||||
profile_response.raise_for_status()
|
||||
owner_id = profile_response.json()["response"]["id"]
|
||||
|
||||
# Send post
|
||||
post_response = await client.get(
|
||||
f"{user_data['instance']}/method/wall.post",
|
||||
params={
|
||||
"access_token": user_data["token"],
|
||||
"owner_id": owner_id,
|
||||
"message": post_text
|
||||
}
|
||||
)
|
||||
post_response.raise_for_status()
|
||||
|
||||
except Exception as e:
|
||||
print(f"Post error: {str(e)}")
|
||||
return RedirectResponse(url="/post?error=api", status_code=303)
|
||||
|
||||
return RedirectResponse(url="/post?success=1", status_code=303)
|
||||
|
||||
@app.get("/about", response_class=HTMLResponse)
|
||||
async def about_page(request: Request):
|
||||
return templates.TemplateResponse("about.html", {"request": request})
|
||||
Reference in New Issue
Block a user