implemented chat long-polling and optimistic updates, centralized notification management, optimized avatar caching
This commit is contained in:
@@ -1,15 +1,54 @@
|
||||
import 'dart:async';
|
||||
import 'dart:developer';
|
||||
import 'dart:math' as math;
|
||||
|
||||
import 'package:flutter/widgets.dart';
|
||||
|
||||
import '../../../../../api/errors/error_mapper.dart';
|
||||
import '../../../../../api/marianumcloud/talk/chat/get_chat_response.dart';
|
||||
import '../../../../../api/marianumcloud/talk/chat/long_poll_chat.dart';
|
||||
import '../../../../../api/marianumcloud/talk/room/get_room_response.dart';
|
||||
import '../../../../../api/marianumcloud/talk/set_read_marker/set_read_marker.dart';
|
||||
import '../../../../../api/marianumcloud/talk/set_read_marker/set_read_marker_params.dart';
|
||||
import '../../../infrastructure/loadable_state/loading_error.dart';
|
||||
import '../../../infrastructure/utility_widgets/loadable_hydrated_bloc/loadable_hydrated_bloc.dart';
|
||||
import '../../../infrastructure/utility_widgets/loadable_hydrated_bloc/loadable_hydrated_bloc_event.dart';
|
||||
import '../../chat_list/bloc/chat_list_bloc.dart';
|
||||
import '../repository/chat_repository.dart';
|
||||
import 'chat_event.dart';
|
||||
import 'chat_state.dart';
|
||||
|
||||
class ChatBloc
|
||||
extends LoadableHydratedBloc<ChatEvent, ChatState, ChatRepository> {
|
||||
extends LoadableHydratedBloc<ChatEvent, ChatState, ChatRepository>
|
||||
with WidgetsBindingObserver {
|
||||
final ChatListBloc? _chatListBloc;
|
||||
|
||||
String? _pollingToken;
|
||||
int _backoffMs = 0;
|
||||
int _lastKnownMessageId = 0;
|
||||
bool _appResumed = true;
|
||||
|
||||
/// Distinguishes "the bloc tracks a chat the user has open" from "the
|
||||
/// bloc remembers the last opened chat". App-resume only refreshes when
|
||||
/// true — otherwise we'd silently mark a long-since-left chat as read
|
||||
/// on the server. Can't reuse `currentToken` for this signal because
|
||||
/// clearing it on leaveChat raced with setToken-from-didPopNext when
|
||||
/// popping a stacked chat.
|
||||
bool _chatViewActive = false;
|
||||
|
||||
DateTime _lastTokenSet = DateTime.fromMillisecondsSinceEpoch(0);
|
||||
|
||||
ChatBloc({ChatListBloc? chatListBloc}) : _chatListBloc = chatListBloc {
|
||||
WidgetsBinding.instance.addObserver(this);
|
||||
}
|
||||
|
||||
@override
|
||||
Future<void> close() {
|
||||
WidgetsBinding.instance.removeObserver(this);
|
||||
_stopLongPoll();
|
||||
return super.close();
|
||||
}
|
||||
|
||||
@override
|
||||
ChatRepository repository() => ChatRepository();
|
||||
|
||||
@@ -33,24 +72,74 @@ class ChatBloc
|
||||
}
|
||||
|
||||
void setToken(String token) {
|
||||
_chatViewActive = true;
|
||||
if (token == (innerState?.currentToken ?? '')) {
|
||||
refresh();
|
||||
return;
|
||||
}
|
||||
_stopLongPoll();
|
||||
add(Emit((s) => s.copyWith(currentToken: token, chatResponse: null)));
|
||||
add(RefetchStarted<ChatState>());
|
||||
_loadChat(token);
|
||||
}
|
||||
|
||||
void setReferenceMessageId(int? messageId) {
|
||||
add(Emit((s) => s.copyWith(referenceMessageId: messageId)));
|
||||
_scheduleLoad(token);
|
||||
}
|
||||
|
||||
void refresh() {
|
||||
final token = innerState?.currentToken ?? '';
|
||||
if (token.isEmpty) return;
|
||||
add(RefetchStarted<ChatState>());
|
||||
_loadChat(token);
|
||||
_scheduleLoad(token);
|
||||
}
|
||||
|
||||
void setReferenceMessageId(int? messageId) {
|
||||
add(Emit((s) => s.copyWith(referenceMessageId: messageId)));
|
||||
}
|
||||
|
||||
/// Token-aware: only acts when the bloc still points at [fromToken].
|
||||
/// When popping a stacked chat (notification opened B over A), A's
|
||||
/// didPopNext has already run setToken(A) by the time B's dispose
|
||||
/// fires — at that point currentToken is A and we must leave it alone.
|
||||
void leaveChat(String fromToken) {
|
||||
if ((innerState?.currentToken ?? '') != fromToken) return;
|
||||
_chatViewActive = false;
|
||||
_stopLongPoll();
|
||||
}
|
||||
|
||||
/// Fire-and-forget server-side read-marker. Exposed so view-side
|
||||
/// callers (long-press menu, ChatView dispose) hit the same path.
|
||||
Future<void> sendServerReadMarker(String token, int messageId) async {
|
||||
try {
|
||||
await SetReadMarker(
|
||||
token,
|
||||
true,
|
||||
setReadMarkerParams: SetReadMarkerParams(lastReadMessage: messageId),
|
||||
).run();
|
||||
} on Object catch (e) {
|
||||
log('Server read-marker for $token failed: $e');
|
||||
}
|
||||
}
|
||||
|
||||
@override
|
||||
void didChangeAppLifecycleState(AppLifecycleState state) {
|
||||
final wasResumed = _appResumed;
|
||||
_appResumed = state == AppLifecycleState.resumed;
|
||||
if (!_appResumed) {
|
||||
_stopLongPoll();
|
||||
return;
|
||||
}
|
||||
if (wasResumed) return;
|
||||
final token = innerState?.currentToken ?? '';
|
||||
if (token.isNotEmpty && _chatViewActive) refresh();
|
||||
}
|
||||
|
||||
/// Defer _loadChat by one microtask so the Bloc worker processes the
|
||||
/// preceding Emit/RefetchStarted before any cache/network callback
|
||||
/// fires — otherwise a quick cache hit can run with the previous
|
||||
/// token in state, fail stillCurrent(), and never emit a DataGathered.
|
||||
void _scheduleLoad(String token) {
|
||||
Future<void>.microtask(() {
|
||||
if (isClosed) return;
|
||||
_loadChat(token).then((_) => _startLongPoll(token));
|
||||
});
|
||||
}
|
||||
|
||||
Future<void> _loadChat(String token) async {
|
||||
@@ -69,14 +158,21 @@ class ChatBloc
|
||||
token: token,
|
||||
onCacheData: (data) {
|
||||
if (!stillCurrent()) return;
|
||||
// Cache hit: show data immediately but preserve lastFetch — the
|
||||
// cached payload may be stale and we don't want the UI to claim a
|
||||
// fresh fetch just happened.
|
||||
// Only paint cache when the state is empty — restoring a stale
|
||||
// disk snapshot over already-merged long-poll data would visibly
|
||||
// drop those messages until the network call resolves.
|
||||
if (innerState?.chatResponse != null) return;
|
||||
add(Emit((s) => s.copyWith(chatResponse: data)));
|
||||
},
|
||||
onNetworkData: (data) {
|
||||
// Server-side mark runs unconditionally with the freshly-fetched
|
||||
// maxId. Skipping it on stillCurrent==false would leave the
|
||||
// server cursor wherever a quick navigation away left it.
|
||||
final maxId = _maxMessageId(data);
|
||||
if (maxId > 0) unawaited(sendServerReadMarker(token, maxId));
|
||||
if (!stillCurrent()) return;
|
||||
add(DataGathered((s) => s.copyWith(chatResponse: data)));
|
||||
_applyChatResponse(data);
|
||||
if (maxId > 0) _chatListBloc?.markRoomAsRead(token, maxId);
|
||||
},
|
||||
onError: (e) => capturedError = e,
|
||||
);
|
||||
@@ -98,4 +194,115 @@ class ChatBloc
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Long-poll loop
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
void _startLongPoll(String token) {
|
||||
if (!_appResumed) return;
|
||||
if (_pollingToken == token) return;
|
||||
_stopLongPoll();
|
||||
_pollingToken = token;
|
||||
_backoffMs = 0;
|
||||
_lastKnownMessageId = _maxMessageId(innerState?.chatResponse);
|
||||
unawaited(_pollLoop(token));
|
||||
}
|
||||
|
||||
void _stopLongPoll() {
|
||||
_pollingToken = null;
|
||||
_backoffMs = 0;
|
||||
}
|
||||
|
||||
Future<void> _pollLoop(String token) async {
|
||||
while (_pollingToken == token && !isClosed) {
|
||||
try {
|
||||
final response = await LongPollChat(
|
||||
chatToken: token,
|
||||
lastKnownMessageId: _lastKnownMessageId,
|
||||
).run();
|
||||
|
||||
if (_pollingToken != token || isClosed) return;
|
||||
_backoffMs = 0;
|
||||
|
||||
if (response == null) continue;
|
||||
|
||||
final headerId = int.tryParse(
|
||||
response.headers?[_kLongPollLastGivenHeader] ?? '',
|
||||
);
|
||||
if (headerId != null && headerId > _lastKnownMessageId) {
|
||||
_lastKnownMessageId = headerId;
|
||||
}
|
||||
|
||||
if (response.data.isEmpty) continue;
|
||||
_applyChatResponse(response);
|
||||
final maxId = _maxMessageId(response);
|
||||
if (maxId > _lastKnownMessageId) _lastKnownMessageId = maxId;
|
||||
// Long-poll's setReadMarker=on already moved the server cursor;
|
||||
// mirror locally.
|
||||
final preview = _pickDisplayMessage(response);
|
||||
if (preview != null) {
|
||||
_chatListBloc?.applyIncomingMessage(token, preview);
|
||||
} else {
|
||||
_chatListBloc?.markRoomAsRead(token, _lastKnownMessageId);
|
||||
}
|
||||
} on Object catch (e) {
|
||||
if (_pollingToken != token || isClosed) return;
|
||||
log('LongPoll error for $token: $e');
|
||||
_backoffMs = _backoffMs == 0 ? 2000 : math.min(_backoffMs * 2, 30000);
|
||||
await Future.delayed(Duration(milliseconds: _backoffMs));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Merges [incoming] into the existing chatResponse and emits as a
|
||||
/// fresh fetch. Dedups by id (newer wins, so server edits/deletes
|
||||
/// propagate). Shared by initial-load and long-poll so neither wipes
|
||||
/// messages the other already committed.
|
||||
void _applyChatResponse(GetChatResponse incoming) {
|
||||
final current = innerState?.chatResponse;
|
||||
if (current == null) {
|
||||
add(DataGathered((s) => s.copyWith(chatResponse: incoming)));
|
||||
return;
|
||||
}
|
||||
final byId = <int, GetChatResponseObject>{};
|
||||
for (final m in current.data) {
|
||||
byId[m.id] = m;
|
||||
}
|
||||
for (final m in incoming.data) {
|
||||
byId[m.id] = m;
|
||||
}
|
||||
final merged = GetChatResponse(byId.values.toSet())
|
||||
..headers = incoming.headers;
|
||||
add(DataGathered((s) => s.copyWith(chatResponse: merged)));
|
||||
}
|
||||
|
||||
int _maxMessageId(GetChatResponse? response) {
|
||||
if (response == null) return 0;
|
||||
var max = 0;
|
||||
for (final m in response.data) {
|
||||
if (m.id > max) max = m.id;
|
||||
}
|
||||
return max;
|
||||
}
|
||||
|
||||
/// Highest-id message worth showing as the room preview — comments
|
||||
/// and voice messages, matching what the server picks for `lastMessage`.
|
||||
GetChatResponseObject? _pickDisplayMessage(GetChatResponse response) {
|
||||
GetChatResponseObject? best;
|
||||
for (final m in response.data) {
|
||||
switch (m.messageType) {
|
||||
case GetRoomResponseObjectMessageType.comment:
|
||||
case GetRoomResponseObjectMessageType.voiceMessage:
|
||||
if (best == null || m.id > best.id) best = m;
|
||||
case GetRoomResponseObjectMessageType.deletedComment:
|
||||
case GetRoomResponseObjectMessageType.system:
|
||||
case GetRoomResponseObjectMessageType.command:
|
||||
break;
|
||||
}
|
||||
}
|
||||
return best;
|
||||
}
|
||||
}
|
||||
|
||||
const _kLongPollLastGivenHeader = 'x-chat-last-given';
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
import 'dart:async';
|
||||
import 'dart:developer';
|
||||
|
||||
import 'package:flutter_app_badge/flutter_app_badge.dart';
|
||||
|
||||
import '../../../../../api/errors/error_mapper.dart';
|
||||
import '../../../../../api/marianumcloud/talk/chat/get_chat_response.dart';
|
||||
import '../../../../../api/marianumcloud/talk/room/get_room_response.dart';
|
||||
import '../../../infrastructure/loadable_state/loading_error.dart';
|
||||
import '../../../infrastructure/utility_widgets/loadable_hydrated_bloc/loadable_hydrated_bloc.dart';
|
||||
@@ -15,6 +17,8 @@ class ChatListBloc
|
||||
extends
|
||||
LoadableHydratedBloc<ChatListEvent, ChatListState, ChatListRepository> {
|
||||
bool _forceRenew = false;
|
||||
Timer? _autoRefreshTimer;
|
||||
Duration? _autoRefreshInterval;
|
||||
|
||||
@override
|
||||
void retry() {
|
||||
@@ -22,6 +26,27 @@ class ChatListBloc
|
||||
super.retry();
|
||||
}
|
||||
|
||||
@override
|
||||
Future<void> close() {
|
||||
_autoRefreshTimer?.cancel();
|
||||
return super.close();
|
||||
}
|
||||
|
||||
/// Sets (or clears) the recurring background refresh. Silent so the
|
||||
/// loading bar doesn't blink several times a minute; pull-to-refresh
|
||||
/// and tab-activation refreshes are non-silent for explicit feedback.
|
||||
void setAutoRefreshInterval(Duration? interval) {
|
||||
if (interval == _autoRefreshInterval) return;
|
||||
_autoRefreshInterval = interval;
|
||||
_autoRefreshTimer?.cancel();
|
||||
_autoRefreshTimer = null;
|
||||
if (interval == null) return;
|
||||
_autoRefreshTimer = Timer.periodic(interval, (_) {
|
||||
if (isClosed) return;
|
||||
refresh(silent: true);
|
||||
});
|
||||
}
|
||||
|
||||
@override
|
||||
ChatListRepository repository() => ChatListRepository();
|
||||
|
||||
@@ -51,8 +76,8 @@ class ChatListBloc
|
||||
if (capturedError != null) throw capturedError!;
|
||||
}
|
||||
|
||||
Future<void> refresh({bool renew = true}) async {
|
||||
add(RefetchStarted<ChatListState>());
|
||||
Future<void> refresh({bool renew = true, bool silent = false}) async {
|
||||
if (!silent) add(RefetchStarted<ChatListState>());
|
||||
Object? capturedError;
|
||||
try {
|
||||
final rooms = await repo.data.getRooms(
|
||||
@@ -82,6 +107,64 @@ class ChatListBloc
|
||||
await refresh();
|
||||
}
|
||||
|
||||
/// Optimistically clears the unread counter for [token] so the tile
|
||||
/// reacts before a refresh roundtrip lands. Server-side mark-as-read
|
||||
/// is the caller's job (see [ChatBloc.sendServerReadMarker]).
|
||||
void markRoomAsRead(String token, int lastMessageId) {
|
||||
_mutateRoom(token, (r) {
|
||||
if (r.unreadMessages == 0 && r.lastReadMessage >= lastMessageId) {
|
||||
return false;
|
||||
}
|
||||
r.unreadMessages = 0;
|
||||
r.unreadMention = false;
|
||||
r.unreadMentionDirect = false;
|
||||
if (lastMessageId > r.lastReadMessage) r.lastReadMessage = lastMessageId;
|
||||
return true;
|
||||
});
|
||||
}
|
||||
|
||||
/// Pushes a freshly-received message into the matching room tile so the
|
||||
/// list shows the right preview text + activity timestamp before the
|
||||
/// next full refresh lands. Also clears unread because the long-poll
|
||||
/// only feeds this in for an actively-open chat.
|
||||
void applyIncomingMessage(String token, GetChatResponseObject message) {
|
||||
_mutateRoom(token, (r) {
|
||||
final wasRead =
|
||||
r.unreadMessages == 0 && r.lastReadMessage >= message.id;
|
||||
final hasNewer = r.lastMessage.id >= message.id;
|
||||
if (wasRead && hasNewer) return false;
|
||||
r.unreadMessages = 0;
|
||||
r.unreadMention = false;
|
||||
r.unreadMentionDirect = false;
|
||||
if (message.id > r.lastReadMessage) r.lastReadMessage = message.id;
|
||||
if (message.id > r.lastMessage.id) r.lastMessage = message;
|
||||
if (message.timestamp > r.lastActivity) r.lastActivity = message.timestamp;
|
||||
return true;
|
||||
});
|
||||
}
|
||||
|
||||
/// Mutates the room with [token] in-place via [mutator] (returning
|
||||
/// true if anything changed) and re-emits if so. Builds a fresh
|
||||
/// [GetRoomResponse] so equality-by-identity in the bloc state
|
||||
/// recognises the change and rebuilds.
|
||||
void _mutateRoom(
|
||||
String token,
|
||||
bool Function(GetRoomResponseObject room) mutator,
|
||||
) {
|
||||
final rooms = innerState?.rooms;
|
||||
if (rooms == null) return;
|
||||
var changed = false;
|
||||
final updated = rooms.data.map((r) {
|
||||
if (r.token != token) return r;
|
||||
if (mutator(r)) changed = true;
|
||||
return r;
|
||||
}).toSet();
|
||||
if (!changed) return;
|
||||
final newRooms = GetRoomResponse(updated)..headers = rooms.headers;
|
||||
add(Emit((s) => s.copyWith(rooms: newRooms)));
|
||||
_updateAppBadge(newRooms);
|
||||
}
|
||||
|
||||
void _updateAppBadge(GetRoomResponse rooms) {
|
||||
try {
|
||||
final unread = rooms.data.fold<int>(
|
||||
|
||||
Reference in New Issue
Block a user