aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorfiaxh <git@lightrise.org>2020-03-05 12:21:43 +0100
committerfiaxh <git@lightrise.org>2020-03-05 12:21:43 +0100
commit013b388896315c7ac10e6cd7d36f913559998f83 (patch)
tree3d2ca3a52c81c0c981bb38fd0b756dab6000b7b6
parentb8b3e1c6f517a18a917250f17a64ba1bd19140a8 (diff)
downloaddino-013b388896315c7ac10e6cd7d36f913559998f83.tar.gz
dino-013b388896315c7ac10e6cd7d36f913559998f83.zip
Make message sending async and set unsent on error
-rw-r--r--libdino/src/service/message_processor.vala66
-rw-r--r--xmpp-vala/src/module/message/module.vala7
-rw-r--r--xmpp-vala/src/module/xep/0045_muc/module.vala4
-rw-r--r--xmpp-vala/src/module/xep/0085_chat_state_notifications.vala2
-rw-r--r--xmpp-vala/src/module/xep/0184_message_delivery_receipts.vala2
-rw-r--r--xmpp-vala/src/module/xep/0333_chat_markers.vala2
6 files changed, 45 insertions, 38 deletions
diff --git a/libdino/src/service/message_processor.vala b/libdino/src/service/message_processor.vala
index f1526b16..04518e72 100644
--- a/libdino/src/service/message_processor.vala
+++ b/libdino/src/service/message_processor.vala
@@ -22,7 +22,6 @@ public class MessageProcessor : StreamInteractionModule, Object {
private StreamInteractor stream_interactor;
private Database db;
- private Object lock_send_unsent;
private HashMap<Account, int> current_catchup_id = new HashMap<Account, int>(Account.hash_func, Account.equals_func);
private HashMap<Account, HashMap<string, DateTime>> mam_times = new HashMap<Account, HashMap<string, DateTime>>();
public HashMap<string, int> hitted_range = new HashMap<string, int>();
@@ -565,39 +564,48 @@ public class MessageProcessor : StreamInteractionModule, Object {
}
public void send_xmpp_message(Entities.Message message, Conversation conversation, bool delayed = false) {
- lock (lock_send_unsent) {
- XmppStream stream = stream_interactor.get_stream(conversation.account);
- message.marked = Entities.Message.Marked.NONE;
- if (stream != null) {
- Xmpp.MessageStanza new_message = new Xmpp.MessageStanza(message.stanza_id);
- new_message.to = message.counterpart;
- new_message.body = message.body;
- if (conversation.type_ == Conversation.Type.GROUPCHAT) {
- new_message.type_ = Xmpp.MessageStanza.TYPE_GROUPCHAT;
- } else {
- new_message.type_ = Xmpp.MessageStanza.TYPE_CHAT;
- }
- build_message_stanza(message, new_message, conversation);
- pre_message_send(message, new_message, conversation);
- if (message.marked == Entities.Message.Marked.UNSENT || message.marked == Entities.Message.Marked.WONTSEND) return;
- if (delayed) {
- Xmpp.Xep.DelayedDelivery.Module.set_message_delay(new_message, message.time);
- }
+ XmppStream stream = stream_interactor.get_stream(conversation.account);
+ message.marked = Entities.Message.Marked.NONE;
- // Set an origin ID if a MUC doen't guarantee to keep IDs
- if (conversation.type_ == Conversation.Type.GROUPCHAT) {
- Xep.Muc.Flag? flag = stream.get_flag(Xep.Muc.Flag.IDENTITY);
- if (flag == null) return;
- if(!flag.has_room_feature(conversation.counterpart, Xep.Muc.Feature.STABLE_ID)) {
- Xep.UniqueStableStanzaIDs.set_origin_id(new_message, message.stanza_id);
- }
- }
+ if (stream == null) {
+ message.marked = Entities.Message.Marked.UNSENT;
+ return;
+ }
- stream.get_module(Xmpp.MessageModule.IDENTITY).send_message(stream, new_message);
- } else {
+ MessageStanza new_message = new MessageStanza(message.stanza_id);
+ new_message.to = message.counterpart;
+ new_message.body = message.body;
+ if (conversation.type_ == Conversation.Type.GROUPCHAT) {
+ new_message.type_ = MessageStanza.TYPE_GROUPCHAT;
+ } else {
+ new_message.type_ = MessageStanza.TYPE_CHAT;
+ }
+ build_message_stanza(message, new_message, conversation);
+ pre_message_send(message, new_message, conversation);
+ if (message.marked == Entities.Message.Marked.UNSENT || message.marked == Entities.Message.Marked.WONTSEND) return;
+ if (delayed) {
+ DelayedDelivery.Module.set_message_delay(new_message, message.time);
+ }
+
+ // Set an origin ID if a MUC doen't guarantee to keep IDs
+ if (conversation.type_ == Conversation.Type.GROUPCHAT) {
+ Xep.Muc.Flag? flag = stream.get_flag(Xep.Muc.Flag.IDENTITY);
+ if (flag == null) {
message.marked = Entities.Message.Marked.UNSENT;
+ return;
+ }
+ if(!flag.has_room_feature(conversation.counterpart, Xep.Muc.Feature.STABLE_ID)) {
+ UniqueStableStanzaIDs.set_origin_id(new_message, message.stanza_id);
}
}
+
+ stream.get_module(MessageModule.IDENTITY).send_message.begin(stream, new_message, (_, res) => {
+ try {
+ stream.get_module(MessageModule.IDENTITY).send_message.end(res);
+ } catch (IOStreamError e) {
+ message.marked = Entities.Message.Marked.UNSENT;
+ }
+ });
}
}
diff --git a/xmpp-vala/src/module/message/module.vala b/xmpp-vala/src/module/message/module.vala
index 5ddbbe1a..5f7d40f0 100644
--- a/xmpp-vala/src/module/message/module.vala
+++ b/xmpp-vala/src/module/message/module.vala
@@ -14,10 +14,9 @@ namespace Xmpp {
public signal void received_message(XmppStream stream, MessageStanza message);
public signal void received_message_unprocessed(XmppStream stream, MessageStanza message);
- public void send_message(XmppStream stream, MessageStanza message) {
- send_pipeline.run.begin(stream, message, (obj, res) => {
- stream.write(message.stanza);
- });
+ public async void send_message(XmppStream stream, MessageStanza message) throws IOStreamError {
+ yield send_pipeline.run(stream, message);
+ yield stream.write_async(message.stanza);
}
public async void received_message_stanza_async(XmppStream stream, StanzaNode node) {
diff --git a/xmpp-vala/src/module/xep/0045_muc/module.vala b/xmpp-vala/src/module/xep/0045_muc/module.vala
index 79fd2c31..59cb703e 100644
--- a/xmpp-vala/src/module/xep/0045_muc/module.vala
+++ b/xmpp-vala/src/module/xep/0045_muc/module.vala
@@ -131,7 +131,7 @@ public class Module : XmppStreamModule {
message.to = jid;
message.type_ = MessageStanza.TYPE_GROUPCHAT;
message.stanza.put_node((new StanzaNode.build("subject")).put_node(new StanzaNode.text(subject)));
- stream.get_module(MessageModule.IDENTITY).send_message(stream, message);
+ stream.get_module(MessageModule.IDENTITY).send_message.begin(stream, message);
}
public void change_nick(XmppStream stream, Jid jid, string new_nick) {
@@ -151,7 +151,7 @@ public class Module : XmppStreamModule {
StanzaNode invite_node = new StanzaNode.build("x", NS_URI_USER).add_self_xmlns()
.put_node(new StanzaNode.build("invite", NS_URI_USER).put_attribute("to", jid.to_string()));
message.stanza.put_node(invite_node);
- stream.get_module(MessageModule.IDENTITY).send_message(stream, message);
+ stream.get_module(MessageModule.IDENTITY).send_message.begin(stream, message);
}
public void kick(XmppStream stream, Jid jid, string nick) {
diff --git a/xmpp-vala/src/module/xep/0085_chat_state_notifications.vala b/xmpp-vala/src/module/xep/0085_chat_state_notifications.vala
index 7c1c9172..bf7515cb 100644
--- a/xmpp-vala/src/module/xep/0085_chat_state_notifications.vala
+++ b/xmpp-vala/src/module/xep/0085_chat_state_notifications.vala
@@ -27,7 +27,7 @@ public class Module : XmppStreamModule {
MessageProcessingHints.set_message_hint(message, MessageProcessingHints.HINT_NO_STORE);
- stream.get_module(MessageModule.IDENTITY).send_message(stream, message);
+ stream.get_module(MessageModule.IDENTITY).send_message.begin(stream, message);
}
public override void attach(XmppStream stream) {
diff --git a/xmpp-vala/src/module/xep/0184_message_delivery_receipts.vala b/xmpp-vala/src/module/xep/0184_message_delivery_receipts.vala
index b51178c7..199cfee8 100644
--- a/xmpp-vala/src/module/xep/0184_message_delivery_receipts.vala
+++ b/xmpp-vala/src/module/xep/0184_message_delivery_receipts.vala
@@ -12,7 +12,7 @@ namespace Xmpp.Xep.MessageDeliveryReceipts {
MessageStanza received_message = new MessageStanza();
received_message.to = from;
received_message.stanza.put_node(new StanzaNode.build("received", NS_URI).add_self_xmlns().put_attribute("id", message_id));
- stream.get_module(MessageModule.IDENTITY).send_message(stream, received_message);
+ stream.get_module(MessageModule.IDENTITY).send_message.begin(stream, received_message);
}
public static bool requests_receipt(MessageStanza message) {
diff --git a/xmpp-vala/src/module/xep/0333_chat_markers.vala b/xmpp-vala/src/module/xep/0333_chat_markers.vala
index 257f6967..7f92fbe3 100644
--- a/xmpp-vala/src/module/xep/0333_chat_markers.vala
+++ b/xmpp-vala/src/module/xep/0333_chat_markers.vala
@@ -21,7 +21,7 @@ public class Module : XmppStreamModule {
received_message.to = jid;
received_message.type_ = type_;
received_message.stanza.put_node(new StanzaNode.build(marker, NS_URI).add_self_xmlns().put_attribute("id", message_id));
- stream.get_module(MessageModule.IDENTITY).send_message(stream, received_message);
+ stream.get_module(MessageModule.IDENTITY).send_message.begin(stream, received_message);
}
public static bool requests_marking(MessageStanza message) {