aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMarvin W <git@larma.de>2023-02-06 14:39:59 +0100
committerMarvin W <git@larma.de>2023-02-07 10:50:45 +0100
commitf74c1f18b12df0d650f74b6fa43b7f2f0a9bce79 (patch)
treef2b63b25b226872fba2820a76819f7269512da57
parentd76e12b215eb62e4eda5a0f92fbf5c1bd7c1848e (diff)
downloaddino-f74c1f18b12df0d650f74b6fa43b7f2f0a9bce79.tar.gz
dino-f74c1f18b12df0d650f74b6fa43b7f2f0a9bce79.zip
Deduplicate messages before storing in database
-rw-r--r--libdino/src/service/message_processor.vala136
1 files changed, 70 insertions, 66 deletions
diff --git a/libdino/src/service/message_processor.vala b/libdino/src/service/message_processor.vala
index ecac4004..12bbeeac 100644
--- a/libdino/src/service/message_processor.vala
+++ b/libdino/src/service/message_processor.vala
@@ -34,9 +34,9 @@ public class MessageProcessor : StreamInteractionModule, Object {
this.db = db;
this.history_sync = new HistorySync(db, stream_interactor);
- received_pipeline.connect(new DeduplicateMessageListener(this, db));
+ received_pipeline.connect(new DeduplicateMessageListener(this));
received_pipeline.connect(new FilterMessageListener());
- received_pipeline.connect(new StoreMessageListener(stream_interactor));
+ received_pipeline.connect(new StoreMessageListener(this, stream_interactor));
received_pipeline.connect(new StoreContentItemListener(stream_interactor));
received_pipeline.connect(new MamMessageListener(stream_interactor));
@@ -233,6 +233,67 @@ public class MessageProcessor : StreamInteractionModule, Object {
return Entities.Message.Type.CHAT;
}
+ private bool is_duplicate(Entities.Message message, Xmpp.MessageStanza stanza, Conversation conversation) {
+ Account account = conversation.account;
+
+ // Deduplicate by server_id
+ if (message.server_id != null) {
+ QueryBuilder builder = db.message.select()
+ .with(db.message.server_id, "=", message.server_id)
+ .with(db.message.counterpart_id, "=", db.get_jid_id(message.counterpart))
+ .with(db.message.account_id, "=", account.id);
+
+ // If the message is a duplicate
+ if (builder.count() > 0) {
+ history_sync.on_server_id_duplicate(account, stanza, message);
+ return true;
+ }
+ }
+
+ // Deduplicate messages by uuid
+ bool is_uuid = message.stanza_id != null && Regex.match_simple("""[0-9A-Fa-f]{8}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{12}""", message.stanza_id);
+ if (is_uuid) {
+ QueryBuilder builder = db.message.select()
+ .with(db.message.stanza_id, "=", message.stanza_id)
+ .with(db.message.counterpart_id, "=", db.get_jid_id(message.counterpart))
+ .with(db.message.account_id, "=", account.id);
+ if (message.direction == Message.DIRECTION_RECEIVED) {
+ if (message.counterpart.resourcepart != null) {
+ builder.with(db.message.counterpart_resource, "=", message.counterpart.resourcepart);
+ } else {
+ builder.with_null(db.message.counterpart_resource);
+ }
+ } else if (message.direction == Message.DIRECTION_SENT) {
+ if (message.ourpart.resourcepart != null) {
+ builder.with(db.message.our_resource, "=", message.ourpart.resourcepart);
+ } else {
+ builder.with_null(db.message.our_resource);
+ }
+ }
+ bool duplicate = builder.single().row().is_present();
+ return duplicate;
+ }
+
+ // Deduplicate messages based on content and metadata
+ QueryBuilder builder = db.message.select()
+ .with(db.message.account_id, "=", account.id)
+ .with(db.message.counterpart_id, "=", db.get_jid_id(message.counterpart))
+ .with(db.message.body, "=", message.body)
+ .with(db.message.time, "<", (long) message.time.add_minutes(1).to_unix())
+ .with(db.message.time, ">", (long) message.time.add_minutes(-1).to_unix());
+ if (message.stanza_id != null) {
+ builder.with(db.message.stanza_id, "=", message.stanza_id);
+ } else {
+ builder.with_null(db.message.stanza_id);
+ }
+ if (message.counterpart.resourcepart != null) {
+ builder.with(db.message.counterpart_resource, "=", message.counterpart.resourcepart);
+ } else {
+ builder.with_null(db.message.counterpart_resource);
+ }
+ return builder.count() > 0;
+ }
+
private class DeduplicateMessageListener : MessageListener {
public string[] after_actions_const = new string[]{ "FILTER_EMPTY", "MUC" };
@@ -240,73 +301,13 @@ public class MessageProcessor : StreamInteractionModule, Object {
public override string[] after_actions { get { return after_actions_const; } }
private MessageProcessor outer;
- private Database db;
- public DeduplicateMessageListener(MessageProcessor outer, Database db) {
+ public DeduplicateMessageListener(MessageProcessor outer) {
this.outer = outer;
- this.db = db;
}
public override async bool run(Entities.Message message, Xmpp.MessageStanza stanza, Conversation conversation) {
- Account account = conversation.account;
-
-
- // Deduplicate by server_id
- if (message.server_id != null) {
- QueryBuilder builder = db.message.select()
- .with(db.message.server_id, "=", message.server_id)
- .with(db.message.counterpart_id, "=", db.get_jid_id(message.counterpart))
- .with(db.message.account_id, "=", account.id);
-
- // If the message is a duplicate
- if (builder.count() > 0) {
- outer.history_sync.on_server_id_duplicate(account, stanza, message);
- return true;
- }
- }
-
- // Deduplicate messages by uuid
- bool is_uuid = message.stanza_id != null && Regex.match_simple("""[0-9A-Fa-f]{8}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{12}""", message.stanza_id);
- if (is_uuid) {
- QueryBuilder builder = db.message.select()
- .with(db.message.stanza_id, "=", message.stanza_id)
- .with(db.message.counterpart_id, "=", db.get_jid_id(message.counterpart))
- .with(db.message.account_id, "=", account.id);
- if (message.direction == Message.DIRECTION_RECEIVED) {
- if (message.counterpart.resourcepart != null) {
- builder.with(db.message.counterpart_resource, "=", message.counterpart.resourcepart);
- } else {
- builder.with_null(db.message.counterpart_resource);
- }
- } else if (message.direction == Message.DIRECTION_SENT) {
- if (message.ourpart.resourcepart != null) {
- builder.with(db.message.our_resource, "=", message.ourpart.resourcepart);
- } else {
- builder.with_null(db.message.our_resource);
- }
- }
- bool duplicate = builder.single().row().is_present();
- return duplicate;
- }
-
- // Deduplicate messages based on content and metadata
- QueryBuilder builder = db.message.select()
- .with(db.message.account_id, "=", account.id)
- .with(db.message.counterpart_id, "=", db.get_jid_id(message.counterpart))
- .with(db.message.body, "=", message.body)
- .with(db.message.time, "<", (long) message.time.add_minutes(1).to_unix())
- .with(db.message.time, ">", (long) message.time.add_minutes(-1).to_unix());
- if (message.stanza_id != null) {
- builder.with(db.message.stanza_id, "=", message.stanza_id);
- } else {
- builder.with_null(db.message.stanza_id);
- }
- if (message.counterpart.resourcepart != null) {
- builder.with(db.message.counterpart_resource, "=", message.counterpart.resourcepart);
- } else {
- builder.with_null(db.message.counterpart_resource);
- }
- return builder.count() > 0;
+ return outer.is_duplicate(message, stanza, conversation);
}
}
@@ -327,14 +328,17 @@ public class MessageProcessor : StreamInteractionModule, Object {
public override string action_group { get { return "STORE"; } }
public override string[] after_actions { get { return after_actions_const; } }
+ private MessageProcessor outer;
private StreamInteractor stream_interactor;
- public StoreMessageListener(StreamInteractor stream_interactor) {
+ public StoreMessageListener(MessageProcessor outer, StreamInteractor stream_interactor) {
+ this.outer = outer;
this.stream_interactor = stream_interactor;
}
public override async bool run(Entities.Message message, Xmpp.MessageStanza stanza, Conversation conversation) {
- if (message.body == null) return true;
+ if (message.body == null || outer.is_duplicate(message, stanza, conversation)) return true;
+
stream_interactor.get_module(MessageStorage.IDENTITY).add_message(message, conversation);
return false;
}