aboutsummaryrefslogtreecommitdiff
path: root/libdino/src/service
diff options
context:
space:
mode:
authorfiaxh <git@mx.ax.lt>2017-04-04 15:47:00 +0200
committerfiaxh <git@mx.ax.lt>2017-04-04 15:57:35 +0200
commit75e51b5ed3b639b9cf7b16b0ddbee7e362c44ef1 (patch)
treef593528003a5f00268988255904dfc8a6752a779 /libdino/src/service
parentea5d3e50c6fd63ae3a151c883f691e76b9cb1018 (diff)
downloaddino-75e51b5ed3b639b9cf7b16b0ddbee7e362c44ef1.tar.gz
dino-75e51b5ed3b639b9cf7b16b0ddbee7e362c44ef1.zip
MessageStorage/Processor, correctly resolve conversations (fixup 22adbd3)
Diffstat (limited to 'libdino/src/service')
-rw-r--r--libdino/src/service/chat_interaction.vala8
-rw-r--r--libdino/src/service/conversation_manager.vala12
-rw-r--r--libdino/src/service/counterpart_interaction_manager.vala40
-rw-r--r--libdino/src/service/database.vala5
-rw-r--r--libdino/src/service/message_processor.vala (renamed from libdino/src/service/message_manager.vala)68
-rw-r--r--libdino/src/service/message_storage.vala68
-rw-r--r--libdino/src/service/muc_manager.vala12
-rw-r--r--libdino/src/service/util.vala19
8 files changed, 136 insertions, 96 deletions
diff --git a/libdino/src/service/chat_interaction.vala b/libdino/src/service/chat_interaction.vala
index 9943c3c3..891abf29 100644
--- a/libdino/src/service/chat_interaction.vala
+++ b/libdino/src/service/chat_interaction.vala
@@ -27,8 +27,8 @@ public class ChatInteraction : StreamInteractionModule, Object {
private ChatInteraction(StreamInteractor stream_interactor) {
this.stream_interactor = stream_interactor;
Timeout.add_seconds(30, update_interactions);
- stream_interactor.get_module(MessageManager.IDENTITY).message_received.connect(on_message_received);
- stream_interactor.get_module(MessageManager.IDENTITY).message_sent.connect(on_message_sent);
+ stream_interactor.get_module(MessageProcessor.IDENTITY).message_received.connect(on_message_received);
+ stream_interactor.get_module(MessageProcessor.IDENTITY).message_sent.connect(on_message_sent);
}
public bool is_active_focus(Conversation? conversation = null) {
@@ -79,7 +79,7 @@ public class ChatInteraction : StreamInteractionModule, Object {
if (conversation == null) return;
conversation_read(selected_conversation);
check_send_read();
- selected_conversation.read_up_to = stream_interactor.get_module(MessageManager.IDENTITY).get_last_message(conversation);
+ selected_conversation.read_up_to = stream_interactor.get_module(MessageStorage.IDENTITY).get_last_message(conversation);
}
private void on_conversation_unfocused(Conversation? conversation) {
@@ -93,7 +93,7 @@ public class ChatInteraction : StreamInteractionModule, Object {
private void check_send_read() {
if (selected_conversation == null || selected_conversation.type_ == Conversation.Type.GROUPCHAT) return;
- Entities.Message? message = stream_interactor.get_module(MessageManager.IDENTITY).get_last_message(selected_conversation);
+ Entities.Message? message = stream_interactor.get_module(MessageStorage.IDENTITY).get_last_message(selected_conversation);
if (message != null && message.direction == Entities.Message.DIRECTION_RECEIVED &&
message.stanza != null && !message.equals(selected_conversation.read_up_to)) {
selected_conversation.read_up_to = message;
diff --git a/libdino/src/service/conversation_manager.vala b/libdino/src/service/conversation_manager.vala
index ff4717ee..db9cff91 100644
--- a/libdino/src/service/conversation_manager.vala
+++ b/libdino/src/service/conversation_manager.vala
@@ -27,8 +27,8 @@ public class ConversationManager : StreamInteractionModule, Object {
stream_interactor.add_module(this);
stream_interactor.account_added.connect(on_account_added);
stream_interactor.get_module(MucManager.IDENTITY).groupchat_joined.connect(on_groupchat_joined);
- stream_interactor.get_module(MessageManager.IDENTITY).pre_message_received.connect(on_message_received);
- stream_interactor.get_module(MessageManager.IDENTITY).message_sent.connect(on_message_sent);
+ stream_interactor.get_module(MessageProcessor.IDENTITY).pre_message_received.connect(on_message_received);
+ stream_interactor.get_module(MessageProcessor.IDENTITY).message_sent.connect(on_message_sent);
}
public Conversation create_conversation(Jid jid, Account account, Conversation.Type? type = null) {
@@ -55,10 +55,14 @@ public class ConversationManager : StreamInteractionModule, Object {
}
public Gee.List<Conversation> get_conversations_for_presence(Show show, Account account) {
+ return get_conversations(show.jid, account);
+ }
+
+ public Gee.List<Conversation> get_conversations(Jid jid, Account account) {
Gee.List<Conversation> ret = new ArrayList<Conversation>(Conversation.equals_func);
- Conversation? bare_conversation = get_conversation(show.jid, account);
+ Conversation? bare_conversation = get_conversation(jid, account);
if (bare_conversation != null) ret.add(bare_conversation);
- Conversation? full_conversation = get_conversation(show.jid.bare_jid, account);
+ Conversation? full_conversation = get_conversation(jid.bare_jid, account);
if (full_conversation != null) ret.add(full_conversation);
return ret;
}
diff --git a/libdino/src/service/counterpart_interaction_manager.vala b/libdino/src/service/counterpart_interaction_manager.vala
index bfd473a2..75d2d7be 100644
--- a/libdino/src/service/counterpart_interaction_manager.vala
+++ b/libdino/src/service/counterpart_interaction_manager.vala
@@ -25,7 +25,7 @@ public class CounterpartInteractionManager : StreamInteractionModule, Object {
private CounterpartInteractionManager(StreamInteractor stream_interactor) {
this.stream_interactor = stream_interactor;
stream_interactor.account_added.connect(on_account_added);
- stream_interactor.get_module(MessageManager.IDENTITY).message_received.connect(on_message_received);
+ stream_interactor.get_module(MessageProcessor.IDENTITY).message_received.connect(on_message_received);
}
public string? get_chat_state(Account account, Jid jid) {
@@ -54,28 +54,24 @@ public class CounterpartInteractionManager : StreamInteractionModule, Object {
}
private void on_chat_marker_received(Account account, Jid jid, string marker, string stanza_id) {
- Conversation? conversation = stream_interactor.get_module(ConversationManager.IDENTITY).get_conversation(jid, account);
- if (conversation != null) {
- Gee.List<Entities.Message>? messages = stream_interactor.get_module(MessageManager.IDENTITY).get_messages(conversation);
- if (messages != null) { // TODO not here
- foreach (Entities.Message message in messages) {
- if (message.stanza_id == stanza_id) {
- switch (marker) {
- case Xep.ChatMarkers.MARKER_RECEIVED:
- received_message_received(account, jid, message);
- message.marked = Entities.Message.Marked.RECEIVED;
- break;
- case Xep.ChatMarkers.MARKER_DISPLAYED:
- last_read[jid] = message;
- received_message_displayed(account, jid, message);
- foreach (Entities.Message m in messages) {
- if (m.equals(message)) break;
- if (m.marked == Entities.Message.Marked.RECEIVED) m.marked = Entities.Message.Marked.READ;
- }
- message.marked = Entities.Message.Marked.READ;
- break;
+ foreach (Conversation conversation in stream_interactor.get_module(ConversationManager.IDENTITY).get_conversations(jid, account)) {
+ Entities.Message? message = stream_interactor.get_module(MessageStorage.IDENTITY).get_message_by_id(stanza_id, conversation);
+ if (message != null) {
+ switch (marker) {
+ case Xep.ChatMarkers.MARKER_RECEIVED:
+ received_message_received(account, jid, message);
+ message.marked = Entities.Message.Marked.RECEIVED;
+ break;
+ case Xep.ChatMarkers.MARKER_DISPLAYED:
+ last_read[jid] = message;
+ received_message_displayed(account, jid, message);
+ Gee.List<Entities.Message> messages = stream_interactor.get_module(MessageStorage.IDENTITY).get_messages(conversation);
+ foreach (Entities.Message m in messages) {
+ if (m.equals(message)) break;
+ if (m.marked == Entities.Message.Marked.RECEIVED) m.marked = Entities.Message.Marked.READ;
}
- }
+ message.marked = Entities.Message.Marked.READ;
+ break;
}
}
}
diff --git a/libdino/src/service/database.vala b/libdino/src/service/database.vala
index 797fd6c9..a74ac056 100644
--- a/libdino/src/service/database.vala
+++ b/libdino/src/service/database.vala
@@ -190,7 +190,10 @@ public class Database : Qlite.Database {
public Gee.List<Message> get_unsend_messages(Account account) {
Gee.List<Message> ret = new ArrayList<Message>();
- foreach (Row row in message.select().with(message.marked, "=", (int) Message.Marked.UNSENT)) {
+ var select = message.select()
+ .with(message.account_id, "=", account.id)
+ .with(message.marked, "=", (int) Message.Marked.UNSENT);
+ foreach (Row row in select) {
ret.add(new Message.from_row(this, row));
}
return ret;
diff --git a/libdino/src/service/message_manager.vala b/libdino/src/service/message_processor.vala
index 73f49237..10954672 100644
--- a/libdino/src/service/message_manager.vala
+++ b/libdino/src/service/message_processor.vala
@@ -5,8 +5,8 @@ using Dino.Entities;
namespace Dino {
-public class MessageManager : StreamInteractionModule, Object {
- public static ModuleIdentity<MessageManager> IDENTITY = new ModuleIdentity<MessageManager>("message_manager");
+public class MessageProcessor : StreamInteractionModule, Object {
+ public static ModuleIdentity<MessageProcessor> IDENTITY = new ModuleIdentity<MessageProcessor>("message_manager");
public string id { get { return IDENTITY.id; } }
public signal void pre_message_received(Entities.Message message, Xmpp.Message.Stanza message_stanza, Conversation conversation);
@@ -17,14 +17,14 @@ public class MessageManager : StreamInteractionModule, Object {
private StreamInteractor stream_interactor;
private Database db;
- private HashMap<Conversation, ArrayList<Entities.Message>> messages = new HashMap<Conversation, ArrayList<Entities.Message>>(Conversation.hash_func, Conversation.equals_func);
+ private Object lock_send_unsent;
public static void start(StreamInteractor stream_interactor, Database db) {
- MessageManager m = new MessageManager(stream_interactor, db);
+ MessageProcessor m = new MessageProcessor(stream_interactor, db);
stream_interactor.add_module(m);
}
- private MessageManager(StreamInteractor stream_interactor, Database db) {
+ private MessageProcessor(StreamInteractor stream_interactor, Database db) {
this.stream_interactor = stream_interactor;
this.db = db;
stream_interactor.account_added.connect(on_account_added);
@@ -35,52 +35,12 @@ public class MessageManager : StreamInteractionModule, Object {
public void send_message(string text, Conversation conversation) {
Entities.Message message = create_out_message(text, conversation);
- add_message(message, conversation);
+ stream_interactor.get_module(MessageStorage.IDENTITY).add_message(message, conversation);
message.persist(db);
send_xmpp_message(message, conversation);
message_sent(message, conversation);
}
- public Gee.List<Entities.Message>? get_messages(Conversation conversation, int count = 50) {
- if (messages.has_key(conversation) && messages[conversation].size > 0) {
- Gee.List<Entities.Message> db_messages = db.get_messages(conversation.counterpart, conversation.account, get_message_type_for_conversation(conversation), count, messages[conversation][0]);
- db_messages.add_all(messages[conversation]);
- return db_messages;
- } else {
- Gee.List<Entities.Message> db_messages = db.get_messages(conversation.counterpart, conversation.account, get_message_type_for_conversation(conversation), count, null);
- return db_messages;
- }
- }
-
- public Entities.Message? get_last_message(Conversation conversation) {
- if (messages.has_key(conversation) && messages[conversation].size > 0) {
- return messages[conversation][messages[conversation].size - 1];
- } else {
- Gee.List<Entities.Message> db_messages = db.get_messages(conversation.counterpart, conversation.account, get_message_type_for_conversation(conversation), 1, null);
- if (db_messages.size >= 1) {
- return db_messages[0];
- }
- }
- return null;
- }
-
- public Gee.List<Entities.Message>? get_messages_before(Conversation? conversation, Entities.Message before) {
- Gee.List<Entities.Message> db_messages = db.get_messages(conversation.counterpart, conversation.account, get_message_type_for_conversation(conversation), 20, before);
- return db_messages;
- }
-
- private Entities.Message.Type get_message_type_for_conversation(Conversation conversation) {
- switch (conversation.type_) {
- case Conversation.Type.CHAT:
- return Entities.Message.Type.CHAT;
- case Conversation.Type.GROUPCHAT:
- return Entities.Message.Type.GROUPCHAT;
- case Conversation.Type.GROUPCHAT_PM:
- return Entities.Message.Type.GROUPCHAT_PM;
- }
- assert_not_reached();
- }
-
private void on_account_added(Account account) {
stream_interactor.module_manager.get_module(account, Xmpp.Message.Module.IDENTITY).received_message.connect( (stream, message) => {
on_message_received(account, message);
@@ -135,8 +95,7 @@ public class MessageManager : StreamInteractionModule, Object {
bool is_uuid = new_message.stanza_id != null && Regex.match_simple("""[0-9A-Fa-f]{8}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{12}""", new_message.stanza_id);
if ((is_uuid && !db.contains_message_by_stanza_id(new_message.stanza_id, conversation.account)) ||
(!is_uuid && !db.contains_message(new_message, conversation.account))) {
- new_message.persist(db);
- add_message(new_message, conversation);
+ stream_interactor.get_module(MessageStorage.IDENTITY).add_message(new_message, conversation);
if (new_message.direction == Entities.Message.DIRECTION_SENT) {
message_sent(new_message, conversation);
} else {
@@ -161,7 +120,7 @@ public class MessageManager : StreamInteractionModule, Object {
} else {
Core.XmppStream stream = stream_interactor.get_stream(account);
if (stream != null) stream.get_module(Xep.ServiceDiscovery.Module.IDENTITY).get_entity_categories(stream, message.counterpart.bare_jid.to_string(), (stream, identities, store) => {
- Triple<MessageManager, Entities.Message, Xmpp.Message.Stanza> triple = store as Triple<MessageManager, Entities.Message, Xmpp.Message.Stanza>;
+ Triple<MessageProcessor, Entities.Message, Xmpp.Message.Stanza> triple = store as Triple<MessageProcessor, Entities.Message, Xmpp.Message.Stanza>;
Entities.Message m = triple.b;
if (identities == null) {
m.type_ = Entities.Message.Type.CHAT;
@@ -181,16 +140,9 @@ public class MessageManager : StreamInteractionModule, Object {
}
}
- private void add_message(Entities.Message message, Conversation conversation) {
- if (!messages.has_key(conversation)) {
- messages[conversation] = new ArrayList<Entities.Message>(Entities.Message.equals_func);
- }
- messages[conversation].add(message);
- }
-
private Entities.Message create_out_message(string text, Conversation conversation) {
Entities.Message message = new Entities.Message(text);
- message.type_ = get_message_type_for_conversation(conversation);
+ message.type_ = Util.get_message_type_for_conversation(conversation);
message.stanza_id = random_uuid();
message.account = conversation.account;
message.body = text;
@@ -207,7 +159,7 @@ public class MessageManager : StreamInteractionModule, Object {
}
public void send_xmpp_message(Entities.Message message, Conversation conversation, bool delayed = false) {
- lock (messages) {
+ lock (lock_send_unsent) {
Core.XmppStream stream = stream_interactor.get_stream(conversation.account);
message.marked = Entities.Message.Marked.NONE;
if (stream != null) {
diff --git a/libdino/src/service/message_storage.vala b/libdino/src/service/message_storage.vala
new file mode 100644
index 00000000..ea765f96
--- /dev/null
+++ b/libdino/src/service/message_storage.vala
@@ -0,0 +1,68 @@
+using Gee;
+
+using Dino.Entities;
+
+namespace Dino {
+
+public class MessageStorage : StreamInteractionModule, Object {
+ public static ModuleIdentity<MessageStorage> IDENTITY = new ModuleIdentity<MessageStorage>("message_cache");
+ public string id { get { return IDENTITY.id; } }
+
+ private StreamInteractor stream_interactor;
+ private Database db;
+
+ private HashMap<Conversation, Gee.List<Message>> messages = new HashMap<Conversation, Gee.List<Entities.Message>>(Conversation.hash_func, Conversation.equals_func);
+
+ public static void start(StreamInteractor stream_interactor, Database db) {
+ MessageStorage m = new MessageStorage(stream_interactor, db);
+ stream_interactor.add_module(m);
+ }
+
+ private MessageStorage(StreamInteractor stream_interactor, Database db) {
+ this.stream_interactor = stream_interactor;
+ this.db = db;
+ }
+
+ public void add_message(Message message, Conversation conversation) {
+ message.persist(db);
+ init_conversation(conversation);
+ messages[conversation].add(message);
+ }
+
+ public Gee.List<Message> get_messages(Conversation conversation, int count = 50) {
+ init_conversation(conversation);
+ if (messages[conversation].size > 0) {
+ return messages[conversation][int.max(messages[conversation].size - count - 1, 0) : messages[conversation].size];
+ }
+ return new ArrayList<Conversation>();
+ }
+
+ public Message? get_last_message(Conversation conversation) {
+ init_conversation(conversation);
+ if (messages[conversation].size > 0) {
+ return messages[conversation][messages[conversation].size - 1];
+ }
+ return null;
+ }
+
+ public Gee.List<Message>? get_messages_before(Conversation? conversation, Message before, int count = 20) {
+ Gee.List<Message> db_messages = db.get_messages(conversation.counterpart, conversation.account, Util.get_message_type_for_conversation(conversation), count, before);
+ return db_messages;
+ }
+
+ public Message? get_message_by_id(string stanza_id, Conversation conversation) {
+ init_conversation(conversation);
+ for (int i = messages[conversation].size - 1; i > 0; i--) {
+ if (messages[conversation][i].stanza_id == stanza_id) return messages[conversation][i];
+ }
+ return null;
+ }
+
+ private void init_conversation(Conversation conversation) {
+ if (!messages.has_key(conversation)) {
+ messages[conversation] = db.get_messages(conversation.counterpart, conversation.account, Util.get_message_type_for_conversation(conversation), 50, null);
+ }
+ }
+}
+
+} \ No newline at end of file
diff --git a/libdino/src/service/muc_manager.vala b/libdino/src/service/muc_manager.vala
index c5bfb8ba..f63a557c 100644
--- a/libdino/src/service/muc_manager.vala
+++ b/libdino/src/service/muc_manager.vala
@@ -24,7 +24,7 @@ public class MucManager : StreamInteractionModule, Object {
this.stream_interactor = stream_interactor;
stream_interactor.account_added.connect(on_account_added);
stream_interactor.stream_negotiated.connect(on_stream_negotiated);
- stream_interactor.get_module(MessageManager.IDENTITY).pre_message_received.connect(on_pre_message_received);
+ stream_interactor.get_module(MessageProcessor.IDENTITY).pre_message_received.connect(on_pre_message_received);
}
public void join(Account account, Jid jid, string nick, string? password = null) {
@@ -170,12 +170,10 @@ public class MucManager : StreamInteractionModule, Object {
}
string? muc_nick = stream.get_flag(Xep.Muc.Flag.IDENTITY).get_muc_nick(conversation.counterpart.bare_jid.to_string());
if (muc_nick != null && message.from.equals(new Jid(@"$(message.from.bare_jid)/$muc_nick"))) { // TODO better from own
- Gee.List<Entities.Message>? messages = stream_interactor.get_module(MessageManager.IDENTITY).get_messages(conversation);
- if (messages != null) { // TODO not here
- foreach (Entities.Message m in messages) {
- if (m.equals(message)) {
- m.marked = Entities.Message.Marked.RECEIVED;
- }
+ Gee.List<Entities.Message> messages = stream_interactor.get_module(MessageStorage.IDENTITY).get_messages(conversation);
+ foreach (Entities.Message m in messages) { // TODO not here
+ if (m.equals(message)) {
+ m.marked = Entities.Message.Marked.RECEIVED;
}
}
}
diff --git a/libdino/src/service/util.vala b/libdino/src/service/util.vala
new file mode 100644
index 00000000..d0e19dc3
--- /dev/null
+++ b/libdino/src/service/util.vala
@@ -0,0 +1,19 @@
+using Dino.Entities;
+
+namespace Dino {
+
+public class Util {
+ public static Entities.Message.Type get_message_type_for_conversation(Conversation conversation) {
+ switch (conversation.type_) {
+ case Conversation.Type.CHAT:
+ return Entities.Message.Type.CHAT;
+ case Conversation.Type.GROUPCHAT:
+ return Entities.Message.Type.GROUPCHAT;
+ case Conversation.Type.GROUPCHAT_PM:
+ return Entities.Message.Type.GROUPCHAT_PM;
+ }
+ assert_not_reached();
+ }
+}
+
+} \ No newline at end of file