From 871ff33ac79f3d17b0260b8bfcd27780038edd6d Mon Sep 17 00:00:00 2001 From: fiaxh Date: Fri, 3 Apr 2020 22:49:59 +0200 Subject: Add support for last message correction --- libdino/src/service/content_item_store.vala | 12 +- libdino/src/service/database.vala | 23 +++- libdino/src/service/message_correction.vala | 175 ++++++++++++++++++++++++++++ libdino/src/service/message_processor.vala | 21 ++++ libdino/src/service/message_storage.vala | 1 - libdino/src/service/module_manager.vala | 1 + 6 files changed, 226 insertions(+), 7 deletions(-) create mode 100644 libdino/src/service/message_correction.vala (limited to 'libdino/src/service') diff --git a/libdino/src/service/content_item_store.vala b/libdino/src/service/content_item_store.vala index 1e2ee85a..1ea0275e 100644 --- a/libdino/src/service/content_item_store.vala +++ b/libdino/src/service/content_item_store.vala @@ -45,9 +45,13 @@ public class ContentItemStore : StreamInteractionModule, Object { foreach (var row in select) { int provider = row[db.content_item.content_type]; int foreign_id = row[db.content_item.foreign_id]; + DateTime time = new DateTime.from_unix_utc(row[db.content_item.time]); + DateTime local_time = new DateTime.from_unix_utc(row[db.content_item.local_time]); switch (provider) { case 1: - RowOption row_option = db.message.select().with(db.message.id, "=", foreign_id).row(); + RowOption row_option = db.message.select().with(db.message.id, "=", foreign_id) + .outer_join_with(db.message_correction, db.message_correction.message_id, db.message.id) + .row(); if (row_option.is_present()) { Message? message = stream_interactor.get_module(MessageStorage.IDENTITY).get_message_by_id(foreign_id, conversation); if (message == null) { @@ -58,7 +62,10 @@ public class ContentItemStore : StreamInteractionModule, Object { } } if (message != null) { - items.add(new MessageItem(message, conversation, row[db.content_item.id])); + var message_item = new MessageItem(message, conversation, row[db.content_item.id]); + message_item.display_time = time; + message_item.sort_time = local_time; + items.add(message_item); } } break; @@ -259,6 +266,7 @@ public class MessageItem : ContentItem { public MessageItem(Message message, Conversation conversation, int id) { base(id, TYPE, message.from, message.local_time, message.time, message.encryption, message.marked); + this.message = message; this.conversation = conversation; diff --git a/libdino/src/service/database.vala b/libdino/src/service/database.vala index ebf05637..4dfcb5b4 100644 --- a/libdino/src/service/database.vala +++ b/libdino/src/service/database.vala @@ -7,7 +7,7 @@ using Dino.Entities; namespace Dino { public class Database : Qlite.Database { - private const int VERSION = 13; + private const int VERSION = 14; public class AccountTable : Table { public Column id = new Column.Integer("id") { primary_key = true, auto_increment = true }; @@ -97,6 +97,18 @@ public class Database : Qlite.Database { } } + public class MessageCorrectionTable : Table { + public Column id = new Column.Integer("id") { primary_key = true, auto_increment = true }; + public Column message_id = new Column.Integer("message_id") { unique=true }; + public Column to_stanza_id = new Column.Text("to_stanza_id"); + + internal MessageCorrectionTable(Database db) { + base(db, "message_correction"); + init({id, message_id, to_stanza_id}); + index("message_correction_to_stanza_id_idx", {to_stanza_id}); + } + } + public class RealJidTable : Table { public Column message_id = new Column.Integer("message_id") { primary_key = true }; public Column real_jid = new Column.Text("real_jid"); @@ -247,6 +259,7 @@ public class Database : Qlite.Database { public EntityTable entity { get; private set; } public ContentItemTable content_item { get; private set; } public MessageTable message { get; private set; } + public MessageCorrectionTable message_correction { get; private set; } public RealJidTable real_jid { get; private set; } public FileTransferTable file_transfer { get; private set; } public ConversationTable conversation { get; private set; } @@ -268,6 +281,7 @@ public class Database : Qlite.Database { entity = new EntityTable(this); content_item = new ContentItemTable(this); message = new MessageTable(this); + message_correction = new MessageCorrectionTable(this); real_jid = new RealJidTable(this); file_transfer = new FileTransferTable(this); conversation = new ConversationTable(this); @@ -277,7 +291,7 @@ public class Database : Qlite.Database { roster = new RosterTable(this); mam_catchup = new MamCatchupTable(this); settings = new SettingsTable(this); - init({ account, jid, entity, content_item, message, real_jid, file_transfer, conversation, avatar, entity_identity, entity_feature, roster, mam_catchup, settings }); + init({ account, jid, entity, content_item, message, message_correction, real_jid, file_transfer, conversation, avatar, entity_identity, entity_feature, roster, mam_catchup, settings }); try { exec("PRAGMA synchronous=0"); } catch (Error e) { } @@ -401,14 +415,14 @@ public class Database : Qlite.Database { if (before != null) { if (id > 0) { - select.where(@"local_time < ? OR (local_time = ? AND id < ?)", { before.to_unix().to_string(), before.to_unix().to_string(), id.to_string() }); + select.where(@"local_time < ? OR (local_time = ? AND message.id < ?)", { before.to_unix().to_string(), before.to_unix().to_string(), id.to_string() }); } else { select.with(message.id, "<", id); } } if (after != null) { if (id > 0) { - select.where(@"local_time > ? OR (local_time = ? AND id > ?)", { after.to_unix().to_string(), after.to_unix().to_string(), id.to_string() }); + select.where(@"local_time > ? OR (local_time = ? AND message.id > ?)", { after.to_unix().to_string(), after.to_unix().to_string(), id.to_string() }); } else { select.with(message.local_time, ">", (long) after.to_unix()); } @@ -430,6 +444,7 @@ public class Database : Qlite.Database { } select.outer_join_with(real_jid, real_jid.message_id, message.id); + select.outer_join_with(message_correction, message_correction.message_id, message.id); LinkedList ret = new LinkedList(); foreach (Row row in select) { diff --git a/libdino/src/service/message_correction.vala b/libdino/src/service/message_correction.vala new file mode 100644 index 00000000..320c0b7e --- /dev/null +++ b/libdino/src/service/message_correction.vala @@ -0,0 +1,175 @@ +using Gee; + +using Xmpp; +using Xmpp.Xep; +using Dino.Entities; +using Qlite; + +namespace Dino { + + +public class MessageCorrection : StreamInteractionModule, MessageListener { + public static ModuleIdentity IDENTITY = new ModuleIdentity("message_correction"); + public string id { get { return IDENTITY.id; } } + + public signal void received_correction(ContentItem content_item); + + private StreamInteractor stream_interactor; + private Database db; + private HashMap> last_messages = new HashMap>(Conversation.hash_func, Conversation.equals_func); + + private HashMap outstanding_correction_nodes = new HashMap(); + + public static void start(StreamInteractor stream_interactor, Database db) { + MessageCorrection m = new MessageCorrection(stream_interactor, db); + stream_interactor.add_module(m); + } + + public MessageCorrection(StreamInteractor stream_interactor, Database db) { + this.stream_interactor = stream_interactor; + this.db = db; + stream_interactor.account_added.connect(on_account_added); + stream_interactor.get_module(MessageProcessor.IDENTITY).received_pipeline.connect(this); + stream_interactor.get_module(MessageProcessor.IDENTITY).build_message_stanza.connect(check_add_correction_node); + stream_interactor.get_module(PresenceManager.IDENTITY).received_offline_presence.connect((jid, account) => { + Conversation? conversation = stream_interactor.get_module(ConversationManager.IDENTITY).get_conversation(jid.bare_jid, account, Conversation.Type.GROUPCHAT); + if (conversation != null) { + if (last_messages.has_key(conversation)) last_messages[conversation].unset(jid); + } + }); + } + + public void send_correction(Conversation conversation, Message old_message, string correction_text) { + string stanza_id = old_message.edit_to ?? old_message.stanza_id; + + Message out_message = stream_interactor.get_module(MessageProcessor.IDENTITY).create_out_message(correction_text, conversation); + out_message.edit_to = stanza_id; + outstanding_correction_nodes[out_message.stanza_id] = stanza_id; + stream_interactor.get_module(MessageStorage.IDENTITY).add_message(out_message, conversation); + stream_interactor.get_module(MessageProcessor.IDENTITY).send_xmpp_message(out_message, conversation); + + db.message_correction.insert() + .value(db.message_correction.message_id, out_message.id) + .value(db.message_correction.to_stanza_id, stanza_id) + .perform(); + + db.content_item.update() + .with(db.content_item.foreign_id, "=", old_message.id) + .with(db.content_item.content_type, "=", 1) + .set(db.content_item.foreign_id, out_message.id) + .perform(); + + on_received_correction(conversation, out_message.id); + } + + public bool is_own_correction_allowed(Conversation conversation, Message message) { + string stanza_id = message.edit_to ?? message.stanza_id; + + Jid own_jid = conversation.account.full_jid; + if (conversation.type_ == Conversation.Type.GROUPCHAT) { + own_jid = stream_interactor.get_module(MucManager.IDENTITY).get_own_jid(conversation.counterpart, conversation.account); + } + return last_messages.has_key(conversation) && + last_messages[conversation].has_key(own_jid) && + last_messages[conversation][own_jid].stanza_id == stanza_id; + } + + private void check_add_correction_node(Entities.Message message, Xmpp.MessageStanza message_stanza, Conversation conversation) { + if (message.stanza_id in outstanding_correction_nodes) { + LastMessageCorrection.set_replace_id(message_stanza, outstanding_correction_nodes[message.stanza_id]); + outstanding_correction_nodes.unset(message.stanza_id); + } else { + if (!last_messages.has_key(conversation)) { + last_messages[conversation] = new HashMap(Jid.hash_func, Jid.equals_func); + } + last_messages[conversation][message.from] = message; + } + } + + public string[] after_actions_const = new string[]{ "DEDUPLICATE", "DECRYPT", "FILTER_EMPTY" }; + public override string action_group { get { return "CORRECTION"; } } + public override string[] after_actions { get { return after_actions_const; } } + + public override async bool run(Entities.Message message, Xmpp.MessageStanza stanza, Conversation conversation) { + string? replace_id = Xep.LastMessageCorrection.get_replace_id(stanza); + if (replace_id == null) { + if (!last_messages.has_key(conversation)) { + last_messages[conversation] = new HashMap(Jid.hash_func, Jid.equals_func); + } + last_messages[conversation][message.from] = message; + + return false; + } + + if (!last_messages.has_key(conversation) || !last_messages[conversation].has_key(message.from)) return false; + Message original_message = last_messages[conversation][message.from]; + if (original_message.stanza_id != replace_id) return false; + + int message_id_to_be_updated = get_latest_correction_message_id(conversation.account.id, replace_id, db.get_jid_id(message.counterpart), message.counterpart.resourcepart); + if (message_id_to_be_updated == -1) { + message_id_to_be_updated = original_message.id; + } + + db.message_correction.insert() + .value(db.message_correction.message_id, message.id) + .value(db.message_correction.to_stanza_id, replace_id) + .perform(); + + int current_correction_message_id = get_latest_correction_message_id(conversation.account.id, replace_id, db.get_jid_id(message.counterpart), message.counterpart.resourcepart); + + if (current_correction_message_id != message_id_to_be_updated) { + db.content_item.update() + .with(db.content_item.foreign_id, "=", message_id_to_be_updated) + .with(db.content_item.content_type, "=", 1) + .set(db.content_item.foreign_id, current_correction_message_id) + .perform(); + message.edit_to = replace_id; + + on_received_correction(conversation, current_correction_message_id); + } + + return true; + } + + private void on_received_correction(Conversation conversation, int message_id) { + ContentItem? content_item = stream_interactor.get_module(ContentItemStore.IDENTITY).get_item(conversation, 1, message_id); + received_correction(content_item); + } + + private int get_latest_correction_message_id(int account_id, string stanza_id, int counterpart_jid_id, string? counterpart_resource) { + var qry = db.message_correction.select({db.message.id}) + .join_with(db.message, db.message.id, db.message_correction.message_id) + .with(db.message.account_id, "=", account_id) + .with(db.message.counterpart_id, "=", counterpart_jid_id) + .with(db.message_correction.to_stanza_id, "=", stanza_id) + .order_by(db.message.time, "DESC"); + + if (counterpart_resource != null) { + qry.with(db.message.counterpart_resource, "=", counterpart_resource); + } + RowOption row = qry.single().row(); + if (row.is_present()) { + return row[db.message.id]; + } + return -1; + } + + private void on_account_added(Account account) { + Gee.List conversations = stream_interactor.get_module(ConversationManager.IDENTITY).get_active_conversations(account); + foreach (Conversation conversation in conversations) { + if (conversation.type_ != Conversation.Type.CHAT) continue; + + HashMap last_conversation_messages = new HashMap(Jid.hash_func, Jid.equals_func); + Gee.List messages = stream_interactor.get_module(MessageStorage.IDENTITY).get_messages(conversation); + for (int i = messages.size - 1; i > 0; i--) { + Message message = messages[i]; + if (!last_conversation_messages.has_key(message.from) && message.edit_to == null) { + last_conversation_messages[message.from] = message; + } + } + last_messages[conversation] = last_conversation_messages; + } + } +} + +} diff --git a/libdino/src/service/message_processor.vala b/libdino/src/service/message_processor.vala index fd719eda..0120fcd4 100644 --- a/libdino/src/service/message_processor.vala +++ b/libdino/src/service/message_processor.vala @@ -40,6 +40,7 @@ public class MessageProcessor : StreamInteractionModule, Object { received_pipeline.connect(new DeduplicateMessageListener(this, db)); received_pipeline.connect(new FilterMessageListener()); received_pipeline.connect(new StoreMessageListener(stream_interactor)); + received_pipeline.connect(new StoreContentItemListener(stream_interactor)); received_pipeline.connect(new MamMessageListener(stream_interactor)); stream_interactor.account_added.connect(on_account_added); @@ -62,6 +63,7 @@ public class MessageProcessor : StreamInteractionModule, Object { public Entities.Message send_message(Entities.Message message, Conversation conversation) { stream_interactor.get_module(MessageStorage.IDENTITY).add_message(message, conversation); + stream_interactor.get_module(ContentItemStore.IDENTITY).insert_message(message, conversation); send_xmpp_message(message, conversation); message_sent(message, conversation); return message; @@ -526,6 +528,25 @@ public class MessageProcessor : StreamInteractionModule, Object { } } + private class StoreContentItemListener : MessageListener { + + public string[] after_actions_const = new string[]{ "DEDUPLICATE", "DECRYPT", "FILTER_EMPTY", "STORE", "CORRECTION" }; + public override string action_group { get { return "STORE_CONTENT_ITEM"; } } + public override string[] after_actions { get { return after_actions_const; } } + + private StreamInteractor stream_interactor; + + public StoreContentItemListener(StreamInteractor stream_interactor) { + this.stream_interactor = stream_interactor; + } + + public override async bool run(Entities.Message message, Xmpp.MessageStanza stanza, Conversation conversation) { + if (message.body == null) return true; + stream_interactor.get_module(ContentItemStore.IDENTITY).insert_message(message, conversation); + return false; + } + } + private class MamMessageListener : MessageListener { public string[] after_actions_const = new string[]{ "DEDUPLICATE" }; diff --git a/libdino/src/service/message_storage.vala b/libdino/src/service/message_storage.vala index 9c077109..50fc94b3 100644 --- a/libdino/src/service/message_storage.vala +++ b/libdino/src/service/message_storage.vala @@ -28,7 +28,6 @@ public class MessageStorage : StreamInteractionModule, Object { message.persist(db); init_conversation(conversation); messages[conversation].add(message); - stream_interactor.get_module(ContentItemStore.IDENTITY).insert_message(message, conversation); } public Gee.List get_messages(Conversation conversation, int count = 50) { diff --git a/libdino/src/service/module_manager.vala b/libdino/src/service/module_manager.vala index 0cd76a14..c0bc229e 100644 --- a/libdino/src/service/module_manager.vala +++ b/libdino/src/service/module_manager.vala @@ -79,6 +79,7 @@ public class ModuleManager { module_map[account].add(new Xep.JingleInBandBytestreams.Module()); module_map[account].add(new Xep.JingleFileTransfer.Module()); module_map[account].add(new Xep.Jet.Module()); + module_map[account].add(new Xep.LastMessageCorrection.Module()); initialize_account_modules(account, module_map[account]); } } -- cgit v1.2.3-70-g09d2