diff options
author | Marvin W <git@larma.de> | 2023-02-06 14:39:59 +0100 |
---|---|---|
committer | Marvin W <git@larma.de> | 2023-02-07 10:50:45 +0100 |
commit | f74c1f18b12df0d650f74b6fa43b7f2f0a9bce79 (patch) | |
tree | f2b63b25b226872fba2820a76819f7269512da57 | |
parent | d76e12b215eb62e4eda5a0f92fbf5c1bd7c1848e (diff) | |
download | dino-f74c1f18b12df0d650f74b6fa43b7f2f0a9bce79.tar.gz dino-f74c1f18b12df0d650f74b6fa43b7f2f0a9bce79.zip |
Deduplicate messages before storing in database
-rw-r--r-- | libdino/src/service/message_processor.vala | 136 |
1 files changed, 70 insertions, 66 deletions
diff --git a/libdino/src/service/message_processor.vala b/libdino/src/service/message_processor.vala index ecac4004..12bbeeac 100644 --- a/libdino/src/service/message_processor.vala +++ b/libdino/src/service/message_processor.vala @@ -34,9 +34,9 @@ public class MessageProcessor : StreamInteractionModule, Object { this.db = db; this.history_sync = new HistorySync(db, stream_interactor); - received_pipeline.connect(new DeduplicateMessageListener(this, db)); + received_pipeline.connect(new DeduplicateMessageListener(this)); received_pipeline.connect(new FilterMessageListener()); - received_pipeline.connect(new StoreMessageListener(stream_interactor)); + received_pipeline.connect(new StoreMessageListener(this, stream_interactor)); received_pipeline.connect(new StoreContentItemListener(stream_interactor)); received_pipeline.connect(new MamMessageListener(stream_interactor)); @@ -233,6 +233,67 @@ public class MessageProcessor : StreamInteractionModule, Object { return Entities.Message.Type.CHAT; } + private bool is_duplicate(Entities.Message message, Xmpp.MessageStanza stanza, Conversation conversation) { + Account account = conversation.account; + + // Deduplicate by server_id + if (message.server_id != null) { + QueryBuilder builder = db.message.select() + .with(db.message.server_id, "=", message.server_id) + .with(db.message.counterpart_id, "=", db.get_jid_id(message.counterpart)) + .with(db.message.account_id, "=", account.id); + + // If the message is a duplicate + if (builder.count() > 0) { + history_sync.on_server_id_duplicate(account, stanza, message); + return true; + } + } + + // Deduplicate messages by uuid + bool is_uuid = message.stanza_id != null && Regex.match_simple("""[0-9A-Fa-f]{8}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{12}""", message.stanza_id); + if (is_uuid) { + QueryBuilder builder = db.message.select() + .with(db.message.stanza_id, "=", message.stanza_id) + .with(db.message.counterpart_id, "=", db.get_jid_id(message.counterpart)) + .with(db.message.account_id, "=", account.id); + if (message.direction == Message.DIRECTION_RECEIVED) { + if (message.counterpart.resourcepart != null) { + builder.with(db.message.counterpart_resource, "=", message.counterpart.resourcepart); + } else { + builder.with_null(db.message.counterpart_resource); + } + } else if (message.direction == Message.DIRECTION_SENT) { + if (message.ourpart.resourcepart != null) { + builder.with(db.message.our_resource, "=", message.ourpart.resourcepart); + } else { + builder.with_null(db.message.our_resource); + } + } + bool duplicate = builder.single().row().is_present(); + return duplicate; + } + + // Deduplicate messages based on content and metadata + QueryBuilder builder = db.message.select() + .with(db.message.account_id, "=", account.id) + .with(db.message.counterpart_id, "=", db.get_jid_id(message.counterpart)) + .with(db.message.body, "=", message.body) + .with(db.message.time, "<", (long) message.time.add_minutes(1).to_unix()) + .with(db.message.time, ">", (long) message.time.add_minutes(-1).to_unix()); + if (message.stanza_id != null) { + builder.with(db.message.stanza_id, "=", message.stanza_id); + } else { + builder.with_null(db.message.stanza_id); + } + if (message.counterpart.resourcepart != null) { + builder.with(db.message.counterpart_resource, "=", message.counterpart.resourcepart); + } else { + builder.with_null(db.message.counterpart_resource); + } + return builder.count() > 0; + } + private class DeduplicateMessageListener : MessageListener { public string[] after_actions_const = new string[]{ "FILTER_EMPTY", "MUC" }; @@ -240,73 +301,13 @@ public class MessageProcessor : StreamInteractionModule, Object { public override string[] after_actions { get { return after_actions_const; } } private MessageProcessor outer; - private Database db; - public DeduplicateMessageListener(MessageProcessor outer, Database db) { + public DeduplicateMessageListener(MessageProcessor outer) { this.outer = outer; - this.db = db; } public override async bool run(Entities.Message message, Xmpp.MessageStanza stanza, Conversation conversation) { - Account account = conversation.account; - - - // Deduplicate by server_id - if (message.server_id != null) { - QueryBuilder builder = db.message.select() - .with(db.message.server_id, "=", message.server_id) - .with(db.message.counterpart_id, "=", db.get_jid_id(message.counterpart)) - .with(db.message.account_id, "=", account.id); - - // If the message is a duplicate - if (builder.count() > 0) { - outer.history_sync.on_server_id_duplicate(account, stanza, message); - return true; - } - } - - // Deduplicate messages by uuid - bool is_uuid = message.stanza_id != null && Regex.match_simple("""[0-9A-Fa-f]{8}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{12}""", message.stanza_id); - if (is_uuid) { - QueryBuilder builder = db.message.select() - .with(db.message.stanza_id, "=", message.stanza_id) - .with(db.message.counterpart_id, "=", db.get_jid_id(message.counterpart)) - .with(db.message.account_id, "=", account.id); - if (message.direction == Message.DIRECTION_RECEIVED) { - if (message.counterpart.resourcepart != null) { - builder.with(db.message.counterpart_resource, "=", message.counterpart.resourcepart); - } else { - builder.with_null(db.message.counterpart_resource); - } - } else if (message.direction == Message.DIRECTION_SENT) { - if (message.ourpart.resourcepart != null) { - builder.with(db.message.our_resource, "=", message.ourpart.resourcepart); - } else { - builder.with_null(db.message.our_resource); - } - } - bool duplicate = builder.single().row().is_present(); - return duplicate; - } - - // Deduplicate messages based on content and metadata - QueryBuilder builder = db.message.select() - .with(db.message.account_id, "=", account.id) - .with(db.message.counterpart_id, "=", db.get_jid_id(message.counterpart)) - .with(db.message.body, "=", message.body) - .with(db.message.time, "<", (long) message.time.add_minutes(1).to_unix()) - .with(db.message.time, ">", (long) message.time.add_minutes(-1).to_unix()); - if (message.stanza_id != null) { - builder.with(db.message.stanza_id, "=", message.stanza_id); - } else { - builder.with_null(db.message.stanza_id); - } - if (message.counterpart.resourcepart != null) { - builder.with(db.message.counterpart_resource, "=", message.counterpart.resourcepart); - } else { - builder.with_null(db.message.counterpart_resource); - } - return builder.count() > 0; + return outer.is_duplicate(message, stanza, conversation); } } @@ -327,14 +328,17 @@ public class MessageProcessor : StreamInteractionModule, Object { public override string action_group { get { return "STORE"; } } public override string[] after_actions { get { return after_actions_const; } } + private MessageProcessor outer; private StreamInteractor stream_interactor; - public StoreMessageListener(StreamInteractor stream_interactor) { + public StoreMessageListener(MessageProcessor outer, StreamInteractor stream_interactor) { + this.outer = outer; this.stream_interactor = stream_interactor; } public override async bool run(Entities.Message message, Xmpp.MessageStanza stanza, Conversation conversation) { - if (message.body == null) return true; + if (message.body == null || outer.is_duplicate(message, stanza, conversation)) return true; + stream_interactor.get_module(MessageStorage.IDENTITY).add_message(message, conversation); return false; } |