send notification about upcoming event

This commit is contained in:
Oleg Oleg 2025-11-09 18:19:30 +04:00
parent e20f789497
commit 9bf50b3772
7 changed files with 98 additions and 21 deletions

View File

@ -178,6 +178,7 @@ class PaymentsRegistered(Base):
class NotificationType(enum.Enum): class NotificationType(enum.Enum):
PAYMENT_RECEIVED = "payment_received" PAYMENT_RECEIVED = "payment_received"
FIRST_REMINDER_SENT = "first_reminder_sent"
class SessionNotification(Base): class SessionNotification(Base):
__tablename__ = "session_notifications" __tablename__ = "session_notifications"

View File

@ -313,7 +313,7 @@ async def pay_consultation(update: Update, context: ContextTypes.DEFAULT_TYPE) -
phone=patient.phone, phone=patient.phone,
consultation_date_time=consultation_date_time, consultation_date_time=consultation_date_time,
patient=patient, patient=patient,
doctor_id=doctor.id if doctor else None doctor=doctor
) )
except DatabaseError as e: except DatabaseError as e:
logger.error(f"Failed to create session for user {user_id}: {e}") logger.error(f"Failed to create session for user {user_id}: {e}")

View File

@ -0,0 +1,8 @@
from typing import TypedDict
from datetime import datetime
class UpcomingSessionRow(TypedDict):
date: datetime
code: str
telegram_id: int
time_zone: str

View File

@ -45,6 +45,33 @@ async def create_payment_received_once(session_code: str) -> tuple[bool, int | N
await session.commit() await session.commit()
return created, patient.telegram_id, notif_obj return created, patient.telegram_id, notif_obj
async def create_session_reminder_once(patient_id: int, session_id: int) -> tuple[bool, SessionNotification | None]:
"""
Пытается создать запись уведомления 'session_reminder' для сессии.
Возвращает:
(created: bool, chat_id: int|None, notif_obj: SessionNotification|None)
Если уже существует created=False.
chat_id telegram_id пациента, если найден.
"""
async with AsyncSessionLocal() as session:
# upsert по (session_id, type)
stmt = pg_insert(SessionNotification).values(
session_id=session_id,
patient_id=patient_id,
type=NotificationType.FIRST_REMINDER_SENT,
created_at=datetime.now(timezone.utc),
).on_conflict_do_nothing(
index_elements=[SessionNotification.session_id, SessionNotification.type]
).returning(SessionNotification)
n_res = await session.execute(stmt)
notif_obj = n_res.scalar_one_or_none()
created = notif_obj is not None
if created:
# сохраним факт создания
await session.commit()
return created, notif_obj
async def mark_notification_sent(notif_id): async def mark_notification_sent(notif_id):
async with AsyncSessionLocal() as session: async with AsyncSessionLocal() as session:
async with session.begin(): async with session.begin():

View File

@ -1,20 +1,19 @@
from __future__ import annotations from __future__ import annotations
from datetime import datetime, timezone from datetime import datetime, timezone
from zoneinfo import ZoneInfo
from typing import Sequence from typing import Sequence
from sqlalchemy import Sequence, Row, select from sqlalchemy import Sequence, Row, select
from db.session import AsyncSessionLocal from db.session import AsyncSessionLocal
from db.models import ( from db.models import (
Sessions, Patients, SessionStatusHistory, SessionDateTimeHistory, Sessions, Patients, SessionStatusHistory, SessionDateTimeHistory,
Doctors Doctors
) )
from core.utils import (generate_session_code, date_time_formatter) from core.utils import generate_session_code, date_time_formatter
from core.logging import logger from core.logging import logger
from docbot.services.dto.sessions_dto import UpcomingSessionRow
async def create_session(telegram_id: int, phone: str, consultation_date_time: str, patient: Patients, doctor_id: str) -> str: async def create_session(telegram_id: int, phone: str, consultation_date_time: str, patient: Patients, doctor: Doctors) -> str:
""" """
Генерирует уникальный код, сохраняет его вместе с Telegram ID. Генерирует уникальный код, сохраняет его вместе с Telegram ID.
Возвращает этот код. Возвращает этот код.
@ -27,7 +26,7 @@ async def create_session(telegram_id: int, phone: str, consultation_date_time: s
code=code, code=code,
patient=patient, patient=patient,
sent_at=datetime.now(timezone.utc), sent_at=datetime.now(timezone.utc),
doctor_id=doctor_id doctor_id=doctor.id if doctor else None
) )
sessions_code_history = SessionStatusHistory( sessions_code_history = SessionStatusHistory(
@ -40,7 +39,7 @@ async def create_session(telegram_id: int, phone: str, consultation_date_time: s
sessions_date_time_history = SessionDateTimeHistory( sessions_date_time_history = SessionDateTimeHistory(
sessions=sessions, sessions=sessions,
updated_at=datetime.now(timezone.utc), updated_at=datetime.now(timezone.utc),
consultation_date_time=date_time_formatter(consultation_date_time), consultation_date_time=date_time_formatter(consultation_date_time).replace(tzinfo=ZoneInfo(doctor.time_zone)),
who_updated="bot" who_updated="bot"
) )
@ -66,7 +65,7 @@ async def mark_consulted(code: str) -> bool:
return True return True
async def get_all_upcomming_sessions() -> Sequence[Row[Sessions]] | bool: async def get_all_upcomming_sessions() -> Sequence[UpcomingSessionRow] | None:
""" """
Возвращает все сессии, у которых consulted_at ещё не заполнен. Возвращает все сессии, у которых consulted_at ещё не заполнен.
""" """
@ -77,7 +76,9 @@ async def get_all_upcomming_sessions() -> Sequence[Row[Sessions]] | bool:
SessionDateTimeHistory.consultation_date_time.label("date"), SessionDateTimeHistory.consultation_date_time.label("date"),
Sessions.code.label("code"), Sessions.code.label("code"),
Patients.telegram_id.label("telegram_id"), Patients.telegram_id.label("telegram_id"),
Doctors.time_zone.label("time_zone") Doctors.time_zone.label("time_zone"),
Sessions.id.label("session_id"),
Patients.id.label("patient_id"),
) )
.join(Sessions, Sessions.id == SessionDateTimeHistory.sessions_id) .join(Sessions, Sessions.id == SessionDateTimeHistory.sessions_id)
.join(Patients, Patients.id == Sessions.patient_id) .join(Patients, Patients.id == Sessions.patient_id)
@ -87,7 +88,7 @@ async def get_all_upcomming_sessions() -> Sequence[Row[Sessions]] | bool:
) )
stmt = ( stmt = (
select(q.c.telegram_id, q.c.code, q.c.date, q.c.time_zone) select(q.c.telegram_id, q.c.code, q.c.date, q.c.time_zone, q.c.session_id, q.c.patient_id)
.distinct(q.c.code) .distinct(q.c.code)
.order_by(q.c.code, q.c.date.desc()) .order_by(q.c.code, q.c.date.desc())
) )
@ -95,8 +96,8 @@ async def get_all_upcomming_sessions() -> Sequence[Row[Sessions]] | bool:
result = await session.execute(stmt) result = await session.execute(stmt)
sc = result.scalars() sc = result.scalars()
if not sc: if not sc:
return False return None
return result.all() return [UpcomingSessionRow(**m) for m in result.mappings().all()]
async def get_pending_session(telegram_id: int) -> Sessions | None: async def get_pending_session(telegram_id: int) -> Sessions | None:

View File

@ -1,6 +1,4 @@
from telegram.ext import ( from telegram.ext import ContextTypes
ContextTypes
)
from typing import Sequence, List from typing import Sequence, List
from docbot.services.payments_service import get_not_mapped_payments, update_payment_and_session from docbot.services.payments_service import get_not_mapped_payments, update_payment_and_session
from docbot.services.session_service import get_sessions_awaiting_payments from docbot.services.session_service import get_sessions_awaiting_payments
@ -9,7 +7,6 @@ from docbot.services.notifications_service import (
mark_notification_sent, mark_notification_sent,
mark_notification_error, mark_notification_error,
) )
from core.logging import logger from core.logging import logger
from sqlalchemy import Row from sqlalchemy import Row
from db.models import PaymentsRegistered, Sessions from db.models import PaymentsRegistered, Sessions

View File

@ -1,7 +1,10 @@
from telegram.ext import ( import pytz
ContextTypes from datetime import datetime
) from telegram.ext import ContextTypes
from docbot.services.session_service import get_all_upcomming_sessions from docbot.services.session_service import get_all_upcomming_sessions
from docbot.services.notifications_service import (
create_session_reminder_once, mark_notification_sent, mark_notification_error
)
from core.logging import logger from core.logging import logger
async def get_sessions_with_consultation_datetime(context: ContextTypes.DEFAULT_TYPE) -> None: async def get_sessions_with_consultation_datetime(context: ContextTypes.DEFAULT_TYPE) -> None:
@ -10,4 +13,44 @@ async def get_sessions_with_consultation_datetime(context: ContextTypes.DEFAULT_
if sessions: if sessions:
logger.info(f"Found {len(sessions)} upcoming sessions:") logger.info(f"Found {len(sessions)} upcoming sessions:")
for session in sessions: for session in sessions:
logger.info(f"Telegram: {session.telegram_id} Session code: {session.code}, datetime: {session.date}, time_zone: {session.time_zone}") now_in_tz = datetime.now(pytz.timezone(session.get('time_zone')))
session_date_in_tz: datetime = session.get('date').astimezone(tz=pytz.timezone(session.get('time_zone')))
session_code=session.get('code')
telegram_id=session.get('telegram_id')
logger.info(
f"Telegram: {telegram_id}, " \
f"Session code: {session_code}, " \
f"current time in tz: {now_in_tz}, " \
f"session time in tz: {session_date_in_tz}"
)
time_diff = session_date_in_tz - now_in_tz
logger.info(f"Time difference: {time_diff}")
patient_id=session.get('patient_id')
session_id=session.get('session_id')
if session_date_in_tz >= now_in_tz and time_diff.total_seconds() <= 3600:
logger.info("Session is upcoming")
created, notif_obj = await create_session_reminder_once(patient_id=patient_id, session_id=session_id)
if not created:
logger.info(f"Notification already exists for {session_code}, skip sending")
continue
try:
await context.bot.send_message(
chat_id=telegram_id,
text=(
f"⏰ Напоминание: ваша консультация с врачом начнётся {session_date_in_tz}.\n"
"Пожалуйста, будьте готовы и проверьте подключение к интернету.\n"
)
)
await mark_notification_sent(notif_obj.id)
logger.info(f"Sent reminder to {telegram_id} for session {session_code}")
except Exception as e:
await mark_notification_error(notif_obj.id, str(e))
logger.error(f"Error sending reminder to {telegram_id} for session {session_code}: {e}")
else:
logger.info("Session time has passed or is not within the next hour")