summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--fs/dlm/lowcomms.c30
-rw-r--r--fs/dlm/midcomms.c76
2 files changed, 31 insertions, 75 deletions
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 345a316ae54c..68092f953830 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -860,30 +860,8 @@ struct dlm_processed_nodes {
struct list_head list;
};
-static void add_processed_node(int nodeid, struct list_head *processed_nodes)
-{
- struct dlm_processed_nodes *n;
-
- list_for_each_entry(n, processed_nodes, list) {
- /* we already remembered this node */
- if (n->nodeid == nodeid)
- return;
- }
-
- /* if it's fails in worst case we simple don't send an ack back.
- * We try it next time.
- */
- n = kmalloc(sizeof(*n), GFP_NOFS);
- if (!n)
- return;
-
- n->nodeid = nodeid;
- list_add(&n->list, processed_nodes);
-}
-
static void process_dlm_messages(struct work_struct *work)
{
- struct dlm_processed_nodes *n, *n_tmp;
struct processqueue_entry *pentry;
LIST_HEAD(processed_nodes);
@@ -902,7 +880,6 @@ static void process_dlm_messages(struct work_struct *work)
for (;;) {
dlm_process_incoming_buffer(pentry->nodeid, pentry->buf,
pentry->buflen);
- add_processed_node(pentry->nodeid, &processed_nodes);
free_processqueue_entry(pentry);
spin_lock(&processqueue_lock);
@@ -917,13 +894,6 @@ static void process_dlm_messages(struct work_struct *work)
list_del(&pentry->list);
spin_unlock(&processqueue_lock);
}
-
- /* send ack back after we processed couple of messages */
- list_for_each_entry_safe(n, n_tmp, &processed_nodes, list) {
- list_del(&n->list);
- dlm_midcomms_receive_done(n->nodeid);
- kfree(n);
- }
}
/* Data received from remote end */
diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
index 70286737eb0a..e1a0df67b566 100644
--- a/fs/dlm/midcomms.c
+++ b/fs/dlm/midcomms.c
@@ -148,6 +148,8 @@
/* 5 seconds wait to sync ending of dlm */
#define DLM_SHUTDOWN_TIMEOUT msecs_to_jiffies(5000)
#define DLM_VERSION_NOT_SET 0
+#define DLM_SEND_ACK_BACK_MSG_THRESHOLD 32
+#define DLM_RECV_ACK_BACK_MSG_THRESHOLD (DLM_SEND_ACK_BACK_MSG_THRESHOLD * 8)
struct midcomms_node {
int nodeid;
@@ -165,7 +167,7 @@ struct midcomms_node {
#define DLM_NODE_FLAG_CLOSE 1
#define DLM_NODE_FLAG_STOP_TX 2
#define DLM_NODE_FLAG_STOP_RX 3
-#define DLM_NODE_ULP_DELIVERED 4
+ atomic_t ulp_delivered;
unsigned long flags;
wait_queue_head_t shutdown_wait;
@@ -319,6 +321,7 @@ static void midcomms_node_reset(struct midcomms_node *node)
atomic_set(&node->seq_next, DLM_SEQ_INIT);
atomic_set(&node->seq_send, DLM_SEQ_INIT);
+ atomic_set(&node->ulp_delivered, 0);
node->version = DLM_VERSION_NOT_SET;
node->flags = 0;
@@ -393,6 +396,28 @@ static int dlm_send_ack(int nodeid, uint32_t seq)
return 0;
}
+static void dlm_send_ack_threshold(struct midcomms_node *node,
+ uint32_t threshold)
+{
+ uint32_t oval, nval;
+ bool send_ack;
+
+ /* let only send one user trigger threshold to send ack back */
+ do {
+ oval = atomic_read(&node->ulp_delivered);
+ send_ack = (oval > threshold);
+ /* abort if threshold is not reached */
+ if (!send_ack)
+ break;
+
+ nval = 0;
+ /* try to reset ulp_delivered counter */
+ } while (atomic_cmpxchg(&node->ulp_delivered, oval, nval) != oval);
+
+ if (send_ack)
+ dlm_send_ack(node->nodeid, atomic_read(&node->seq_next));
+}
+
static int dlm_send_fin(struct midcomms_node *node,
void (*ack_rcv)(struct midcomms_node *node))
{
@@ -560,7 +585,9 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
dlm_receive_buffer_3_2_trace(seq, p);
dlm_receive_buffer(p, node->nodeid);
- set_bit(DLM_NODE_ULP_DELIVERED, &node->flags);
+ atomic_inc(&node->ulp_delivered);
+ /* unlikely case to send ack back when we don't transmit */
+ dlm_send_ack_threshold(node, DLM_RECV_ACK_BACK_MSG_THRESHOLD);
break;
}
} else {
@@ -969,49 +996,6 @@ int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
return ret;
}
-void dlm_midcomms_receive_done(int nodeid)
-{
- struct midcomms_node *node;
- int idx;
-
- idx = srcu_read_lock(&nodes_srcu);
- node = nodeid2node(nodeid, 0);
- if (!node) {
- srcu_read_unlock(&nodes_srcu, idx);
- return;
- }
-
- /* old protocol, we do nothing */
- switch (node->version) {
- case DLM_VERSION_3_2:
- break;
- default:
- srcu_read_unlock(&nodes_srcu, idx);
- return;
- }
-
- /* do nothing if we didn't delivered stateful to ulp */
- if (!test_and_clear_bit(DLM_NODE_ULP_DELIVERED,
- &node->flags)) {
- srcu_read_unlock(&nodes_srcu, idx);
- return;
- }
-
- spin_lock(&node->state_lock);
- /* we only ack if state is ESTABLISHED */
- switch (node->state) {
- case DLM_ESTABLISHED:
- spin_unlock(&node->state_lock);
- dlm_send_ack(node->nodeid, atomic_read(&node->seq_next));
- break;
- default:
- spin_unlock(&node->state_lock);
- /* do nothing FIN has it's own ack send */
- break;
- }
- srcu_read_unlock(&nodes_srcu, idx);
-}
-
void dlm_midcomms_unack_msg_resend(int nodeid)
{
struct midcomms_node *node;
@@ -1142,6 +1126,8 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
goto err;
}
+ /* send ack back if necessary */
+ dlm_send_ack_threshold(node, DLM_SEND_ACK_BACK_MSG_THRESHOLD);
break;
default:
dlm_free_mhandle(mh);