// Client/lib/state/app/modules/chat/bloc/chat_bloc.dart
// (299 lines, 9.4 KiB, Dart)

import 'dart:async';
import 'dart:developer';
import 'dart:math' as math;
import 'package:flutter/widgets.dart';
import '../../../../../api/errors/error_mapper.dart';
import '../../../../../api/marianumcloud/talk/chat/get_chat_response.dart';
import '../../../../../api/marianumcloud/talk/chat/long_poll_chat.dart';
import '../../../../../api/marianumcloud/talk/room/get_room_response.dart';
import '../../../../../api/marianumcloud/talk/set_read_marker/set_read_marker.dart';
import '../../../../../api/marianumcloud/talk/set_read_marker/set_read_marker_params.dart';
import '../../../infrastructure/loadable_state/loading_error.dart';
import '../../../infrastructure/utility_widgets/loadable_hydrated_bloc/loadable_hydrated_bloc.dart';
import '../../../infrastructure/utility_widgets/loadable_hydrated_bloc/loadable_hydrated_bloc_event.dart';
import '../../chat_list/bloc/chat_list_bloc.dart';
import '../repository/chat_repository.dart';
import 'chat_event.dart';
import 'chat_state.dart';
/// Bloc backing a single Talk chat view.
///
/// Owns the token of the currently open conversation, loads its messages
/// (cache paint first, then network), keeps them fresh through a long-poll
/// loop while the app is resumed, and mirrors read-state changes into the
/// optional [ChatListBloc].
class ChatBloc
    extends LoadableHydratedBloc<ChatEvent, ChatState, ChatRepository>
    with WidgetsBindingObserver {
  /// Sibling room-list bloc; nullable so this bloc can run standalone.
  final ChatListBloc? _chatListBloc;

  // Token whose poll loop is allowed to run; null means no loop is active.
  String? _pollingToken;

  // Current retry delay of the poll loop, in milliseconds (0 = no backoff).
  int _backoffMs = 0;

  // Long-poll cursor: highest message id already processed.
  int _lastKnownMessageId = 0;

  // Whether the app is foregrounded; polling is suspended otherwise.
  bool _appResumed = true;

  /// True only while a ChatView is mounted. Can't reuse `currentToken` —
  /// clearing it on leaveChat races with setToken from didPopNext when
  /// popping a stacked chat, causing spurious server read-markers on resume.
  bool _chatViewActive = false;

  /// Whether a chat screen is currently mounted.
  bool get hasOpenChat => _chatViewActive;

  // Timestamp of the newest load request; lets older in-flight loads detect
  // they have been superseded (see stillCurrent() inside _loadChat).
  DateTime _lastTokenSet = DateTime.fromMillisecondsSinceEpoch(0);

  ChatBloc({ChatListBloc? chatListBloc}) : _chatListBloc = chatListBloc {
    WidgetsBinding.instance.addObserver(this);
  }

  @override
  Future<void> close() {
    WidgetsBinding.instance.removeObserver(this);
    _stopLongPoll();
    return super.close();
  }

  @override
  ChatRepository repository() => ChatRepository();

  @override
  ChatState fromNothing() => const ChatState();

  @override
  ChatState fromStorage(Map<String, dynamic> json) => ChatState.fromJson(json);

  @override
  Map<String, dynamic>? toStorage(ChatState state) => state.toJson();

  /// Initial load hook: fetches the current token's chat, or emits an empty
  /// [DataGathered] when no chat has been selected yet.
  @override
  Future<void> gatherData() async {
    final token = innerState?.currentToken ?? '';
    if (token.isEmpty) {
      add(DataGathered((s) => s));
      return;
    }
    await _loadChat(token);
  }

  /// Switches the bloc to [token]. A repeated token is treated as a refresh;
  /// a new token clears the previous chat and starts a fresh load.
  void setToken(String token) {
    _chatViewActive = true;
    if (token == (innerState?.currentToken ?? '')) {
      refresh();
      return;
    }
    _stopLongPoll();
    add(Emit((s) => s.copyWith(currentToken: token, chatResponse: null)));
    add(RefetchStarted<ChatState>());
    _scheduleLoad(token);
  }

  /// Re-fetches the currently selected chat; no-op when none is selected.
  void refresh() {
    final token = innerState?.currentToken ?? '';
    if (token.isEmpty) return;
    add(RefetchStarted<ChatState>());
    _scheduleLoad(token);
  }

  /// Sets (or clears, when null) the message id being replied/referred to.
  void setReferenceMessageId(int? messageId) {
    add(Emit((s) => s.copyWith(referenceMessageId: messageId)));
  }

  /// No-op when the bloc has already moved on to a different token: when
  /// popping a stacked chat (B over A), A's didPopNext runs setToken(A)
  /// before B's dispose fires.
  void leaveChat(String fromToken) {
    if ((innerState?.currentToken ?? '') != fromToken) return;
    _chatViewActive = false;
    _stopLongPoll();
  }

  /// Moves the server-side read cursor of [token] up to [messageId].
  ///
  /// Best-effort: failures are logged and swallowed so callers can
  /// fire-and-forget this.
  Future<void> sendServerReadMarker(String token, int messageId) async {
    try {
      await SetReadMarker(
        token,
        true,
        setReadMarkerParams: SetReadMarkerParams(lastReadMessage: messageId),
      ).run();
    } on Object catch (e) {
      log('Server read-marker for $token failed: $e');
    }
  }

  @override
  void didChangeAppLifecycleState(AppLifecycleState state) {
    final wasResumed = _appResumed;
    _appResumed = state == AppLifecycleState.resumed;
    if (!_appResumed) {
      // Backgrounded/inactive: suspend polling; refresh() restarts it below.
      _stopLongPoll();
      return;
    }
    // Only act on a real background -> resumed transition.
    if (wasResumed) return;
    final token = innerState?.currentToken ?? '';
    if (token.isNotEmpty && _chatViewActive) refresh();
  }

  /// Microtask hop so the Bloc worker drains the preceding Emit before
  /// any cache callback fires — a quick cache hit otherwise runs with
  /// the previous token in state and fails stillCurrent().
  void _scheduleLoad(String token) {
    Future<void>.microtask(() {
      if (isClosed) return;
      _loadChat(token).then((_) => _startLongPoll(token));
    });
  }

  /// Loads the chat for [token] from the repository (cache, then network),
  /// applies the result if the token is still current, and pushes server
  /// read-markers / chat-list updates as a side effect.
  Future<void> _loadChat(String token) async {
    final requestStart = DateTime.now();
    _lastTokenSet = requestStart;
    // A load is stale once a newer load started or the token changed.
    bool stillCurrent() {
      if (_lastTokenSet.isAfter(requestStart)) return false;
      if ((innerState?.currentToken ?? '') != token) return false;
      return true;
    }

    Object? capturedError;
    try {
      await repo.data.getChat(
        token: token,
        onCacheData: (data) {
          if (!stillCurrent()) return;
          // Skip cache paint over already-merged long-poll data — would
          // visibly drop those messages until the network call resolves.
          if (innerState?.chatResponse != null) return;
          add(Emit((s) => s.copyWith(chatResponse: data)));
        },
        onNetworkData: (data) {
          // Mark runs even if no longer current — otherwise a quick
          // navigation away leaves the server cursor stale. Cache check
          // skips the POST when the cursor is already at maxId.
          final maxId = _maxMessageId(data);
          if (maxId > 0) {
            final cached = _chatListBloc?.lastReadMessageFor(token);
            if (cached == null || cached < maxId) {
              unawaited(sendServerReadMarker(token, maxId));
            }
          }
          if (!stillCurrent()) return;
          _applyChatResponse(data);
          if (maxId > 0) _chatListBloc?.markRoomAsRead(token, maxId);
        },
        onError: (e) => capturedError = e,
      );
    } on Object catch (e) {
      capturedError = e;
    }
    if (!stillCurrent()) return;
    if (capturedError != null) {
      add(
        Error(
          LoadingError(
            message: errorToUserMessage(capturedError),
            technicalDetails: errorToTechnicalDetails(capturedError),
            allowRetry: errorAllowsRetry(capturedError),
          ),
        ),
      );
    }
  }

  /// Starts the long-poll loop for [token] unless one is already running.
  void _startLongPoll(String token) {
    if (!_appResumed) return;
    // _scheduleLoad's `.then` fires even when _loadChat bailed out because
    // the token was superseded mid-flight (stillCurrent() false). Without
    // this guard a stale completion could stop the current chat's loop and
    // start polling the wrong room after a rapid A -> B -> A switch.
    if ((innerState?.currentToken ?? '') != token) return;
    if (_pollingToken == token) return;
    _stopLongPoll();
    _pollingToken = token;
    _backoffMs = 0;
    _lastKnownMessageId = _maxMessageId(innerState?.chatResponse);
    unawaited(_pollLoop(token));
  }

  /// Clearing [_pollingToken] makes any running _pollLoop exit at its next
  /// check; there is no handle to cancel the in-flight request itself.
  void _stopLongPoll() {
    _pollingToken = null;
    _backoffMs = 0;
  }

  /// Long-poll loop: repeatedly asks the server for messages newer than
  /// [_lastKnownMessageId], merging results into state and mirroring them
  /// to the chat list. Errors back off exponentially (2s doubling to 30s).
  Future<void> _pollLoop(String token) async {
    while (_pollingToken == token && !isClosed) {
      try {
        final response = await LongPollChat(
          chatToken: token,
          lastKnownMessageId: _lastKnownMessageId,
        ).run();
        // The token may have changed while we were blocked on the server.
        if (_pollingToken != token || isClosed) return;
        _backoffMs = 0;
        if (response == null) continue;
        // The header can advance the cursor even when the body is empty.
        final headerId = int.tryParse(
          response.headers?[_kLongPollLastGivenHeader] ?? '',
        );
        if (headerId != null && headerId > _lastKnownMessageId) {
          _lastKnownMessageId = headerId;
        }
        if (response.data.isEmpty) continue;
        _applyChatResponse(response);
        final maxId = _maxMessageId(response);
        if (maxId > _lastKnownMessageId) _lastKnownMessageId = maxId;
        // Long-poll's setReadMarker=on moved the server cursor; mirror locally.
        final preview = _pickDisplayMessage(response);
        if (preview != null) {
          _chatListBloc?.applyIncomingMessage(token, preview);
        } else {
          _chatListBloc?.markRoomAsRead(token, _lastKnownMessageId);
        }
      } on Object catch (e) {
        if (_pollingToken != token || isClosed) return;
        log('LongPoll error for $token: $e');
        _backoffMs = _backoffMs == 0 ? 2000 : math.min(_backoffMs * 2, 30000);
        await Future.delayed(Duration(milliseconds: _backoffMs));
      }
    }
  }

  /// Dedups by id with newer-wins so server edits/deletes propagate.
  void _applyChatResponse(GetChatResponse incoming) {
    final current = innerState?.chatResponse;
    if (current == null) {
      add(DataGathered((s) => s.copyWith(chatResponse: incoming)));
      return;
    }
    // Insertion order is preserved: current messages first, then incoming
    // ones either overwrite in place (same id) or append.
    final byId = <int, GetChatResponseObject>{};
    for (final m in current.data) {
      byId[m.id] = m;
    }
    for (final m in incoming.data) {
      byId[m.id] = m;
    }
    final merged = GetChatResponse(byId.values.toSet())
      ..headers = incoming.headers;
    add(DataGathered((s) => s.copyWith(chatResponse: merged)));
  }

  /// Returns the highest message id in [response], or 0 when absent/empty.
  int _maxMessageId(GetChatResponse? response) {
    if (response == null) return 0;
    var max = 0;
    for (final m in response.data) {
      if (m.id > max) max = m.id;
    }
    return max;
  }

  /// Mirrors the server's own `lastMessage` selection (comments + voice only).
  GetChatResponseObject? _pickDisplayMessage(GetChatResponse response) {
    GetChatResponseObject? best;
    for (final m in response.data) {
      switch (m.messageType) {
        case GetRoomResponseObjectMessageType.comment:
        case GetRoomResponseObjectMessageType.voiceMessage:
          if (best == null || m.id > best.id) best = m;
        case GetRoomResponseObjectMessageType.deletedComment:
        case GetRoomResponseObjectMessageType.system:
        case GetRoomResponseObjectMessageType.command:
          break;
      }
    }
    return best;
  }
}
/// Talk long-poll response header parsed in `_pollLoop` to advance the
/// message cursor even when the response body carries no messages.
const _kLongPollLastGivenHeader = 'x-chat-last-given';