import 'dart:async'; import 'dart:developer'; import 'dart:math' as math; import 'package:flutter/widgets.dart'; import '../../../../../api/errors/error_mapper.dart'; import '../../../../../api/marianumcloud/talk/chat/get_chat_response.dart'; import '../../../../../api/marianumcloud/talk/chat/long_poll_chat.dart'; import '../../../../../api/marianumcloud/talk/room/get_room_response.dart'; import '../../../../../api/marianumcloud/talk/set_read_marker/set_read_marker.dart'; import '../../../../../api/marianumcloud/talk/set_read_marker/set_read_marker_params.dart'; import '../../../infrastructure/loadable_state/loading_error.dart'; import '../../../infrastructure/utility_widgets/loadable_hydrated_bloc/loadable_hydrated_bloc.dart'; import '../../../infrastructure/utility_widgets/loadable_hydrated_bloc/loadable_hydrated_bloc_event.dart'; import '../../chat_list/bloc/chat_list_bloc.dart'; import '../repository/chat_repository.dart'; import 'chat_event.dart'; import 'chat_state.dart'; class ChatBloc extends LoadableHydratedBloc with WidgetsBindingObserver { final ChatListBloc? _chatListBloc; String? _pollingToken; int _backoffMs = 0; int _lastKnownMessageId = 0; bool _appResumed = true; /// True only while a ChatView is mounted. Can't reuse `currentToken` — /// clearing it on leaveChat races with setToken from didPopNext when /// popping a stacked chat, causing spurious server read-markers on resume. bool _chatViewActive = false; bool get hasOpenChat => _chatViewActive; DateTime _lastTokenSet = DateTime.fromMillisecondsSinceEpoch(0); ChatBloc({ChatListBloc? chatListBloc}) : _chatListBloc = chatListBloc { WidgetsBinding.instance.addObserver(this); } @override Future close() { WidgetsBinding.instance.removeObserver(this); _stopLongPoll(); return super.close(); } @override ChatRepository repository() => ChatRepository(); @override ChatState fromNothing() => const ChatState(); @override ChatState fromStorage(Map json) => ChatState.fromJson(json); @override Map? toStorage(ChatState state) => state.toJson(); @override Future gatherData() async { final token = innerState?.currentToken ?? ''; if (token.isEmpty) { add(DataGathered((s) => s)); return; } await _loadChat(token); } void setToken(String token) { _chatViewActive = true; if (token == (innerState?.currentToken ?? '')) { refresh(); return; } _stopLongPoll(); add(Emit((s) => s.copyWith(currentToken: token, chatResponse: null))); add(RefetchStarted()); _scheduleLoad(token); } void refresh() { final token = innerState?.currentToken ?? ''; if (token.isEmpty) return; add(RefetchStarted()); _scheduleLoad(token); } void setReferenceMessageId(int? messageId) { add(Emit((s) => s.copyWith(referenceMessageId: messageId))); } /// No-op when the bloc has already moved on to a different token: when /// popping a stacked chat (B over A), A's didPopNext runs setToken(A) /// before B's dispose fires. void leaveChat(String fromToken) { if ((innerState?.currentToken ?? '') != fromToken) return; _chatViewActive = false; _stopLongPoll(); } Future sendServerReadMarker(String token, int messageId) async { try { await SetReadMarker( token, true, setReadMarkerParams: SetReadMarkerParams(lastReadMessage: messageId), ).run(); } on Object catch (e) { log('Server read-marker for $token failed: $e'); } } @override void didChangeAppLifecycleState(AppLifecycleState state) { final wasResumed = _appResumed; _appResumed = state == AppLifecycleState.resumed; if (!_appResumed) { _stopLongPoll(); return; } if (wasResumed) return; final token = innerState?.currentToken ?? ''; if (token.isNotEmpty && _chatViewActive) refresh(); } /// Microtask hop so the Bloc worker drains the preceding Emit before /// any cache callback fires — a quick cache hit otherwise runs with /// the previous token in state and fails stillCurrent(). void _scheduleLoad(String token) { Future.microtask(() { if (isClosed) return; _loadChat(token).then((_) => _startLongPoll(token)); }); } Future _loadChat(String token) async { final requestStart = DateTime.now(); _lastTokenSet = requestStart; bool stillCurrent() { if (_lastTokenSet.isAfter(requestStart)) return false; if ((innerState?.currentToken ?? '') != token) return false; return true; } Object? capturedError; try { await repo.data.getChat( token: token, onCacheData: (data) { if (!stillCurrent()) return; // Skip cache paint over already-merged long-poll data — would // visibly drop those messages until the network call resolves. if (innerState?.chatResponse != null) return; add(Emit((s) => s.copyWith(chatResponse: data))); }, onNetworkData: (data) { // Mark runs even if no longer current — otherwise a quick // navigation away leaves the server cursor stale. Cache check // skips the POST when the cursor is already at maxId. final maxId = _maxMessageId(data); if (maxId > 0) { final cached = _chatListBloc?.lastReadMessageFor(token); if (cached == null || cached < maxId) { unawaited(sendServerReadMarker(token, maxId)); } } if (!stillCurrent()) return; _applyChatResponse(data); if (maxId > 0) _chatListBloc?.markRoomAsRead(token, maxId); }, onError: (e) => capturedError = e, ); } catch (e) { capturedError = e; } if (!stillCurrent()) return; if (capturedError != null) { add( Error( LoadingError( message: errorToUserMessage(capturedError), technicalDetails: errorToTechnicalDetails(capturedError), allowRetry: errorAllowsRetry(capturedError), ), ), ); } } void _startLongPoll(String token) { if (!_appResumed) return; if (_pollingToken == token) return; _stopLongPoll(); _pollingToken = token; _backoffMs = 0; _lastKnownMessageId = _maxMessageId(innerState?.chatResponse); unawaited(_pollLoop(token)); } void _stopLongPoll() { _pollingToken = null; _backoffMs = 0; } Future _pollLoop(String token) async { while (_pollingToken == token && !isClosed) { try { final response = await LongPollChat( chatToken: token, lastKnownMessageId: _lastKnownMessageId, ).run(); if (_pollingToken != token || isClosed) return; _backoffMs = 0; if (response == null) continue; final headerId = int.tryParse( response.headers?[_kLongPollLastGivenHeader] ?? '', ); if (headerId != null && headerId > _lastKnownMessageId) { _lastKnownMessageId = headerId; } if (response.data.isEmpty) continue; _applyChatResponse(response); final maxId = _maxMessageId(response); if (maxId > _lastKnownMessageId) _lastKnownMessageId = maxId; // Long-poll's setReadMarker=on moved the server cursor; mirror locally. final preview = _pickDisplayMessage(response); if (preview != null) { _chatListBloc?.applyIncomingMessage(token, preview); } else { _chatListBloc?.markRoomAsRead(token, _lastKnownMessageId); } } on Object catch (e) { if (_pollingToken != token || isClosed) return; log('LongPoll error for $token: $e'); _backoffMs = _backoffMs == 0 ? 2000 : math.min(_backoffMs * 2, 30000); await Future.delayed(Duration(milliseconds: _backoffMs)); } } } /// Dedups by id with newer-wins so server edits/deletes propagate. void _applyChatResponse(GetChatResponse incoming) { final current = innerState?.chatResponse; if (current == null) { add(DataGathered((s) => s.copyWith(chatResponse: incoming))); return; } final byId = {}; for (final m in current.data) { byId[m.id] = m; } for (final m in incoming.data) { byId[m.id] = m; } final merged = GetChatResponse(byId.values.toSet()) ..headers = incoming.headers; add(DataGathered((s) => s.copyWith(chatResponse: merged))); } int _maxMessageId(GetChatResponse? response) { if (response == null) return 0; var max = 0; for (final m in response.data) { if (m.id > max) max = m.id; } return max; } /// Mirrors the server's own `lastMessage` selection (comments + voice only). GetChatResponseObject? _pickDisplayMessage(GetChatResponse response) { GetChatResponseObject? best; for (final m in response.data) { switch (m.messageType) { case GetRoomResponseObjectMessageType.comment: case GetRoomResponseObjectMessageType.voiceMessage: if (best == null || m.id > best.id) best = m; case GetRoomResponseObjectMessageType.deletedComment: case GetRoomResponseObjectMessageType.system: case GetRoomResponseObjectMessageType.command: break; } } return best; } } const _kLongPollLastGivenHeader = 'x-chat-last-given';