1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
|
using Gee;
using Xmpp;
using Xmpp.Xep;
using Dino.Entities;
using Qlite;
namespace Dino {
/**
 * Stream interaction module that handles message corrections.
 *
 * It remembers, per conversation and per sender JID, the most recent message
 * that is not itself a correction. Outgoing corrections get a replace id
 * attached to their stanza (via {@link Xmpp.Xep.LastMessageCorrection}), and
 * incoming stanzas carrying a replace id are matched against the remembered
 * message of the same sender and recorded in the database.
 */
public class MessageCorrection : StreamInteractionModule, MessageListener {
    public static ModuleIdentity<MessageCorrection> IDENTITY = new ModuleIdentity<MessageCorrection>("message_correction");
    public string id { get { return IDENTITY.id; } }

    /**
     * Emitted from {@link on_received_correction} with the content item that
     * references the newest correction message.
     */
    public signal void received_correction(ContentItem content_item);

    // Content type used for every content_item lookup/update in this class;
    // rows of this type carry a message id in foreign_id (see the queries below).
    private const int MESSAGE_CONTENT_TYPE = 1;

    private StreamInteractor stream_interactor;
    private Database db;
    // conversation -> (sender JID -> last non-correction message seen from that sender)
    private HashMap<Conversation, HashMap<Jid, Message>> last_messages = new HashMap<Conversation, HashMap<Jid, Message>>(Conversation.hash_func, Conversation.equals_func);
    // stanza id of a pending outgoing correction -> stanza id of the message it replaces
    private HashMap<string, string> outstanding_correction_nodes = new HashMap<string, string>();

    /**
     * Creates the module and registers it with the given stream interactor.
     *
     * @param stream_interactor interactor the module is added to
     * @param db database used to persist corrections
     */
    public static void start(StreamInteractor stream_interactor, Database db) {
        MessageCorrection m = new MessageCorrection(stream_interactor, db);
        stream_interactor.add_module(m);
    }

    /**
     * Stores the collaborators and hooks the module into account, message
     * pipeline, stanza building and offline presence notifications.
     *
     * @param stream_interactor interactor providing the other modules
     * @param db database used to persist corrections
     */
    public MessageCorrection(StreamInteractor stream_interactor, Database db) {
        this.stream_interactor = stream_interactor;
        this.db = db;

        stream_interactor.account_added.connect(on_account_added);
        stream_interactor.get_module(MessageProcessor.IDENTITY).received_pipeline.connect(this);
        stream_interactor.get_module(MessageProcessor.IDENTITY).build_message_stanza.connect(check_add_correction_node);
        stream_interactor.get_module(PresenceManager.IDENTITY).received_offline_presence.connect((jid, account) => {
            // When a JID belonging to a groupchat conversation goes offline, forget its
            // last message so that it can no longer be the target of a correction.
            // NOTE(review): presumably this guards against another occupant later
            // using the same JID/nick - confirm.
            Conversation? conversation = stream_interactor.get_module(ConversationManager.IDENTITY).get_conversation(jid.bare_jid, account, Conversation.Type.GROUPCHAT);
            if (conversation != null) {
                if (last_messages.has_key(conversation)) last_messages[conversation].unset(jid);
            }
        });
    }

    /**
     * Registers //message// as a correction of //old_message//.
     *
     * The referenced stanza id is the one //old_message// itself corrects
     * (its edit_to) if set, otherwise the stanza id of //old_message//. The
     * correction is queued so that the replace id is attached when the stanza
     * for //message// is built, stored in the message_correction table, and
     * the content item that pointed at //old_message// is re-pointed at
     * //message//.
     *
     * @param conversation conversation the messages belong to (not used here)
     * @param message the new message containing the corrected content
     * @param old_message the message being corrected
     */
    public void set_correction(Conversation conversation, Message message, Message old_message) {
        string reference_stanza_id = old_message.edit_to ?? old_message.stanza_id;

        outstanding_correction_nodes[message.stanza_id] = reference_stanza_id;

        db.message_correction.insert()
            .value(db.message_correction.message_id, message.id)
            .value(db.message_correction.to_stanza_id, reference_stanza_id)
            .perform();

        db.content_item.update()
            .with(db.content_item.foreign_id, "=", old_message.id)
            .with(db.content_item.content_type, "=", MESSAGE_CONTENT_TYPE)
            .set(db.content_item.foreign_id, message.id)
            .perform();
    }

    /**
     * Tells whether //message// may be corrected by the local user.
     *
     * That is the case when the stanza id //message// refers to (its edit_to,
     * or its own stanza id when edit_to is unset) equals the stanza id of the
     * last message remembered for the own JID in //conversation//.
     *
     * @param conversation conversation the message belongs to
     * @param message message the user wants to correct
     * @return true if a correction is allowed; false otherwise, including when
     *         the own JID cannot be determined
     */
    public bool is_own_correction_allowed(Conversation conversation, Message message) {
        string stanza_id = message.edit_to ?? message.stanza_id;

        Jid? own_jid = null;
        if (conversation.type_ == Conversation.Type.CHAT) {
            own_jid = conversation.account.full_jid;
        } else if (conversation.type_ == Conversation.Type.GROUPCHAT) {
            own_jid = stream_interactor.get_module(MucManager.IDENTITY).get_own_jid(conversation.counterpart, conversation.account);
        }

        if (own_jid == null) return false;

        return last_messages.has_key(conversation) &&
                last_messages[conversation].has_key(own_jid) &&
                last_messages[conversation][own_jid].stanza_id == stanza_id;
    }

    /**
     * Handler for the build_message_stanza signal.
     *
     * If //message// was queued by {@link set_correction}, the replace id is
     * set on the stanza and the queue entry is removed. Otherwise the message
     * is an ordinary one and becomes the last remembered message of its sender.
     */
    private void check_add_correction_node(Entities.Message message, Xmpp.MessageStanza message_stanza, Conversation conversation) {
        if (outstanding_correction_nodes.has_key(message.stanza_id)) {
            LastMessageCorrection.set_replace_id(message_stanza, outstanding_correction_nodes[message.stanza_id]);
            outstanding_correction_nodes.unset(message.stanza_id);
        } else {
            remember_last_message(conversation, message);
        }
    }

    /**
     * Records //message// as the last message seen from its sender in
     * //conversation//, creating the per-conversation map on first use.
     */
    private void remember_last_message(Conversation conversation, Message message) {
        if (!last_messages.has_key(conversation)) {
            last_messages[conversation] = new HashMap<Jid, Message>(Jid.hash_func, Jid.equals_func);
        }
        last_messages[conversation][message.from] = message;
    }

    // Values reported through after_actions; how they order this listener within
    // the pipeline is defined by MessageListener, which is not visible here.
    public string[] after_actions_const = new string[]{ "DEDUPLICATE", "DECRYPT", "FILTER_EMPTY" };

    public override string action_group { get { return "CORRECTION"; } }
    public override string[] after_actions { get { return after_actions_const; } }

    /**
     * Pipeline step for received messages.
     *
     * A stanza without a replace id is remembered as the sender's last message.
     * A stanza with a replace id is accepted only if it targets the stanza id
     * of the last message remembered for the same sender; it is then stored in
     * the message_correction table and, if it became the newest correction,
     * the content item is re-pointed at it and
     * {@link received_correction} is emitted.
     *
     * @param message the received message
     * @param stanza the stanza the message was built from
     * @param conversation conversation the message belongs to
     * @return true only when the message became the newest correction and the
     *         content item was updated; false in every other case. What the
     *         pipeline does with this value is defined by the caller, which is
     *         not visible here.
     */
    public override async bool run(Entities.Message message, Xmpp.MessageStanza stanza, Conversation conversation) {
        if (conversation.type_ != Conversation.Type.CHAT) {
            // Don't process messages or corrections from MUC history or MUC MAM
            DateTime? mam_delay = Xep.DelayedDelivery.get_time_for_message(stanza, message.from.bare_jid);
            if (mam_delay != null) return false;
            if (Xmpp.MessageArchiveManagement.MessageFlag.get_flag(stanza) != null) return false;
        }

        string? replace_id = Xep.LastMessageCorrection.get_replace_id(stanza);
        if (replace_id == null) {
            remember_last_message(conversation, message);
            return false;
        }

        // Only the last remembered message of the very same sender can be corrected.
        if (!last_messages.has_key(conversation) || !last_messages[conversation].has_key(message.from)) return false;
        Message original_message = last_messages[conversation][message.from];
        if (original_message.stanza_id != replace_id) return false;

        // Lookup arguments are identical before and after the insert; compute them once.
        int account_id = conversation.account.id;
        int counterpart_jid_id = db.get_jid_id(message.counterpart);
        string? counterpart_resource = message.counterpart.resourcepart;

        // The content item currently points either at the newest earlier correction
        // or, when there is none yet, at the original message.
        int message_id_to_be_updated = get_latest_correction_message_id(account_id, replace_id, counterpart_jid_id, counterpart_resource);
        if (message_id_to_be_updated == -1) {
            message_id_to_be_updated = original_message.id;
        }

        db.message_correction.insert()
            .value(db.message_correction.message_id, message.id)
            .value(db.message_correction.to_stanza_id, replace_id)
            .perform();

        // Re-query: the newest correction (by message time) is not necessarily the one just inserted.
        int current_correction_message_id = get_latest_correction_message_id(account_id, replace_id, counterpart_jid_id, counterpart_resource);
        if (current_correction_message_id != message_id_to_be_updated) {
            db.content_item.update()
                .with(db.content_item.foreign_id, "=", message_id_to_be_updated)
                .with(db.content_item.content_type, "=", MESSAGE_CONTENT_TYPE)
                .set(db.content_item.foreign_id, current_correction_message_id)
                .perform();
            message.edit_to = replace_id;

            on_received_correction(conversation, current_correction_message_id);

            return true;
        }

        return false;
    }

    /**
     * Looks up the content item that references //message_id// and, if one
     * exists, emits {@link received_correction} with it.
     *
     * @param conversation conversation the message belongs to
     * @param message_id database id of the correction message
     */
    public void on_received_correction(Conversation conversation, int message_id) {
        ContentItem? content_item = stream_interactor.get_module(ContentItemStore.IDENTITY).get_item_by_foreign(conversation, MESSAGE_CONTENT_TYPE, message_id);
        if (content_item != null) {
            received_correction(content_item);
        }
    }

    /**
     * Finds the message id of the newest stored correction (ordered by message
     * time, descending) that targets //stanza_id// for the given account and
     * counterpart.
     *
     * @param account_id database id of the account
     * @param stanza_id stanza id the corrections refer to
     * @param counterpart_jid_id database id of the counterpart JID
     * @param counterpart_resource counterpart resource to additionally filter
     *        on, or null to not filter by resource
     * @return the message id, or -1 if no matching correction is stored
     */
    private int get_latest_correction_message_id(int account_id, string stanza_id, int counterpart_jid_id, string? counterpart_resource) {
        var qry = db.message_correction.select({db.message.id})
                .join_with(db.message, db.message.id, db.message_correction.message_id)
                .with(db.message.account_id, "=", account_id)
                .with(db.message.counterpart_id, "=", counterpart_jid_id)
                .with(db.message_correction.to_stanza_id, "=", stanza_id)
                .order_by(db.message.time, "DESC");

        if (counterpart_resource != null) {
            qry.with(db.message.counterpart_resource, "=", counterpart_resource);
        }
        RowOption row = qry.single().row();
        if (row.is_present()) {
            return row[db.message.id];
        }
        return -1;
    }

    /**
     * Seeds {@link last_messages} for every active CHAT conversation of
     * //account// from the messages returned by MessageStorage.
     *
     * For each sender the first non-correction message (edit_to unset) found
     * while walking the list backwards is kept.
     * NOTE(review): this assumes get_messages returns messages oldest-first,
     * so that walking backwards meets the newest message first - confirm.
     */
    private void on_account_added(Account account) {
        Gee.List<Conversation> conversations = stream_interactor.get_module(ConversationManager.IDENTITY).get_active_conversations(account);
        foreach (Conversation conversation in conversations) {
            if (conversation.type_ != Conversation.Type.CHAT) continue;

            HashMap<Jid, Message> last_conversation_messages = new HashMap<Jid, Message>(Jid.hash_func, Jid.equals_func);
            Gee.List<Message> messages = stream_interactor.get_module(MessageStorage.IDENTITY).get_messages(conversation);
            // Walk down to and including index 0; stopping at 1 would skip the first
            // stored message, leaving a sender whose only message sits there untracked.
            for (int i = messages.size - 1; i >= 0; i--) {
                Message message = messages[i];
                if (!last_conversation_messages.has_key(message.from) && message.edit_to == null) {
                    last_conversation_messages[message.from] = message;
                }
            }
            last_messages[conversation] = last_conversation_messages;
        }
    }
}
}
|