From 6c6e7e3aa7935ec513b7e5ea9b53a92b741ecf92 Mon Sep 17 00:00:00 2001 From: fiaxh Date: Fri, 8 Jul 2022 16:33:40 +0200 Subject: Rewrite MAM logic and add MUC MAM --- libdino/CMakeLists.txt | 1 + libdino/src/entity/conversation.vala | 9 +- libdino/src/service/chat_interaction.vala | 2 +- libdino/src/service/conversation_manager.vala | 2 +- libdino/src/service/database.vala | 33 +- libdino/src/service/history_sync.vala | 557 ++++++++++++++++++++++++++ libdino/src/service/message_processor.vala | 276 +------------ libdino/src/service/module_manager.vala | 2 +- libdino/src/service/muc_manager.vala | 23 +- 9 files changed, 638 insertions(+), 267 deletions(-) create mode 100644 libdino/src/service/history_sync.vala (limited to 'libdino') diff --git a/libdino/CMakeLists.txt b/libdino/CMakeLists.txt index 20f5ffee..6c120346 100644 --- a/libdino/CMakeLists.txt +++ b/libdino/CMakeLists.txt @@ -42,6 +42,7 @@ SOURCES src/service/entity_info.vala src/service/file_manager.vala src/service/file_transfer_storage.vala + src/service/history_sync.vala src/service/jingle_file_transfers.vala src/service/message_correction.vala src/service/message_processor.vala diff --git a/libdino/src/entity/conversation.vala b/libdino/src/entity/conversation.vala index 800a28a2..9376dca9 100644 --- a/libdino/src/entity/conversation.vala +++ b/libdino/src/entity/conversation.vala @@ -22,6 +22,7 @@ public class Conversation : Object { public Jid counterpart { get; private set; } public string? nickname { get; set; } public bool active { get; set; default = false; } + public DateTime active_last_changed { get; private set; } private DateTime? _last_active; public DateTime? last_active { get { return _last_active; } @@ -63,6 +64,7 @@ public class Conversation : Object { if (type_ == Conversation.Type.GROUPCHAT_PM) counterpart = counterpart.with_resource(resource); nickname = type_ == Conversation.Type.GROUPCHAT ? resource : null; active = row[db.conversation.active]; + active_last_changed = new DateTime.from_unix_utc(row[db.conversation.active_last_changed]); int64? last_active = row[db.conversation.last_active]; if (last_active != null) this.last_active = new DateTime.from_unix_utc(last_active); encryption = (Encryption) row[db.conversation.encryption]; @@ -78,12 +80,15 @@ public class Conversation : Object { public void persist(Database db) { this.db = db; + this.active_last_changed = new DateTime.now_utc(); + var insert = db.conversation.insert() .value(db.conversation.account_id, account.id) .value(db.conversation.jid_id, db.get_jid_id(counterpart)) .value(db.conversation.type_, type_) .value(db.conversation.encryption, encryption) .value(db.conversation.active, active) + .value(db.conversation.active_last_changed, (long) active_last_changed.to_unix()) .value(db.conversation.notification, notify_setting) .value(db.conversation.send_typing, send_typing) .value(db.conversation.send_marker, send_marker); @@ -176,7 +181,9 @@ public class Conversation : Object { case "nickname": update.set(db.conversation.resource, nickname); break; case "active": - update.set(db.conversation.active, active); break; + update.set(db.conversation.active, active); + update.set(db.conversation.active_last_changed, (long) new DateTime.now_utc().to_unix()); + break; case "last-active": if (last_active != null) { update.set(db.conversation.last_active, (long) last_active.to_unix()); diff --git a/libdino/src/service/chat_interaction.vala b/libdino/src/service/chat_interaction.vala index 00c611db..1254a574 100644 --- a/libdino/src/service/chat_interaction.vala +++ b/libdino/src/service/chat_interaction.vala @@ -188,7 +188,7 @@ public class ChatInteraction : StreamInteractionModule, Object { } public override async bool run(Entities.Message message, Xmpp.MessageStanza stanza, Conversation conversation) { - if (Xep.MessageArchiveManagement.MessageFlag.get_flag(stanza) != null) return false; + if (Xmpp.MessageArchiveManagement.MessageFlag.get_flag(stanza) != null) return false; ChatInteraction outer = stream_interactor.get_module(ChatInteraction.IDENTITY); outer.send_delivery_receipt(message, stanza, conversation); diff --git a/libdino/src/service/conversation_manager.vala b/libdino/src/service/conversation_manager.vala index 99cc9039..59ccbac4 100644 --- a/libdino/src/service/conversation_manager.vala +++ b/libdino/src/service/conversation_manager.vala @@ -176,7 +176,7 @@ public class ConversationManager : StreamInteractionModule, Object { conversation.last_active = message.time; if (stanza != null) { - bool is_mam_message = Xep.MessageArchiveManagement.MessageFlag.get_flag(stanza) != null; + bool is_mam_message = Xmpp.MessageArchiveManagement.MessageFlag.get_flag(stanza) != null; bool is_recent = message.time.compare(new DateTime.now_utc().add_days(-3)) > 0; if (is_mam_message && !is_recent) return false; } diff --git a/libdino/src/service/database.vala b/libdino/src/service/database.vala index 0300112a..25a6b477 100644 --- a/libdino/src/service/database.vala +++ b/libdino/src/service/database.vala @@ -7,7 +7,7 @@ using Dino.Entities; namespace Dino { public class Database : Qlite.Database { - private const int VERSION = 22; + private const int VERSION = 23; public class AccountTable : Table { public Column id = new Column.Integer("id") { primary_key = true, auto_increment = true }; @@ -193,6 +193,7 @@ public class Database : Qlite.Database { public Column jid_id = new Column.Integer("jid_id") { not_null = true }; public Column resource = new Column.Text("resource") { min_version=1 }; public Column active = new Column.BoolInt("active"); + public Column active_last_changed = new Column.Integer("active_last_changed") { not_null=true, default="0", min_version=23 }; public Column last_active = new Column.Long("last_active"); public Column type_ = new Column.Integer("type"); public Column encryption = new Column.Integer("encryption"); @@ -204,7 +205,7 @@ public class Database : Qlite.Database { internal ConversationTable(Database db) { base(db, "conversation"); - init({id, account_id, jid_id, resource, active, last_active, type_, encryption, read_up_to, read_up_to_item, notification, send_typing, send_marker}); + init({id, account_id, jid_id, resource, active, active_last_changed, last_active, type_, encryption, read_up_to, read_up_to_item, notification, send_typing, send_marker}); } } @@ -263,15 +264,16 @@ public class Database : Qlite.Database { public class MamCatchupTable : Table { public Column id = new Column.Integer("id") { primary_key = true, auto_increment = true }; public Column account_id = new Column.Integer("account_id") { not_null = true }; - public Column from_end = new Column.BoolInt("from_end"); - public Column from_id = new Column.Text("from_id"); + public Column server_jid = new Column.Text("server_jid") { not_null = true }; + public Column from_id = new Column.Text("from_id") { not_null = true }; public Column from_time = new Column.Long("from_time") { not_null = true }; - public Column to_id = new Column.Text("to_id"); + public Column from_end = new Column.BoolInt("from_end") { not_null = true }; + public Column to_id = new Column.Text("to_id") { not_null = true }; public Column to_time = new Column.Long("to_time") { not_null = true }; internal MamCatchupTable(Database db) { base(db, "mam_catchup"); - init({id, account_id, from_end, from_id, from_time, to_id, to_time}); + init({id, account_id, server_jid, from_end, from_id, from_time, to_id, to_time}); } } @@ -474,6 +476,25 @@ public class Database : Qlite.Database { // FROM call2"); // exec("DROP TABLE call2"); } + if (oldVersion < 23) { + try { + exec("ALTER TABLE mam_catchup RENAME TO mam_catchup2"); + mam_catchup.create_table_at_version(VERSION); + exec("""INSERT INTO mam_catchup (id, account_id, server_jid, from_id, from_time, from_end, to_id, to_time) + SELECT mam_catchup2.id, account_id, bare_jid, ifnull(from_id, ""), from_time, ifnull(from_end, 0), ifnull(to_id, ""), to_time + FROM mam_catchup2 JOIN account ON mam_catchup2.account_id=account.id"""); + exec("DROP TABLE mam_catchup2"); + } catch (Error e) { + error("Failed to upgrade to database version 23 (mam_catchup): %s", e.message); + } + + try { + long active_last_updated = (long) new DateTime.now_utc().to_unix(); + exec(@"UPDATE conversation SET active_last_changed=$active_last_updated WHERE active_last_changed=0"); + } catch (Error e) { + error("Failed to upgrade to database version 23 (conversation): %s", e.message); + } + } } public ArrayList get_accounts() { diff --git a/libdino/src/service/history_sync.vala b/libdino/src/service/history_sync.vala new file mode 100644 index 00000000..92a9e9e4 --- /dev/null +++ b/libdino/src/service/history_sync.vala @@ -0,0 +1,557 @@ +using Gee; + +using Xmpp; +using Xmpp.Xep; +using Dino.Entities; +using Qlite; + +public class Dino.HistorySync { + + private StreamInteractor stream_interactor; + private Database db; + + public HashMap> current_catchup_id = new HashMap>(Account.hash_func, Account.equals_func); + public HashMap> mam_times = new HashMap>(); + public HashMap hitted_range = new HashMap(); + + // Server ID of the latest message of the previous segment + public HashMap catchup_until_id = new HashMap(Account.hash_func, Account.equals_func); + // Time of the latest message of the previous segment + public HashMap catchup_until_time = new HashMap(Account.hash_func, Account.equals_func); + + private HashMap> stanzas = new HashMap>(); + + public class HistorySync(Database db, StreamInteractor stream_interactor) { + this.stream_interactor = stream_interactor; + this.db = db; + + stream_interactor.account_added.connect(on_account_added); + + stream_interactor.connection_manager.stream_opened.connect((account, stream) => { + debug("MAM: [%s] Reset catchup_id", account.bare_jid.to_string()); + current_catchup_id.unset(account); + }); + } + + public bool process(Account account, Xmpp.MessageStanza message_stanza) { + var mam_flag = Xmpp.MessageArchiveManagement.MessageFlag.get_flag(message_stanza); + + if (mam_flag != null) { + process_mam_message(account, message_stanza, mam_flag); + return true; + } else { + update_latest_db_range(account, message_stanza); + return false; + } + } + + public void update_latest_db_range(Account account, Xmpp.MessageStanza message_stanza) { + Jid mam_server = stream_interactor.get_module(MucManager.IDENTITY).might_be_groupchat(message_stanza.from, account) ? message_stanza.from.bare_jid : account.bare_jid; + + if (!current_catchup_id.has_key(account) || !current_catchup_id[account].has_key(mam_server)) return; + + string? stanza_id = UniqueStableStanzaIDs.get_stanza_id(message_stanza, mam_server); + if (stanza_id == null) return; + + db.mam_catchup.update() + .with(db.mam_catchup.id, "=", current_catchup_id[account][mam_server]) + .set(db.mam_catchup.to_time, (long)new DateTime.now_utc().to_unix()) + .set(db.mam_catchup.to_id, stanza_id) + .perform(); + } + + public void process_mam_message(Account account, Xmpp.MessageStanza message_stanza, Xmpp.MessageArchiveManagement.MessageFlag mam_flag) { + Jid mam_server = mam_flag.sender_jid; + Jid message_author = message_stanza.from; + + // MUC servers may only send MAM messages from that MUC + bool is_muc_mam = stream_interactor.get_module(MucManager.IDENTITY).might_be_groupchat(mam_server, account) && + message_author.equals_bare(mam_server); + + bool from_our_server = mam_server.equals_bare(account.bare_jid); + + if (!is_muc_mam && !from_our_server) { + warning("Received alleged MAM message from %s, ignoring", mam_server.to_string()); + return; + } + + if (!stanzas.has_key(mam_flag.query_id)) stanzas[mam_flag.query_id] = new ArrayList(); + stanzas[mam_flag.query_id].add(message_stanza); + + print(@"[$(message_stanza.from)] qid $(mam_flag.query_id) time $(mam_flag.server_time) $(mam_flag.mam_id) $(message_stanza.body ?? "[none]")\n"); + } + + private void on_unprocessed_message(Account account, XmppStream stream, MessageStanza message) { + // Check that it's a legit MAM server + bool is_muc_mam = stream_interactor.get_module(MucManager.IDENTITY).might_be_groupchat(message.from, account); + bool from_our_server = message.from.equals_bare(account.bare_jid); + if (!is_muc_mam && !from_our_server) return; + + // Get the server time of the message and store it in `mam_times` + Xmpp.MessageArchiveManagement.Flag? mam_flag = stream != null ? stream.get_flag(Xmpp.MessageArchiveManagement.Flag.IDENTITY) : null; + if (mam_flag == null) return; + string? id = message.stanza.get_deep_attribute(mam_flag.ns_ver + ":result", "id"); + if (id == null) return; + StanzaNode? delay_node = message.stanza.get_deep_subnode(mam_flag.ns_ver + ":result", StanzaForwarding.NS_URI + ":forwarded", DelayedDelivery.NS_URI + ":delay"); + if (delay_node == null) { + warning("MAM result did not contain delayed time %s", message.stanza.to_string()); + return; + } + DateTime? time = DelayedDelivery.get_time_for_node(delay_node); + if (time == null) return; + mam_times[account][id] = time; + + // Check if this is the target message + string? query_id = message.stanza.get_deep_attribute(mam_flag.ns_ver + ":result", mam_flag.ns_ver + ":queryid"); + if (query_id != null && id == catchup_until_id[account]) { + debug("MAM: [%s] Hitted range (id) %s", account.bare_jid.to_string(), id); + hitted_range[query_id] = -2; + } + } + + public void on_server_id_duplicate(Account account, Xmpp.MessageStanza message_stanza, Entities.Message message) { + Xmpp.MessageArchiveManagement.MessageFlag? mam_flag = Xmpp.MessageArchiveManagement.MessageFlag.get_flag(message_stanza); + if (mam_flag == null) return; + +// debug(@"MAM: [%s] Hitted range duplicate server id. id %s qid %s", account.bare_jid.to_string(), message.server_id, mam_flag.query_id); + if (catchup_until_time.has_key(account) && mam_flag.server_time.compare(catchup_until_time[account]) < 0) { + hitted_range[mam_flag.query_id] = -1; +// debug(@"MAM: [%s] In range (time) %s < %s", account.bare_jid.to_string(), mam_flag.server_time.to_string(), catchup_until_time[account].to_string()); + } + } + + public async void fetch_everything(Account account, Jid mam_server, DateTime until_earliest_time = new DateTime.from_unix_utc(0)) { + print(@"Fetch everything for $(mam_server) %s\n".printf(until_earliest_time != null ? @"(until $until_earliest_time)" : "")); + RowOption latest_row_opt = db.mam_catchup.select() + .with(db.mam_catchup.account_id, "=", account.id) + .with(db.mam_catchup.server_jid, "=", mam_server.to_string()) + .with(db.mam_catchup.to_time, ">=", (long) until_earliest_time.to_unix()) + .order_by(db.mam_catchup.to_time, "DESC") + .single().row(); + Row? latest_row = latest_row_opt.is_present() ? latest_row_opt.inner : null; + + Row? new_row = yield fetch_latest_page(account, mam_server, latest_row, until_earliest_time); + + if (new_row != null) { + current_catchup_id[account][mam_server] = new_row[db.mam_catchup.id]; + } else if (latest_row != null) { + current_catchup_id[account][mam_server] = latest_row[db.mam_catchup.id]; + } + + // Set the previous and current row + print(@"$(new_row == null) $(latest_row == null)\n"); + Row? previous_row = null; + Row? current_row = null; + if (new_row != null) { + print(@"Fetch everything $(mam_server) a\n"); + current_row = new_row; + previous_row = latest_row; + } else if (latest_row != null) { + print(@"Fetch everything $(mam_server) b\n"); + current_row = latest_row; + RowOption previous_row_opt = db.mam_catchup.select() + .with(db.mam_catchup.account_id, "=", account.id) + .with(db.mam_catchup.server_jid, "=", mam_server.to_string()) + .with(db.mam_catchup.to_time, "<", current_row[db.mam_catchup.from_time]) + .with(db.mam_catchup.to_time, ">=", (long) until_earliest_time.to_unix()) + .order_by(db.mam_catchup.to_time, "DESC") + .single().row(); + previous_row = previous_row_opt.is_present() ? previous_row_opt.inner : null; + } + + print(@"Fetch everything $(mam_server) c $(current_row == null) $(previous_row == null)\n"); + // Fetch messages between two db ranges and merge them + while (current_row != null && previous_row != null) { + if (current_row[db.mam_catchup.from_end]) return; + + print("FETCH BETWEEN RANGES\n"); + current_row = yield fetch_between_ranges(account, mam_server, previous_row, current_row); + if (current_row == null) return; + + RowOption previous_row_opt = db.mam_catchup.select() + .with(db.mam_catchup.account_id, "=", account.id) + .with(db.mam_catchup.server_jid, "=", mam_server.to_string()) + .with(db.mam_catchup.to_time, "<", current_row[db.mam_catchup.from_time]) + .with(db.mam_catchup.to_time, ">=", (long) until_earliest_time.to_unix()) + .order_by(db.mam_catchup.to_time, "DESC") + .single().row(); + previous_row = previous_row_opt.is_present() ? previous_row_opt.inner : null; + } + + // We're at the earliest range. Try to expand it even further back. + if (current_row == null || current_row[db.mam_catchup.from_end]) return; + // We don't want to fetch before the earliest range over and over again in MUCs if it's after until_earliest_time. + // For now, don't query if we are within a week of until_earliest_time + if (until_earliest_time != null && + current_row[db.mam_catchup.from_time] > until_earliest_time.add(-TimeSpan.DAY * 7).to_unix()) return; + print("FETCH BEFORE RANGE\n"); + yield fetch_before_range(account, mam_server, current_row, until_earliest_time); + } + + // Fetches the latest page (up to previous db row). Extends the previous db row if it was reached, creates a new row otherwise. + public async Row? fetch_latest_page(Account account, Jid mam_server, Row? latest_row, DateTime? until_earliest_time) { + debug("MAM: [%s | %s] Fetching latest page", mam_server.to_string(), mam_server.to_string()); + + int latest_row_id = -1; + DateTime latest_message_time = until_earliest_time; + string? latest_message_id = null; + + if (latest_row != null) { + latest_row_id = latest_row[db.mam_catchup.id]; + latest_message_time = (new DateTime.from_unix_utc(latest_row[db.mam_catchup.to_time])).add_minutes(-5); + print(@"latest msg time $latest_message_time\n"); + latest_message_id = latest_row[db.mam_catchup.to_id]; + + // Make sure we only fetch to until_earliest_time if latest_message_time is further back + if (until_earliest_time != null && latest_message_time.compare(until_earliest_time) < 0) { + latest_message_time = until_earliest_time.add_minutes(-5); + latest_message_id = null; + } + } + + var query_params = new Xmpp.MessageArchiveManagement.V2.MamQueryParams.query_latest(mam_server, latest_message_time, latest_message_id); + + PageRequestResult page_result = yield get_mam_page(account, query_params, null); + + if (page_result.page_result == PageResult.Error || page_result.page_result == PageResult.Duplicate) { + debug("MAM [%s | %s] Failed fetching latest page %s", mam_server.to_string(), mam_server.to_string(), page_result.page_result.to_string()); + return null; + } + + print(@"MAM result: $(page_result.page_result))\n"); + + // Catchup finished within first page. Update latest db entry. + if (page_result.page_result in new PageResult[] { PageResult.TargetReached, PageResult.NoMoreMessages } && latest_row_id != -1) { + if (page_result.stanzas == null || page_result.stanzas.is_empty) return null; + + string first_mam_id = page_result.query_result.first; + long first_mam_time = (long) mam_times[account][first_mam_id].to_unix(); + + print(@"Updating $mam_server to $first_mam_time, $first_mam_id\n"); + var query = db.mam_catchup.update() + .with(db.mam_catchup.id, "=", latest_row_id) + .set(db.mam_catchup.to_time, first_mam_time) + .set(db.mam_catchup.to_id, first_mam_id); + + if (page_result.page_result == PageResult.NoMoreMessages) { + // If the server doesn't have more messages, store that this range is at its end. + query.set(db.mam_catchup.from_end, true); + } + query.perform(); + return null; + } + + if (page_result.query_result.first == null || page_result.query_result.last == null) { + print(@"from/to id null\n"); + return null; + } + + // Either we need to fetch more pages or this is the first db entry ever + debug("MAM: [%s | %s] Creating new db range for latest page", mam_server.to_string(), mam_server.to_string()); + + string from_id = page_result.query_result.first; + string to_id = page_result.query_result.last; + + if (!mam_times[account].has_key(from_id) || !mam_times[account].has_key(to_id)) { + print(@"Missing from/to id $from_id $to_id\n"); + return null; + } + + long from_time = (long) mam_times[account][from_id].to_unix(); + long to_time = (long) mam_times[account][to_id].to_unix(); + + int new_row_id = (int) db.mam_catchup.insert() + .value(db.mam_catchup.account_id, account.id) + .value(db.mam_catchup.server_jid, mam_server.to_string()) + .value(db.mam_catchup.from_id, from_id) + .value(db.mam_catchup.from_time, from_time) + .value(db.mam_catchup.from_end, false) + .value(db.mam_catchup.to_id, to_id) + .value(db.mam_catchup.to_time, to_time) + .perform(); + return db.mam_catchup.select().with(db.mam_catchup.id, "=", new_row_id).single().row().inner; + } + + /** Fetches messages between the end of `earlier_range` and start of `later_range` + ** Merges the `earlier_range` db row into the `later_range` db row. + ** @return The resulting range comprising `earlier_range`, `later_rage`, and everything in between. null if fetching/merge failed. + **/ + private async Row? fetch_between_ranges(Account account, Jid mam_server, Row earlier_range, Row later_range) { + int later_range_id = (int) later_range[db.mam_catchup.id]; + DateTime earliest_time = new DateTime.from_unix_utc(earlier_range[db.mam_catchup.to_time]); + DateTime latest_time = new DateTime.from_unix_utc(later_range[db.mam_catchup.from_time]); + debug("MAM [%s | %s] Fetching between %s (%s) and %s (%s)", mam_server.to_string(), mam_server.to_string(), earliest_time.to_string(), earlier_range[db.mam_catchup.to_id], latest_time.to_string(), later_range[db.mam_catchup.from_id]); + var query_params = new Xmpp.MessageArchiveManagement.V2.MamQueryParams.query_between(mam_server, + earliest_time, earlier_range[db.mam_catchup.to_id], + latest_time, later_range[db.mam_catchup.from_id]); + + print("fetch between ranges\n"); + PageRequestResult page_result = yield fetch_query(account, query_params, later_range_id); + print(@"page result null? $(page_result == null)\n"); + + if (page_result.page_result == PageResult.TargetReached) { + debug("MAM [%s | %s] Merging range %i into %i", mam_server.to_string(), mam_server.to_string(), earlier_range[db.mam_catchup.id], later_range_id); + // Merge earlier range into later one. + db.mam_catchup.update() + .with(db.mam_catchup.id, "=", later_range_id) + .set(db.mam_catchup.from_time, earlier_range[db.mam_catchup.from_time]) + .set(db.mam_catchup.from_id, earlier_range[db.mam_catchup.from_id]) + .set(db.mam_catchup.from_end, earlier_range[db.mam_catchup.from_end]) + .perform(); + + db.mam_catchup.delete().with(db.mam_catchup.id, "=", earlier_range[db.mam_catchup.id]).perform(); + + // Return the updated version of the later range + return db.mam_catchup.select().with(db.mam_catchup.id, "=", later_range_id).single().row().inner; + } + + return null; + } + + private async void fetch_before_range(Account account, Jid mam_server, Row range, DateTime? until_earliest_time) { + DateTime latest_time = new DateTime.from_unix_utc(range[db.mam_catchup.from_time]); + string latest_id = range[db.mam_catchup.from_id]; + + Xmpp.MessageArchiveManagement.V2.MamQueryParams query_params; + if (until_earliest_time == null) { + query_params = new Xmpp.MessageArchiveManagement.V2.MamQueryParams.query_before(mam_server, latest_time, latest_id); + } else { + query_params = new Xmpp.MessageArchiveManagement.V2.MamQueryParams.query_between( + mam_server, + until_earliest_time, null, + latest_time, latest_id + ); + } + PageRequestResult page_result = yield fetch_query(account, query_params, range[db.mam_catchup.id]); + } + + /** + * Iteratively fetches all pages returned for a query (until a PageResult other than MorePagesAvailable is returned) + * @return The last PageRequestResult result + **/ + private async PageRequestResult fetch_query(Account account, Xmpp.MessageArchiveManagement.V2.MamQueryParams query_params, int db_id) { + print("fetch query\n"); + PageRequestResult? page_result = null; + do { + page_result = yield get_mam_page(account, query_params, page_result); + print(@"page result $(page_result.page_result) $(page_result.stanzas == null)\n"); + + if (page_result.page_result == PageResult.Error || page_result.stanzas == null) return page_result; + + string last_mam_id = page_result.query_result.last; + long last_mam_time = (long)mam_times[account][last_mam_id].to_unix(); + + print(@"Updating $(query_params.mam_server) to $last_mam_time, $last_mam_id\n"); + var query = db.mam_catchup.update() + .with(db.mam_catchup.id, "=", db_id) + .set(db.mam_catchup.from_time, last_mam_time) + .set(db.mam_catchup.from_id, last_mam_id); + + if (page_result.page_result == PageResult.NoMoreMessages) { + // If the server doesn't have more messages, store that this range is at its end. + print("no more message\n"); + query.set(db.mam_catchup.from_end, true); + } + query.perform(); + } while (page_result.page_result == PageResult.MorePagesAvailable); + + print(@"page result 2 $(page_result.page_result)\n"); + return page_result; + } + + enum PageResult { + MorePagesAvailable, + TargetReached, + NoMoreMessages, + Duplicate, // TODO additional boolean + Error + } + + /** + * prev_page_result: null if this is the first page request + **/ + private async PageRequestResult get_mam_page(Account account, Xmpp.MessageArchiveManagement.V2.MamQueryParams query_params, PageRequestResult? prev_page_result) { + XmppStream stream = stream_interactor.get_stream(account); + Xmpp.MessageArchiveManagement.QueryResult query_result = null; + if (prev_page_result == null) { + query_result = yield Xmpp.MessageArchiveManagement.V2.query_archive(stream, query_params); + } else { + query_result = yield Xmpp.MessageArchiveManagement.V2.page_through_results(stream, query_params, prev_page_result.query_result); + } + return yield process_query_result(account, query_result, query_params.query_id, query_params.start_id); + } + + private async PageRequestResult process_query_result(Account account, Xmpp.MessageArchiveManagement.QueryResult query_result, string query_id, string? after_id) { + PageResult page_result = PageResult.MorePagesAvailable; + + if (query_result.malformed || query_result.error) { + print(@"$(query_result.malformed) $(query_result.error)\n"); + page_result = PageResult.Error; + } + + // We wait until all the messages from the page are processed (and we got the `mam_times` from them) + Idle.add(process_query_result.callback, Priority.LOW); + yield; + + // We might have successfully reached the target or the server doesn't have all messages stored anymore + // If it's the former, we'll overwrite the value with PageResult.MorePagesAvailable below. + if (query_result.complete) { + page_result = PageResult.NoMoreMessages; + } + + string selection = null; + string[] selection_args = {}; + + // Check the server id of all returned messages. Check if we've hit our target (from_id) or got a duplicate. + if (stanzas.has_key(query_id) && !stanzas[query_id].is_empty) { + print(@"$(stanzas.has_key(query_id)) $(!stanzas[query_id].is_empty) looking for $(after_id ?? "")\n"); + foreach (Xmpp.MessageStanza message in stanzas[query_id]) { + Xmpp.MessageArchiveManagement.MessageFlag? mam_message_flag = Xmpp.MessageArchiveManagement.MessageFlag.get_flag(message); + if (mam_message_flag != null && mam_message_flag.mam_id != null) { + if (after_id != null && mam_message_flag.mam_id == after_id) { + // Successfully fetched the whole range + page_result = PageResult.TargetReached; + } + + if (selection != null) selection += " OR "; + selection = @"$(db.message.server_id) = ?"; + } + } + + if (hitted_range.has_key(query_id)) { + // Message got filtered out by xmpp-vala, but succesfull range fetch nevertheless + page_result = PageResult.TargetReached; + } + + int64 duplicates_found = db.message.select().where(selection, selection_args).count(); + if (duplicates_found > 0) { + // We got a duplicate although we thought we have to catch up. + // There was a server bug where prosody would send all messages if it didn't know the after ID that was given + page_result = PageResult.Duplicate; + } + } + + var res = new PageRequestResult() { stanzas=stanzas[query_id], page_result=page_result, query_result=query_result }; + send_messages_back_into_pipeline(account, query_id); + return res; + } + + private void send_messages_back_into_pipeline(Account account, string query_id) { + print(@"send_messages_back_into_pipeline $query_id\n"); + if (!stanzas.has_key(query_id)) return; + + foreach (Xmpp.MessageStanza message in stanzas[query_id]) { + stream_interactor.get_module(MessageProcessor.IDENTITY).run_pipeline_announce.begin(account, message); + } + stanzas.unset(query_id); + print(@"send_messages_back_into_pipeline $query_id done\n"); + } + + private void on_account_added(Account account) { + cleanup_db_ranges(db, account); + + mam_times[account] = new HashMap(); + + XmppStream? stream_bak = null; + stream_interactor.module_manager.get_module(account, Xmpp.MessageArchiveManagement.Module.IDENTITY).feature_available.connect( (stream) => { + if (stream == stream_bak) return; + + current_catchup_id[account] = new HashMap(Jid.hash_func, Jid.equals_func); + stream_bak = stream; + debug("MAM: [%s] MAM available", account.bare_jid.to_string()); + fetch_everything.begin(account, account.bare_jid); + }); + + stream_interactor.module_manager.get_module(account, Xmpp.MessageModule.IDENTITY).received_message_unprocessed.connect((stream, message) => { + on_unprocessed_message(account, stream, message); + }); + } + + public static void cleanup_db_ranges(Database db, Account account) { + var ranges = new HashMap>(Jid.hash_func, Jid.equals_func); + foreach (Row row in db.mam_catchup.select().with(db.mam_catchup.account_id, "=", account.id)) { + var mam_range = new MamRange(); + mam_range.id = row[db.mam_catchup.id]; + mam_range.server_jid = new Jid(row[db.mam_catchup.server_jid]); + mam_range.from_time = row[db.mam_catchup.from_time]; + mam_range.from_id = row[db.mam_catchup.from_id]; + mam_range.from_end = row[db.mam_catchup.from_end]; + mam_range.to_time = row[db.mam_catchup.to_time]; + mam_range.to_id = row[db.mam_catchup.to_id]; + + if (!ranges.has_key(mam_range.server_jid)) ranges[mam_range.server_jid] = new ArrayList(); + ranges[mam_range.server_jid].add(mam_range); + } + + var to_delete = new ArrayList(); + + foreach (Jid server_jid in ranges.keys) { + foreach (var range1 in ranges[server_jid]) { + if (to_delete.contains(range1)) continue; + + foreach (MamRange range2 in ranges[server_jid]) { + print(@"$(account.bare_jid) | $(server_jid) | $(range1.from_time) - $(range1.to_time) vs $(range2.from_time) - $(range2.to_time)\n"); + if (range1 == range2 || to_delete.contains(range2)) continue; + + // Check if range2 is a subset of range1 + // range1: ##################### + // range2: ###### + if (range1.from_time <= range2.from_time && range1.to_time >= range2.to_time) { + critical(@"MAM: Removing db range which is a subset of another one"); + to_delete.add(range2); + continue; + } + + // Check if range2 is an extension of range1 (towards earlier) + // range1: ##################### + // range2: ############### + if (range1.from_time <= range2.from_time <= range1.to_time && range1.to_time < range2.to_time) { + critical(@"MAM: Removing db range that overlapped another one (towards earlier)"); + db.mam_catchup.update() + .with(db.mam_catchup.id, "=", range1.id) + .set(db.mam_catchup.from_id, range2.to_id) + .set(db.mam_catchup.from_time, range2.to_time) + .set(db.mam_catchup.from_end, range2.from_end) + .perform(); + to_delete.add(range2); + continue; + } + + // Check if range2 is an extension of range1 (towards more current) + // range1: ##################### + // range2: ############### + if (range1.from_time <= range2.from_time <= range1.to_time && range1.to_time < range2.to_time) { + critical(@"MAM: Removing db range that overlapped another one (towards more current)"); + db.mam_catchup.update() + .with(db.mam_catchup.id, "=", range1.id) + .set(db.mam_catchup.to_id, range2.to_id) + .set(db.mam_catchup.to_time, range2.to_time) + .perform(); + to_delete.add(range2); + continue; + } + } + } + } + + foreach (MamRange row in to_delete) { + db.mam_catchup.delete().with(db.mam_catchup.id, "=", row.id).perform(); + } + } + + class MamRange { + public int id; + public Jid server_jid; + public long from_time; + public string from_id; + public bool from_end; + public long to_time; + public string to_id; + } + + class PageRequestResult { + public Gee.List stanzas { get; set; } + public PageResult page_result { get; set; } + public Xmpp.MessageArchiveManagement.QueryResult query_result { get; set; } + } +} \ No newline at end of file diff --git a/libdino/src/service/message_processor.vala b/libdino/src/service/message_processor.vala index 6445ce40..bfecf340 100644 --- a/libdino/src/service/message_processor.vala +++ b/libdino/src/service/message_processor.vala @@ -18,15 +18,11 @@ public class MessageProcessor : StreamInteractionModule, Object { public signal void message_sent_or_received(Entities.Message message, Conversation conversation); public signal void history_synced(Account account); + public HistorySync history_sync; public MessageListenerHolder received_pipeline = new MessageListenerHolder(); private StreamInteractor stream_interactor; private Database db; - private HashMap current_catchup_id = new HashMap(Account.hash_func, Account.equals_func); - private HashMap> mam_times = new HashMap>(); - public HashMap hitted_range = new HashMap(); - public HashMap catchup_until_id = new HashMap(Account.hash_func, Account.equals_func); - public HashMap catchup_until_time = new HashMap(Account.hash_func, Account.equals_func); public static void start(StreamInteractor stream_interactor, Database db) { MessageProcessor m = new MessageProcessor(stream_interactor, db); @@ -36,6 +32,7 @@ public class MessageProcessor : StreamInteractionModule, Object { private MessageProcessor(StreamInteractor stream_interactor, Database db) { this.stream_interactor = stream_interactor; this.db = db; + this.history_sync = new HistorySync(db, stream_interactor); received_pipeline.connect(new DeduplicateMessageListener(this, db)); received_pipeline.connect(new FilterMessageListener()); @@ -47,11 +44,6 @@ public class MessageProcessor : StreamInteractionModule, Object { stream_interactor.stream_negotiated.connect(send_unsent_chat_messages); stream_interactor.stream_resumed.connect(send_unsent_chat_messages); - - stream_interactor.connection_manager.stream_opened.connect((account, stream) => { - debug("MAM: [%s] Reset catchup_id", account.bare_jid.to_string()); - current_catchup_id.unset(account); - }); } public Entities.Message send_text(string text, Conversation conversation) { @@ -106,43 +98,10 @@ public class MessageProcessor : StreamInteractionModule, Object { } private void on_account_added(Account account) { - mam_times[account] = new HashMap(); - stream_interactor.module_manager.get_module(account, Xmpp.MessageModule.IDENTITY).received_message.connect( (stream, message) => { on_message_received.begin(account, message); }); - XmppStream? stream_bak = null; - stream_interactor.module_manager.get_module(account, Xmpp.Xep.MessageArchiveManagement.Module.IDENTITY).feature_available.connect( (stream) => { - if (stream == stream_bak) return; - - current_catchup_id.unset(account); - stream_bak = stream; - debug("MAM: [%s] MAM available", account.bare_jid.to_string()); - do_mam_catchup.begin(account); - }); - stream_interactor.module_manager.get_module(account, Xmpp.MessageModule.IDENTITY).received_message_unprocessed.connect((stream, message) => { - if (!message.from.equals(account.bare_jid)) return; - - Xep.MessageArchiveManagement.Flag? mam_flag = stream != null ? stream.get_flag(Xep.MessageArchiveManagement.Flag.IDENTITY) : null; - if (mam_flag == null) return; - string? id = message.stanza.get_deep_attribute(mam_flag.ns_ver + ":result", "id"); - if (id == null) return; - StanzaNode? delay_node = message.stanza.get_deep_subnode(mam_flag.ns_ver + ":result", "urn:xmpp:forward:0:forwarded", "urn:xmpp:delay:delay"); - if (delay_node == null) { - warning("MAM result did not contain delayed time %s", message.stanza.to_string()); - return; - } - DateTime? time = DelayedDelivery.get_time_for_node(delay_node); - if (time == null) return; - mam_times[account][id] = time; - - string? query_id = message.stanza.get_deep_attribute(mam_flag.ns_ver + ":result", mam_flag.ns_ver + ":queryid"); - if (query_id != null && id == catchup_until_id[account]) { - debug("MAM: [%s] Hitted range (id) %s", account.bare_jid.to_string(), id); - hitted_range[query_id] = -2; - } - }); stream_interactor.module_manager.get_module(account, Xmpp.MessageModule.IDENTITY).received_error.connect((stream, message_stanza, error_stanza) => { Message? message = null; @@ -164,203 +123,20 @@ public class MessageProcessor : StreamInteractionModule, Object { convert_sending_to_unsent_msgs(account); } - private async void do_mam_catchup(Account account) { - debug("MAM: [%s] Start catchup", account.bare_jid.to_string()); - string? earliest_id = null; - DateTime? earliest_time = null; - bool continue_sync = true; - - while (continue_sync) { - continue_sync = false; - - // Get previous row - var previous_qry = db.mam_catchup.select().with(db.mam_catchup.account_id, "=", account.id).order_by(db.mam_catchup.to_time, "DESC"); - if (current_catchup_id.has_key(account)) { - previous_qry.with(db.mam_catchup.id, "!=", current_catchup_id[account]); - } - RowOption previous_row = previous_qry.single().row(); - if (previous_row.is_present()) { - catchup_until_id[account] = previous_row[db.mam_catchup.to_id]; - catchup_until_time[account] = (new DateTime.from_unix_utc(previous_row[db.mam_catchup.to_time])).add_minutes(-5); - debug("MAM: [%s] Previous entry exists", account.bare_jid.to_string()); - } else { - catchup_until_id.unset(account); - catchup_until_time.unset(account); - } - - string query_id = Xmpp.random_uuid(); - yield get_mam_range(account, query_id, null, null, earliest_time, earliest_id); - - if (!hitted_range.has_key(query_id)) { - debug("MAM: [%s] Set catchup end reached", account.bare_jid.to_string()); - db.mam_catchup.update() - .set(db.mam_catchup.from_end, true) - .with(db.mam_catchup.id, "=", current_catchup_id[account]) - .perform(); - } - - if (hitted_range.has_key(query_id)) { - if (merge_ranges(account, null)) { - RowOption current_row = db.mam_catchup.row_with(db.mam_catchup.id, current_catchup_id[account]); - bool range_from_complete = current_row[db.mam_catchup.from_end]; - if (!range_from_complete) { - continue_sync = true; - earliest_id = current_row[db.mam_catchup.from_id]; - earliest_time = (new DateTime.from_unix_utc(current_row[db.mam_catchup.from_time])).add_seconds(1); - } - } - } - } - } - - /* - * Merges the row with `current_catchup_id` with the previous range (optional: with `earlier_id`) - * Changes `current_catchup_id` to the previous range - */ - private bool merge_ranges(Account account, int? earlier_id) { - RowOption current_row = db.mam_catchup.row_with(db.mam_catchup.id, current_catchup_id[account]); - RowOption previous_row = null; - - if (earlier_id != null) { - previous_row = db.mam_catchup.row_with(db.mam_catchup.id, earlier_id); - } else { - previous_row = db.mam_catchup.select() - .with(db.mam_catchup.account_id, "=", account.id) - .with(db.mam_catchup.id, "!=", current_catchup_id[account]) - .order_by(db.mam_catchup.to_time, "DESC").single().row(); - } - - if (!previous_row.is_present()) { - debug("MAM: [%s] Merging: No previous row", account.bare_jid.to_string()); - return false; - } - - var qry = db.mam_catchup.update().with(db.mam_catchup.id, "=", previous_row[db.mam_catchup.id]); - debug("MAM: [%s] Merging %ld-%ld with %ld- %ld", account.bare_jid.to_string(), previous_row[db.mam_catchup.from_time], previous_row[db.mam_catchup.to_time], current_row[db.mam_catchup.from_time], current_row[db.mam_catchup.to_time]); - if (current_row[db.mam_catchup.from_time] < previous_row[db.mam_catchup.from_time]) { - qry.set(db.mam_catchup.from_id, current_row[db.mam_catchup.from_id]) - .set(db.mam_catchup.from_time, current_row[db.mam_catchup.from_time]); - } - if (current_row[db.mam_catchup.to_time] > previous_row[db.mam_catchup.to_time]) { - qry.set(db.mam_catchup.to_id, current_row[db.mam_catchup.to_id]) - .set(db.mam_catchup.to_time, current_row[db.mam_catchup.to_time]); - } - qry.perform(); - - current_catchup_id[account] = previous_row[db.mam_catchup.id]; - - db.mam_catchup.delete().with(db.mam_catchup.id, "=", current_row[db.mam_catchup.id]).perform(); - - return true; - } - - private async bool get_mam_range(Account account, string? query_id, DateTime? from_time, string? from_id, DateTime? to_time, string? to_id) { - debug("MAM: [%s] Get range %s - %s", account.bare_jid.to_string(), from_time != null ? from_time.to_string() : "", to_time != null ? to_time.to_string() : ""); - XmppStream stream = stream_interactor.get_stream(account); - - Iq.Stanza? iq = yield stream.get_module(Xep.MessageArchiveManagement.Module.IDENTITY).query_archive(stream, null, query_id, from_time, from_id, to_time, to_id); - - if (iq == null) { - debug(@"MAM: [%s] IQ null", account.bare_jid.to_string()); - return true; - } - - if (iq.stanza.get_deep_string_content("urn:xmpp:mam:2:fin", "http://jabber.org/protocol/rsm" + ":set", "first") == null) { - return true; - } - - while (iq != null) { - string? earliest_id = iq.stanza.get_deep_string_content("urn:xmpp:mam:2:fin", "http://jabber.org/protocol/rsm" + ":set", "first"); - if (earliest_id == null) return true; - string? latest_id = iq.stanza.get_deep_string_content("urn:xmpp:mam:2:fin", "http://jabber.org/protocol/rsm" + ":set", "last"); - - // We wait until all the messages from the page are processed (and we got the `mam_times` from them) - Idle.add(get_mam_range.callback, Priority.LOW); - yield; - - int wait_ms = 1000; - - - if (mam_times[account].has_key(earliest_id) && (current_catchup_id.has_key(account) || mam_times[account].has_key(latest_id))) { - - debug("MAM: [%s] Update from_id %s", account.bare_jid.to_string(), earliest_id); - if (!current_catchup_id.has_key(account)) { - debug("MAM: [%s] We get our first MAM page", account.bare_jid.to_string()); - current_catchup_id[account] = (int) db.mam_catchup.insert() - .value(db.mam_catchup.account_id, account.id) - .value(db.mam_catchup.from_id, earliest_id) - .value(db.mam_catchup.from_time, (long)mam_times[account][earliest_id].to_unix()) - .value(db.mam_catchup.to_id, latest_id) - .value(db.mam_catchup.to_time, (long)mam_times[account][latest_id].to_unix()) - .perform(); - } else { - // Update existing id - db.mam_catchup.update() - .set(db.mam_catchup.from_id, earliest_id) - .set(db.mam_catchup.from_time, (long)mam_times[account][earliest_id].to_unix()) - .with(db.mam_catchup.id, "=", current_catchup_id[account]) - .perform(); - } - - TimeSpan catchup_time_ago = (new DateTime.now_utc()).difference(mam_times[account][earliest_id]); - - if (catchup_time_ago > 14 * TimeSpan.DAY) { - wait_ms = 2000; - } else if (catchup_time_ago > 5 * TimeSpan.DAY) { - wait_ms = 1000; - } else if (catchup_time_ago > 2 * TimeSpan.DAY) { - wait_ms = 200; - } else if (catchup_time_ago > TimeSpan.DAY) { - wait_ms = 50; - } else { - wait_ms = 10; - } - } else { - warning("Didn't have time for MAM id; earliest_id:%s latest_id:%s", mam_times[account].has_key(earliest_id).to_string(), mam_times[account].has_key(latest_id).to_string()); - } - - mam_times[account] = new HashMap(); + private async void on_message_received(Account account, Xmpp.MessageStanza message_stanza) { - Timeout.add(wait_ms, () => { - if (hitted_range.has_key(query_id)) { - debug(@"MAM: [%s] Hitted contains key %s", account.bare_jid.to_string(), query_id); - iq = null; - Idle.add(get_mam_range.callback); - return false; - } + // If it's a message from MAM, it's going to be processed by HistorySync which calls run_pipeline_announce later. + if (history_sync.process(account, message_stanza)) return; - stream.get_module(Xep.MessageArchiveManagement.Module.IDENTITY).page_through_results.begin(stream, null, query_id, from_time, to_time, iq, (_, res) => { - iq = stream.get_module(Xep.MessageArchiveManagement.Module.IDENTITY).page_through_results.end(res); - Idle.add(get_mam_range.callback); - }); - return false; - }); - yield; - } - return false; + run_pipeline_announce(account, message_stanza); } - private async void on_message_received(Account account, Xmpp.MessageStanza message_stanza) { + public async void run_pipeline_announce(Account account, Xmpp.MessageStanza message_stanza) { Entities.Message message = yield parse_message_stanza(account, message_stanza); Conversation? conversation = stream_interactor.get_module(ConversationManager.IDENTITY).get_conversation_for_message(message); if (conversation == null) return; - // MAM state database update - Xep.MessageArchiveManagement.MessageFlag? mam_flag = Xep.MessageArchiveManagement.MessageFlag.get_flag(message_stanza); - if (mam_flag == null) { - if (current_catchup_id.has_key(account)) { - string? stanza_id = UniqueStableStanzaIDs.get_stanza_id(message_stanza, account.bare_jid); - if (stanza_id != null) { - db.mam_catchup.update() - .with(db.mam_catchup.id, "=", current_catchup_id[account]) - .set(db.mam_catchup.to_time, (long)message.local_time.to_unix()) - .set(db.mam_catchup.to_id, stanza_id) - .perform(); - } - } - } - bool abort = yield received_pipeline.run(message, message_stanza, conversation); if (abort) return; @@ -373,7 +149,7 @@ public class MessageProcessor : StreamInteractionModule, Object { message_sent_or_received(message, conversation); } - private async Entities.Message parse_message_stanza(Account account, Xmpp.MessageStanza message) { + public async Entities.Message parse_message_stanza(Account account, Xmpp.MessageStanza message) { string? body = message.body; if (body != null) body = body.strip(); Entities.Message new_message = new Entities.Message(body); @@ -393,20 +169,20 @@ public class MessageProcessor : StreamInteractionModule, Object { new_message.ourpart = new_message.direction == Entities.Message.DIRECTION_SENT ? message.from : message.to; XmppStream? stream = stream_interactor.get_stream(account); - Xep.MessageArchiveManagement.MessageFlag? mam_message_flag = Xep.MessageArchiveManagement.MessageFlag.get_flag(message); - Xep.MessageArchiveManagement.Flag? mam_flag = stream != null ? stream.get_flag(Xep.MessageArchiveManagement.Flag.IDENTITY) : null; + Xmpp.MessageArchiveManagement.MessageFlag? mam_message_flag = Xmpp.MessageArchiveManagement.MessageFlag.get_flag(message); + Xmpp.MessageArchiveManagement.Flag? mam_flag = stream != null ? stream.get_flag(Xmpp.MessageArchiveManagement.Flag.IDENTITY) : null; EntityInfo entity_info = stream_interactor.get_module(EntityInfo.IDENTITY); - if (mam_message_flag != null && mam_flag != null && mam_flag.ns_ver == Xep.MessageArchiveManagement.NS_URI_2 && mam_message_flag.mam_id != null) { + if (mam_message_flag != null && mam_flag != null && mam_flag.ns_ver == Xmpp.MessageArchiveManagement.NS_URI_2 && mam_message_flag.mam_id != null) { new_message.server_id = mam_message_flag.mam_id; } else if (message.type_ == Xmpp.MessageStanza.TYPE_GROUPCHAT) { bool server_supports_sid = (yield entity_info.has_feature(account, new_message.counterpart.bare_jid, Xep.UniqueStableStanzaIDs.NS_URI)) || - (yield entity_info.has_feature(account, new_message.counterpart.bare_jid, Xep.MessageArchiveManagement.NS_URI_2)); + (yield entity_info.has_feature(account, new_message.counterpart.bare_jid, Xmpp.MessageArchiveManagement.NS_URI_2)); if (server_supports_sid) { new_message.server_id = Xep.UniqueStableStanzaIDs.get_stanza_id(message, new_message.counterpart.bare_jid); } } else if (message.type_ == Xmpp.MessageStanza.TYPE_CHAT) { bool server_supports_sid = (yield entity_info.has_feature(account, account.bare_jid, Xep.UniqueStableStanzaIDs.NS_URI)) || - (yield entity_info.has_feature(account, account.bare_jid, Xep.MessageArchiveManagement.NS_URI_2)); + (yield entity_info.has_feature(account, account.bare_jid, Xmpp.MessageArchiveManagement.NS_URI_2)); if (server_supports_sid) { new_message.server_id = Xep.UniqueStableStanzaIDs.get_stanza_id(message, account.bare_jid); } @@ -474,7 +250,6 @@ public class MessageProcessor : StreamInteractionModule, Object { public override async bool run(Entities.Message message, Xmpp.MessageStanza stanza, Conversation conversation) { Account account = conversation.account; - Xep.MessageArchiveManagement.MessageFlag? mam_flag = Xep.MessageArchiveManagement.MessageFlag.get_flag(stanza); // Deduplicate by server_id if (message.server_id != null) { @@ -482,16 +257,12 @@ public class MessageProcessor : StreamInteractionModule, Object { .with(db.message.server_id, "=", message.server_id) .with(db.message.counterpart_id, "=", db.get_jid_id(message.counterpart)) .with(db.message.account_id, "=", account.id); - bool duplicate = builder.count() > 0; - if (duplicate && mam_flag != null) { - debug(@"MAM: [%s] Hitted range duplicate server id. id %s qid %s", account.bare_jid.to_string(), message.server_id, mam_flag.query_id); - if (outer.catchup_until_time.has_key(account) && mam_flag.server_time.compare(outer.catchup_until_time[account]) < 0) { - outer.hitted_range[mam_flag.query_id] = -1; - debug(@"MAM: [%s] In range (time) %s < %s", account.bare_jid.to_string(), mam_flag.server_time.to_string(), outer.catchup_until_time[account].to_string()); - } + // If the message is a duplicate + if (builder.count() > 0) { + outer.history_sync.on_server_id_duplicate(account, stanza, message); + return true; } - if (duplicate) return true; } // Deduplicate messages by uuid @@ -514,14 +285,7 @@ public class MessageProcessor : StreamInteractionModule, Object { builder.with_null(db.message.our_resource); } } - RowOption row_opt = builder.single().row(); - bool duplicate = row_opt.is_present(); - - if (duplicate && mam_flag != null && row_opt[db.message.server_id] == null && - outer.catchup_until_time.has_key(account) && mam_flag.server_time.compare(outer.catchup_until_time[account]) > 0) { - outer.hitted_range[mam_flag.query_id] = -1; - debug(@"MAM: [%s] Hitted range duplicate message id. id %s qid %s", account.bare_jid.to_string(), message.stanza_id, mam_flag.query_id); - } + bool duplicate = builder.single().row().is_present(); return duplicate; } @@ -608,9 +372,9 @@ public class MessageProcessor : StreamInteractionModule, Object { } public override async bool run(Entities.Message message, Xmpp.MessageStanza stanza, Conversation conversation) { - bool is_mam_message = Xep.MessageArchiveManagement.MessageFlag.get_flag(stanza) != null; + bool is_mam_message = Xmpp.MessageArchiveManagement.MessageFlag.get_flag(stanza) != null; XmppStream? stream = stream_interactor.get_stream(conversation.account); - Xep.MessageArchiveManagement.Flag? mam_flag = stream != null ? stream.get_flag(Xep.MessageArchiveManagement.Flag.IDENTITY) : null; + Xmpp.MessageArchiveManagement.Flag? mam_flag = stream != null ? stream.get_flag(Xmpp.MessageArchiveManagement.Flag.IDENTITY) : null; if (is_mam_message || (mam_flag != null && mam_flag.cought_up == true)) { conversation.account.mam_earliest_synced = message.local_time; } diff --git a/libdino/src/service/module_manager.vala b/libdino/src/service/module_manager.vala index b54b1a1e..fc01a687 100644 --- a/libdino/src/service/module_manager.vala +++ b/libdino/src/service/module_manager.vala @@ -57,7 +57,7 @@ public class ModuleManager { module_map[account].add(new Xep.Bookmarks2.Module()); module_map[account].add(new Presence.Module()); module_map[account].add(new Xmpp.MessageModule()); - module_map[account].add(new Xep.MessageArchiveManagement.Module()); + module_map[account].add(new Xmpp.MessageArchiveManagement.Module()); module_map[account].add(new Xep.MessageCarbons.Module()); module_map[account].add(new Xep.Muc.Module()); module_map[account].add(new Xep.Pubsub.Module()); diff --git a/libdino/src/service/muc_manager.vala b/libdino/src/service/muc_manager.vala index 5cfe5528..17787387 100644 --- a/libdino/src/service/muc_manager.vala +++ b/libdino/src/service/muc_manager.vala @@ -68,6 +68,15 @@ public class MucManager : StreamInteractionModule, Object { if (last_message != null) history_since = last_message.time; } + bool receive_history = true; + EntityInfo entity_info = stream_interactor.get_module(EntityInfo.IDENTITY); + bool can_do_mam = yield entity_info.has_feature(account, jid, Xmpp.MessageArchiveManagement.NS_URI_2); + print(@"$(jid) $can_do_mam\n"); + if (can_do_mam) { + receive_history = false; + history_since = null; + } + if (!mucs_joining.has_key(account)) { mucs_joining[account] = new HashSet(Jid.hash_bare_func, Jid.equals_bare_func); } @@ -78,7 +87,7 @@ public class MucManager : StreamInteractionModule, Object { } mucs_todo[account].add(jid.with_resource(nick_)); - Muc.JoinResult? res = yield stream.get_module(Xep.Muc.Module.IDENTITY).enter(stream, jid.bare_jid, nick_, password, history_since, null); + Muc.JoinResult? res = yield stream.get_module(Xep.Muc.Module.IDENTITY).enter(stream, jid.bare_jid, nick_, password, history_since, receive_history, null); mucs_joining[account].remove(jid); @@ -91,6 +100,18 @@ public class MucManager : StreamInteractionModule, Object { Conversation joined_conversation = stream_interactor.get_module(ConversationManager.IDENTITY).create_conversation(jid, account, Conversation.Type.GROUPCHAT); joined_conversation.nickname = nick; stream_interactor.get_module(ConversationManager.IDENTITY).start_conversation(joined_conversation); + + if (can_do_mam) { + if (conversation == null) { + // We never joined the conversation before, just fetch the latest MAM page + yield stream_interactor.get_module(MessageProcessor.IDENTITY).history_sync + .fetch_latest_page(account, jid.bare_jid, null, new DateTime.from_unix_utc(0)); + } else { + // Fetch everything up to the last time the user actively joined + stream_interactor.get_module(MessageProcessor.IDENTITY).history_sync + .fetch_everything.begin(account, jid.bare_jid, conversation.active_last_changed); + } + } } else if (res.muc_error != null) { // Join failed enter_errors[jid] = res.muc_error; -- cgit v1.2.3-70-g09d2