Files
telegram-bot-for-manipulate…/app/handlers/registration.py
ronis_0505 61fea795e1
Some checks failed
continuous-integration/drone/push Build is failing
env fix
2025-09-26 13:39:16 +03:00

67 lines
3.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
from asyncio import Event, wait_for, TimeoutError
from aiogram import Router, Bot
from aiogram.filters import CommandStart
from aiogram.types import Message, User
from sqlalchemy import insert, select
from loguru import logger
from keyboards import create_inline_kb
from database import async_session_, Worker
from filters import IsRegister
registration_router = Router()
registration_router.message.filter(IsRegister() or CommandStart())
registration_confirm: dict[int, Event] = {}
user_info_template = ("Новый пользователь ждет регистрации:\n"
"Имя: {}\n"
"Фамилия: {}\n"
"Юзернейм: @{}\n"
"ID: @msg_{}\n")
logger.error(os.listdir("/"))
logger.error(os.listdir("/app"))
admins_ids = list(map(int, os.getenv("BOT_ADMINS").split(",")))
@registration_router.message(CommandStart())
async def start_command(message: Message, bot: Bot):
async with async_session_() as session:
result = await session.execute(select(Worker).where(Worker.telegram_id == message.from_user.id))
user = result.scalars().first()
if not user:
user = message.from_user
dict_for_inline = {f'reg_@{user.id}': 'Allow', f'del_@{user.id}': 'Reject'}
user_info = user_info_template.format(user.first_name, user.last_name if user.last_name else 'Не указана',
user.username if user.username else 'Не указан', user.id)
for admin in admins_ids:
try:
await bot.send_message(chat_id=admin, text=user_info)
await bot.send_message(chat_id=admin, text='Зарегистрировать пользователя',
reply_markup=create_inline_kb(width=2, **dict_for_inline))
await message.answer("Запрос на регистрацию отправлен администратору, ожидайте подтверждения.")
except Exception:
logger.error(f"{start_command.__name__} failed")
reg_confirm = Event()
registration_confirm[user.id] = reg_confirm
try:
await wait_for(reg_confirm.wait(), timeout=60)
async with async_session_() as local_session:
async with local_session.begin():
local_session.add(Worker(telegram_id=user.id, name=user.first_name))
await message.answer("Регистрация подтверждена, для просмотра доступных действий нажмите кнопку 'MENU'")
except TimeoutError:
await message.answer("Время ожидания истекло.")
del registration_confirm[user.id]
else:
await message.answer("Работа бота возобновлена")
@registration_router.message()
async def catch_message(message: Message):
await message.answer("Для работы с ботом, требуется регистрация\nНажмите /start для регистрации")