From d76e12b215eb62e4eda5a0f92fbf5c1bd7c1848e Mon Sep 17 00:00:00 2001 From: Marvin W Date: Tue, 31 Jan 2023 15:13:12 +0100 Subject: Add priority for and allow cancellation of outgoing stanzas --- libdino/src/service/history_sync.vala | 22 +++++++++++----------- libdino/src/service/muc_manager.vala | 2 +- 2 files changed, 12 insertions(+), 12 deletions(-) (limited to 'libdino/src/service') diff --git a/libdino/src/service/history_sync.vala b/libdino/src/service/history_sync.vala index c7bfee88..d9ecd41b 100644 --- a/libdino/src/service/history_sync.vala +++ b/libdino/src/service/history_sync.vala @@ -118,7 +118,7 @@ public class Dino.HistorySync { } } - public async void fetch_everything(Account account, Jid mam_server, DateTime until_earliest_time = new DateTime.from_unix_utc(0)) { + public async void fetch_everything(Account account, Jid mam_server, Cancellable? cancellable = null, DateTime until_earliest_time = new DateTime.from_unix_utc(0)) { debug("Fetch everything for %s %s", mam_server.to_string(), until_earliest_time != null ? @"(until $until_earliest_time)" : ""); RowOption latest_row_opt = db.mam_catchup.select() .with(db.mam_catchup.account_id, "=", account.id) @@ -128,7 +128,7 @@ public class Dino.HistorySync { .single().row(); Row? latest_row = latest_row_opt.is_present() ? latest_row_opt.inner : null; - Row? new_row = yield fetch_latest_page(account, mam_server, latest_row, until_earliest_time); + Row? new_row = yield fetch_latest_page(account, mam_server, latest_row, until_earliest_time, cancellable); if (new_row != null) { current_catchup_id[account][mam_server] = new_row[db.mam_catchup.id]; @@ -182,7 +182,7 @@ public class Dino.HistorySync { } // Fetches the latest page (up to previous db row). Extends the previous db row if it was reached, creates a new row otherwise. - public async Row? fetch_latest_page(Account account, Jid mam_server, Row? latest_row, DateTime? until_earliest_time) { + public async Row? fetch_latest_page(Account account, Jid mam_server, Row? latest_row, DateTime? until_earliest_time, Cancellable? cancellable = null) { debug("[%s | %s] Fetching latest page", account.bare_jid.to_string(), mam_server.to_string()); int latest_row_id = -1; @@ -203,7 +203,7 @@ public class Dino.HistorySync { var query_params = new Xmpp.MessageArchiveManagement.V2.MamQueryParams.query_latest(mam_server, latest_message_time, latest_message_id); - PageRequestResult page_result = yield get_mam_page(account, query_params, null); + PageRequestResult page_result = yield get_mam_page(account, query_params, null, cancellable); debug("[%s | %s] Latest page result: %s", account.bare_jid.to_string(), mam_server.to_string(), page_result.page_result.to_string()); if (page_result.page_result == PageResult.Error) { @@ -299,7 +299,7 @@ public class Dino.HistorySync { return null; } - private async void fetch_before_range(Account account, Jid mam_server, Row range, DateTime? until_earliest_time) { + private async void fetch_before_range(Account account, Jid mam_server, Row range, DateTime? until_earliest_time, Cancellable? cancellable = null) { DateTime latest_time = new DateTime.from_unix_utc(range[db.mam_catchup.from_time]); string latest_id = range[db.mam_catchup.from_id]; debug("[%s | %s] Fetching before range < %s, %s", account.bare_jid.to_string(), mam_server.to_string(), latest_time.to_string(), latest_id); @@ -314,18 +314,18 @@ public class Dino.HistorySync { latest_time, latest_id ); } - yield fetch_query(account, query_params, range[db.mam_catchup.id]); + yield fetch_query(account, query_params, range[db.mam_catchup.id], cancellable); } /** * Iteratively fetches all pages returned for a query (until a PageResult other than MorePagesAvailable is returned) * @return The last PageRequestResult result **/ - private async PageRequestResult fetch_query(Account account, Xmpp.MessageArchiveManagement.V2.MamQueryParams query_params, int db_id) { + private async PageRequestResult fetch_query(Account account, Xmpp.MessageArchiveManagement.V2.MamQueryParams query_params, int db_id, Cancellable? cancellable = null) { debug("[%s | %s] Fetch query %s - %s", account.bare_jid.to_string(), query_params.mam_server.to_string(), query_params.start != null ? query_params.start.to_string() : "", query_params.end != null ? query_params.end.to_string() : ""); PageRequestResult? page_result = null; do { - page_result = yield get_mam_page(account, query_params, page_result); + page_result = yield get_mam_page(account, query_params, page_result, cancellable); debug("Page result %s %b", page_result.page_result.to_string(), page_result.stanzas == null); if (page_result.page_result == PageResult.Error || page_result.stanzas == null) return page_result; @@ -360,7 +360,7 @@ public class Dino.HistorySync { /** * prev_page_result: null if this is the first page request **/ - private async PageRequestResult get_mam_page(Account account, Xmpp.MessageArchiveManagement.V2.MamQueryParams query_params, PageRequestResult? prev_page_result) { + private async PageRequestResult get_mam_page(Account account, Xmpp.MessageArchiveManagement.V2.MamQueryParams query_params, PageRequestResult? prev_page_result, Cancellable? cancellable = null) { XmppStream stream = stream_interactor.get_stream(account); Xmpp.MessageArchiveManagement.QueryResult query_result = null; if (prev_page_result == null) { @@ -368,10 +368,10 @@ public class Dino.HistorySync { } else { query_result = yield Xmpp.MessageArchiveManagement.V2.page_through_results(stream, query_params, prev_page_result.query_result); } - return yield process_query_result(account, query_params, query_result); + return yield process_query_result(account, query_params, query_result, cancellable); } - private async PageRequestResult process_query_result(Account account, Xmpp.MessageArchiveManagement.V2.MamQueryParams query_params, Xmpp.MessageArchiveManagement.QueryResult query_result) { + private async PageRequestResult process_query_result(Account account, Xmpp.MessageArchiveManagement.V2.MamQueryParams query_params, Xmpp.MessageArchiveManagement.QueryResult query_result, Cancellable? cancellable = null) { PageResult page_result = PageResult.MorePagesAvailable; if (query_result.malformed || query_result.error) { diff --git a/libdino/src/service/muc_manager.vala b/libdino/src/service/muc_manager.vala index ff6ac941..f95e10f2 100644 --- a/libdino/src/service/muc_manager.vala +++ b/libdino/src/service/muc_manager.vala @@ -109,7 +109,7 @@ public class MucManager : StreamInteractionModule, Object { } else { // Fetch everything up to the last time the user actively joined stream_interactor.get_module(MessageProcessor.IDENTITY).history_sync - .fetch_everything.begin(account, jid.bare_jid, conversation.active_last_changed); + .fetch_everything.begin(account, jid.bare_jid, null, conversation.active_last_changed); } } } else if (res.muc_error != null) { -- cgit v1.2.3-70-g09d2