Files
oldmarketcustomserver/main.py
drel 9eca006d8a succ
succ
2025-10-18 03:11:05 +03:00

565 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import FastAPI, HTTPException, Depends, UploadFile, File, Form, Request, status
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse, HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy import create_engine, Column, Integer, String, Float, Boolean, Text, DateTime, func
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
from datetime import datetime
import os
import json
import secrets
from typing import List, Optional
import uvicorn
from hashlib import sha256
# Настройка БД
SQLALCHEMY_DATABASE_URL = "sqlite:///./oldmarket.db"
engine = create_engine(SQLALCHEMY_DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
# Модели БД
class App(Base):
__tablename__ = "apps"
id = Column(Integer, primary_key=True, index=True)
api = Column(String(10))
apk_file = Column(String(255))
description = Column(Text)
downloads = Column(Integer, default=0)
icon = Column(String(255))
is_game = Column(Boolean, default=False)
name = Column(String(255))
package = Column(String(255))
rating = Column(Float, default=0.0)
review_count = Column(Integer, default=0)
screenshots = Column(Text)
version = Column(String(50))
versions = Column(Text)
class Review(Base):
__tablename__ = "reviews"
id = Column(Integer, primary_key=True, index=True)
app_id = Column(Integer)
avatar = Column(String(255))
comment = Column(Text)
created_at = Column(String(50))
rating = Column(Integer)
user_id = Column(Integer)
username = Column(String(255))
class AdminUser(Base):
__tablename__ = "admin_users"
id = Column(Integer, primary_key=True, index=True)
username = Column(String(50), unique=True)
password_hash = Column(String(255))
created_at = Column(DateTime, default=datetime.utcnow)
# Создаем таблицы
Base.metadata.create_all(bind=engine)
app = FastAPI(title="OldMarket Server")
# Создаем папки для статических файлов
os.makedirs("static/icons", exist_ok=True)
os.makedirs("static/apks", exist_ok=True)
os.makedirs("static/admin", exist_ok=True)
# Монтируем статические файлы
app.mount("/static", StaticFiles(directory="static"), name="static")
app.mount("/html/apps", StaticFiles(directory="static/icons"), name="icons")
# Настройка шаблонов
templates = Jinja2Templates(directory="templates")
# Зависимость для БД
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# Аутентификация
def hash_password(password: str) -> str:
return sha256(password.encode()).hexdigest()
def verify_password(plain_password: str, hashed_password: str) -> bool:
return hash_password(plain_password) == hashed_password
# Создаем дефолтного админа если нет пользователей
def create_default_admin(db: Session):
if db.query(AdminUser).count() == 0:
default_admin = AdminUser(
username="admin",
password_hash=hash_password("admin123")
)
db.add(default_admin)
db.commit()
print("Создан дефолтный администратор: admin / admin123")
# Сессии для аутентификации
sessions = {}
def create_session(user_id: int) -> str:
session_id = secrets.token_urlsafe(32)
sessions[session_id] = {
"user_id": user_id,
"created_at": datetime.utcnow()
}
return session_id
def get_current_user(request: Request, db: Session = Depends(get_db)):
session_id = request.cookies.get("session_id")
if not session_id or session_id not in sessions:
return None
user_data = sessions[session_id]
user = db.query(AdminUser).filter(AdminUser.id == user_data["user_id"]).first()
return user
def require_auth(request: Request, db: Session = Depends(get_db)):
user = get_current_user(request, db)
if not user:
raise HTTPException(
status_code=status.HTTP_303_SEE_OTHER,
headers={"Location": "/admin/login"}
)
return user
# API endpoints для клиентов (старая совместимость)
@app.get("/api/apps")
def get_apps(db: Session = Depends(get_db)):
apps = db.query(App).all()
return [
{
**{k: v for k, v in app.__dict__.items() if k != '_sa_instance_state'},
"screenshots": json.loads(app.screenshots) if app.screenshots else [],
"versions": json.loads(app.versions) if app.versions else []
}
for app in apps
]
@app.get("/api/app/{app_id}")
def get_app(app_id: int, db: Session = Depends(get_db)):
app = db.query(App).filter(App.id == app_id).first()
if not app:
raise HTTPException(status_code=404, detail="App not found")
return {
**{k: v for k, v in app.__dict__.items() if k != '_sa_instance_state'},
"screenshots": json.loads(app.screenshots) if app.screenshots else [],
"versions": json.loads(app.versions) if app.versions else []
}
@app.get("/api/app/{app_id}/reviews")
def get_reviews(app_id: int, db: Session = Depends(get_db)):
reviews = db.query(Review).filter(Review.app_id == app_id).all()
return [
{k: v for k, v in review.__dict__.items() if k != '_sa_instance_state'}
for review in reviews
]
# Скачивание основной версии приложения
@app.get("/api/download/{app_id}")
def download_app(app_id: int, db: Session = Depends(get_db)):
app = db.query(App).filter(App.id == app_id).first()
if not app:
raise HTTPException(status_code=404, detail="App not found")
# Увеличиваем счетчик загрузок
app.downloads = (app.downloads or 0) + 1
db.commit()
apk_path = f"static/apks/{app.apk_file}"
if not os.path.exists(apk_path):
raise HTTPException(status_code=404, detail="APK file not found")
return FileResponse(
apk_path,
filename=app.apk_file,
media_type='application/vnd.android.package-archive'
)
# Скачивание конкретной версии приложения
@app.get("/api/download/{app_id}/{version}")
def download_app_version(app_id: int, version: str, db: Session = Depends(get_db)):
app = db.query(App).filter(App.id == app_id).first()
if not app:
raise HTTPException(status_code=404, detail="App not found")
# Ищем нужную версию в списке версий
versions = json.loads(app.versions) if app.versions else []
target_version = None
for ver in versions:
# Сравниваем версию, игнорируя регистр и пробелы
if ver.get("version", "").replace(" ", "").lower() == version.replace(" ", "").lower():
target_version = ver
break
if not target_version:
raise HTTPException(status_code=404, detail=f"Version '{version}' not found for this app")
# Увеличиваем счетчик загрузок основной версии
app.downloads = (app.downloads or 0) + 1
db.commit()
apk_filename = target_version.get("apk_file")
if not apk_filename:
raise HTTPException(status_code=404, detail="APK filename not found for this version")
apk_path = f"static/apks/{apk_filename}"
if not os.path.exists(apk_path):
raise HTTPException(status_code=404, detail="APK file not found")
return FileResponse(
apk_path,
filename=apk_filename,
media_type='application/vnd.android.package-archive'
)
# Аутентификация админ-панели
@app.get("/admin/login", response_class=HTMLResponse)
def admin_login(request: Request, db: Session = Depends(get_db)):
create_default_admin(db)
return templates.TemplateResponse("login.html", {"request": request})
@app.post("/admin/login")
def admin_login_post(
request: Request,
username: str = Form(...),
password: str = Form(...),
db: Session = Depends(get_db)
):
user = db.query(AdminUser).filter(AdminUser.username == username).first()
if not user or not verify_password(password, user.password_hash):
return templates.TemplateResponse("login.html", {
"request": request,
"error": "Неверное имя пользователя или пароль"
})
session_id = create_session(user.id)
response = RedirectResponse(url="/admin", status_code=302)
response.set_cookie(key="session_id", value=session_id, httponly=True)
return response
@app.get("/admin/logout")
def admin_logout():
response = RedirectResponse(url="/admin/login", status_code=302)
response.delete_cookie("session_id")
return response
# Защищенные роуты админ-панели
@app.get("/admin", response_class=HTMLResponse)
def admin_panel(request: Request, user: AdminUser = Depends(require_auth), db: Session = Depends(get_db)):
apps_count = db.query(App).count()
reviews_count = db.query(Review).count()
# ИСПРАВЛЕННАЯ СТРОКА: используем func.sum для получения суммы загрузок
total_downloads_result = db.query(func.sum(App.downloads)).scalar()
total_downloads = total_downloads_result if total_downloads_result is not None else 0
return templates.TemplateResponse("admin.html", {
"request": request,
"apps_count": apps_count,
"reviews_count": reviews_count,
"total_downloads": total_downloads,
"username": user.username
})
@app.get("/admin/apps", response_class=HTMLResponse)
def admin_apps(request: Request, user: AdminUser = Depends(require_auth), db: Session = Depends(get_db)):
apps = db.query(App).all()
return templates.TemplateResponse("apps.html", {
"request": request,
"apps": apps,
"username": user.username
})
@app.get("/admin/reviews", response_class=HTMLResponse)
def admin_reviews(request: Request, user: AdminUser = Depends(require_auth), db: Session = Depends(get_db)):
reviews = db.query(Review).all()
apps = db.query(App).all()
app_names = {app.id: app.name for app in apps}
return templates.TemplateResponse("reviews.html", {
"request": request,
"reviews": reviews,
"app_names": app_names,
"apps": apps,
"username": user.username
})
# API для админ-панели (защищенные)
@app.get("/api/admin/apps")
def get_admin_apps(user: AdminUser = Depends(require_auth), db: Session = Depends(get_db)):
apps = db.query(App).all()
return apps
@app.get("/api/admin/apps/{app_id}")
def get_admin_app(app_id: int, user: AdminUser = Depends(require_auth), db: Session = Depends(get_db)):
app = db.query(App).filter(App.id == app_id).first()
if not app:
raise HTTPException(status_code=404, detail="App not found")
return app
@app.post("/api/admin/apps")
def create_app(
user: AdminUser = Depends(require_auth),
name: str = Form(...),
package: str = Form(...),
version: str = Form(...),
api: str = Form(...),
description: str = Form(...),
is_game: bool = Form(False),
apk_file: UploadFile = File(...),
icon: UploadFile = File(...),
db: Session = Depends(get_db)
):
# Сохраняем APK
apk_filename = f"{package}_{version.replace(' ', '_')}.apk"
apk_path = f"static/apks/{apk_filename}"
with open(apk_path, "wb") as f:
f.write(apk_file.file.read())
# Сохраняем иконку
icon_filename = f"{package}_icon.png"
icon_path = f"static/icons/{icon_filename}"
with open(icon_path, "wb") as f:
f.write(icon.file.read())
# Создаем базовую версию
base_version = {
"api": api,
"apk_file": apk_filename,
"version": version
}
# Создаем запись в БД
db_app = App(
name=name,
package=package,
version=version,
api=api,
description=description,
is_game=is_game,
apk_file=apk_filename,
icon=icon_filename,
screenshots="[]",
versions=json.dumps([base_version])
)
db.add(db_app)
db.commit()
db.refresh(db_app)
return {"message": "Приложение создано", "app_id": db_app.id}
# НОВЫЙ ENDPOINT: Обновление приложения
@app.put("/api/admin/apps/{app_id}")
def update_app(
app_id: int,
user: AdminUser = Depends(require_auth),
name: str = Form(None),
package: str = Form(None),
version: str = Form(None),
api: str = Form(None),
description: str = Form(None),
is_game: bool = Form(None),
rating: float = Form(None),
icon: UploadFile = File(None),
db: Session = Depends(get_db)
):
app = db.query(App).filter(App.id == app_id).first()
if not app:
raise HTTPException(status_code=404, detail="App not found")
# Обновляем поля если они переданы
if name is not None:
app.name = name
if package is not None:
app.package = package
if version is not None:
app.version = version
if api is not None:
app.api = api
if description is not None:
app.description = description
if is_game is not None:
app.is_game = is_game
if rating is not None:
app.rating = rating
# Обновляем иконку если передана
if icon and icon.filename:
icon_filename = f"{app.package}_icon.png"
icon_path = f"static/icons/{icon_filename}"
with open(icon_path, "wb") as f:
f.write(icon.file.read())
app.icon = icon_filename
db.commit()
return {"message": "Приложение обновлено"}
@app.post("/api/admin/apps/{app_id}/versions")
def add_app_version(
app_id: int,
user: AdminUser = Depends(require_auth),
version: str = Form(...),
api: str = Form(...),
apk_file: UploadFile = File(...),
db: Session = Depends(get_db)
):
app = db.query(App).filter(App.id == app_id).first()
if not app:
raise HTTPException(status_code=404, detail="App not found")
# Сохраняем APK
apk_filename = f"{app.package}_{version.replace(' ', '_')}.apk"
apk_path = f"static/apks/{apk_filename}"
with open(apk_path, "wb") as f:
f.write(apk_file.file.read())
# Добавляем версию
versions = json.loads(app.versions) if app.versions else []
new_version = {
"api": api,
"apk_file": apk_filename,
"version": version
}
versions.append(new_version)
app.versions = json.dumps(versions)
db.commit()
return {"message": "Версия добавлена"}
@app.delete("/api/admin/apps/{app_id}")
def delete_app(app_id: int, user: AdminUser = Depends(require_auth), db: Session = Depends(get_db)):
app = db.query(App).filter(App.id == app_id).first()
if app:
# Удаляем все APK файлы версий
versions = json.loads(app.versions) if app.versions else []
for version in versions:
apk_file = version.get("apk_file")
if apk_file and os.path.exists(f"static/apks/{apk_file}"):
os.remove(f"static/apks/{apk_file}")
# Удаляем иконку
if os.path.exists(f"static/icons/{app.icon}"):
os.remove(f"static/icons/{app.icon}")
# Удаляем связанные отзывы
db.query(Review).filter(Review.app_id == app_id).delete()
db.delete(app)
db.commit()
return {"message": "Приложение удалено"}
# НОВЫЙ ENDPOINT: Установка основной версии из списка версий
@app.post("/api/admin/apps/{app_id}/set-version")
def set_main_version(
app_id: int,
user: AdminUser = Depends(require_auth),
version: str = Form(...),
db: Session = Depends(get_db)
):
app = db.query(App).filter(App.id == app_id).first()
if not app:
raise HTTPException(status_code=404, detail="App not found")
# Ищем версию в списке версий
versions = json.loads(app.versions) if app.versions else []
target_version = None
for ver in versions:
if ver.get("version") == version:
target_version = ver
break
if not target_version:
raise HTTPException(status_code=404, detail="Version not found in app versions")
# Обновляем основную версию и APK файл
app.version = version
app.apk_file = target_version.get("apk_file")
app.api = target_version.get("api")
db.commit()
return {"message": f"Основная версия установлена на {version}"}
# Исправленный метод для добавления отзывов
@app.post("/api/admin/reviews")
def create_review(
user: AdminUser = Depends(require_auth),
app_id: int = Form(...),
username: str = Form(...),
rating: int = Form(...),
comment: str = Form(...),
avatar: UploadFile = File(None),
db: Session = Depends(get_db)
):
try:
avatar_filename = "default_avatar.png"
# Обрабатываем аватар только если он действительно загружен
if avatar and avatar.filename and avatar.filename.strip() != "":
# Проверяем, что это изображение
if not avatar.content_type or not avatar.content_type.startswith('image/'):
raise HTTPException(status_code=400, detail="Файл должен быть изображением")
# Генерируем уникальное имя файла
file_extension = os.path.splitext(avatar.filename)[1]
avatar_filename = f"avatar_{int(datetime.now().timestamp())}{file_extension}"
avatar_path = f"static/icons/{avatar_filename}"
# Сохраняем аватар
contents = avatar.file.read()
with open(avatar_path, "wb") as f:
f.write(contents)
# Создаем отзыв
db_review = Review(
app_id=app_id,
username=username,
rating=rating,
comment=comment,
avatar=avatar_filename,
created_at=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
user_id=1
)
db.add(db_review)
# Обновляем счетчик отзывов у приложения
app = db.query(App).filter(App.id == app_id).first()
if app:
app.review_count = db.query(Review).filter(Review.app_id == app_id).count()
db.commit()
db.refresh(db_review)
return {"message": "Отзыв создан", "review_id": db_review.id}
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=f"Ошибка при создании отзыва: {str(e)}")
@app.delete("/api/admin/reviews/{review_id}")
def delete_review(review_id: int, user: AdminUser = Depends(require_auth), db: Session = Depends(get_db)):
review = db.query(Review).filter(Review.id == review_id).first()
if review:
app_id = review.app_id
db.delete(review)
# Обновляем счетчик отзывов у приложения
app = db.query(App).filter(App.id == app_id).first()
if app:
app.review_count = db.query(Review).filter(Review.app_id == app_id).count()
db.commit()
return {"message": "Отзыв удален"}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=5000)