summaryrefslogtreecommitdiff
path: root/fs/dlm/midcomms.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/dlm/midcomms.c')
-rw-r--r--fs/dlm/midcomms.c131
1 files changed, 66 insertions, 65 deletions
diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
index fc015a6abe17..c02c43e4980a 100644
--- a/fs/dlm/midcomms.c
+++ b/fs/dlm/midcomms.c
@@ -146,8 +146,8 @@
/* init value for sequence numbers for testing purpose only e.g. overflows */
#define DLM_SEQ_INIT 0
-/* 3 minutes wait to sync ending of dlm */
-#define DLM_SHUTDOWN_TIMEOUT msecs_to_jiffies(3 * 60 * 1000)
+/* 5 seconds wait to sync ending of dlm */
+#define DLM_SHUTDOWN_TIMEOUT msecs_to_jiffies(5000)
#define DLM_VERSION_NOT_SET 0
struct midcomms_node {
@@ -375,7 +375,7 @@ static int dlm_send_ack(int nodeid, uint32_t seq)
struct dlm_msg *msg;
char *ppc;
- msg = dlm_lowcomms_new_msg(nodeid, mb_len, GFP_NOFS, &ppc,
+ msg = dlm_lowcomms_new_msg(nodeid, mb_len, GFP_ATOMIC, &ppc,
NULL, NULL);
if (!msg)
return -ENOMEM;
@@ -402,10 +402,11 @@ static int dlm_send_fin(struct midcomms_node *node,
struct dlm_mhandle *mh;
char *ppc;
- mh = dlm_midcomms_get_mhandle(node->nodeid, mb_len, GFP_NOFS, &ppc);
+ mh = dlm_midcomms_get_mhandle(node->nodeid, mb_len, GFP_ATOMIC, &ppc);
if (!mh)
return -ENOMEM;
+ set_bit(DLM_NODE_FLAG_STOP_TX, &node->flags);
mh->ack_rcv = ack_rcv;
m_header = (struct dlm_header *)ppc;
@@ -417,7 +418,6 @@ static int dlm_send_fin(struct midcomms_node *node,
pr_debug("sending fin msg to node %d\n", node->nodeid);
dlm_midcomms_commit_mhandle(mh, NULL, 0);
- set_bit(DLM_NODE_FLAG_STOP_TX, &node->flags);
return 0;
}
@@ -467,7 +467,7 @@ static void dlm_pas_fin_ack_rcv(struct midcomms_node *node)
break;
default:
spin_unlock(&node->state_lock);
- log_print("%s: unexpected state: %d\n",
+ log_print("%s: unexpected state: %d",
__func__, node->state);
WARN_ON_ONCE(1);
return;
@@ -498,18 +498,14 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
switch (p->header.h_cmd) {
case DLM_FIN:
- /* send ack before fin */
- dlm_send_ack(node->nodeid, node->seq_next);
-
spin_lock(&node->state_lock);
pr_debug("receive fin msg from node %d with state %s\n",
node->nodeid, dlm_state_str(node->state));
switch (node->state) {
case DLM_ESTABLISHED:
- node->state = DLM_CLOSE_WAIT;
- pr_debug("switch node %d to state %s\n",
- node->nodeid, dlm_state_str(node->state));
+ dlm_send_ack(node->nodeid, node->seq_next);
+
/* passive shutdown DLM_LAST_ACK case 1
* additional we check if the node is used by
* cluster manager events at all.
@@ -518,34 +514,38 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
node->state = DLM_LAST_ACK;
pr_debug("switch node %d to state %s case 1\n",
node->nodeid, dlm_state_str(node->state));
- spin_unlock(&node->state_lock);
- goto send_fin;
+ set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags);
+ dlm_send_fin(node, dlm_pas_fin_ack_rcv);
+ } else {
+ node->state = DLM_CLOSE_WAIT;
+ pr_debug("switch node %d to state %s\n",
+ node->nodeid, dlm_state_str(node->state));
}
break;
case DLM_FIN_WAIT1:
+ dlm_send_ack(node->nodeid, node->seq_next);
node->state = DLM_CLOSING;
+ set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags);
pr_debug("switch node %d to state %s\n",
node->nodeid, dlm_state_str(node->state));
break;
case DLM_FIN_WAIT2:
+ dlm_send_ack(node->nodeid, node->seq_next);
midcomms_node_reset(node);
pr_debug("switch node %d to state %s\n",
node->nodeid, dlm_state_str(node->state));
- wake_up(&node->shutdown_wait);
break;
case DLM_LAST_ACK:
/* probably remove_member caught it, do nothing */
break;
default:
spin_unlock(&node->state_lock);
- log_print("%s: unexpected state: %d\n",
+ log_print("%s: unexpected state: %d",
__func__, node->state);
WARN_ON_ONCE(1);
return;
}
spin_unlock(&node->state_lock);
-
- set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags);
break;
default:
WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
@@ -564,12 +564,6 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
log_print_ratelimited("ignore dlm msg because seq mismatch, seq: %u, expected: %u, nodeid: %d",
seq, node->seq_next, node->nodeid);
}
-
- return;
-
-send_fin:
- set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags);
- dlm_send_fin(node, dlm_pas_fin_ack_rcv);
}
static struct midcomms_node *
@@ -612,16 +606,8 @@ dlm_midcomms_recv_node_lookup(int nodeid, const union dlm_packet *p,
case DLM_ESTABLISHED:
break;
default:
- /* some invalid state passive shutdown
- * was failed, we try to reset and
- * hope it will go on.
- */
- log_print("reset node %d because shutdown stuck",
- node->nodeid);
-
- midcomms_node_reset(node);
- node->state = DLM_ESTABLISHED;
- break;
+ spin_unlock(&node->state_lock);
+ return NULL;
}
spin_unlock(&node->state_lock);
}
@@ -671,6 +657,7 @@ static int dlm_midcomms_version_check_3_2(struct midcomms_node *node)
switch (node->version) {
case DLM_VERSION_NOT_SET:
node->version = DLM_VERSION_3_2;
+ wake_up(&node->shutdown_wait);
log_print("version 0x%08x for node %d detected", DLM_VERSION_3_2,
node->nodeid);
break;
@@ -840,6 +827,7 @@ static int dlm_midcomms_version_check_3_1(struct midcomms_node *node)
switch (node->version) {
case DLM_VERSION_NOT_SET:
node->version = DLM_VERSION_3_1;
+ wake_up(&node->shutdown_wait);
log_print("version 0x%08x for node %d detected", DLM_VERSION_3_1,
node->nodeid);
break;
@@ -1214,8 +1202,15 @@ void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh,
dlm_free_mhandle(mh);
break;
case DLM_VERSION_3_2:
+ /* held rcu read lock here, because we sending the
+ * dlm message out, when we do that we could receive
+ * an ack back which releases the mhandle and we
+ * get a use after free.
+ */
+ rcu_read_lock();
dlm_midcomms_commit_msg_3_2(mh, name, namelen);
srcu_read_unlock(&nodes_srcu, mh->idx);
+ rcu_read_unlock();
break;
default:
srcu_read_unlock(&nodes_srcu, mh->idx);
@@ -1266,7 +1261,6 @@ static void dlm_act_fin_ack_rcv(struct midcomms_node *node)
midcomms_node_reset(node);
pr_debug("switch node %d to state %s\n",
node->nodeid, dlm_state_str(node->state));
- wake_up(&node->shutdown_wait);
break;
case DLM_CLOSED:
/* not valid but somehow we got what we want */
@@ -1274,7 +1268,7 @@ static void dlm_act_fin_ack_rcv(struct midcomms_node *node)
break;
default:
spin_unlock(&node->state_lock);
- log_print("%s: unexpected state: %d\n",
+ log_print("%s: unexpected state: %d",
__func__, node->state);
WARN_ON_ONCE(1);
return;
@@ -1362,11 +1356,11 @@ void dlm_midcomms_remove_member(int nodeid)
case DLM_CLOSE_WAIT:
/* passive shutdown DLM_LAST_ACK case 2 */
node->state = DLM_LAST_ACK;
- spin_unlock(&node->state_lock);
-
pr_debug("switch node %d to state %s case 2\n",
node->nodeid, dlm_state_str(node->state));
- goto send_fin;
+ set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags);
+ dlm_send_fin(node, dlm_pas_fin_ack_rcv);
+ break;
case DLM_LAST_ACK:
/* probably receive fin caught it, do nothing */
break;
@@ -1374,7 +1368,7 @@ void dlm_midcomms_remove_member(int nodeid)
/* already gone, do nothing */
break;
default:
- log_print("%s: unexpected state: %d\n",
+ log_print("%s: unexpected state: %d",
__func__, node->state);
break;
}
@@ -1382,12 +1376,6 @@ void dlm_midcomms_remove_member(int nodeid)
spin_unlock(&node->state_lock);
srcu_read_unlock(&nodes_srcu, idx);
- return;
-
-send_fin:
- set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags);
- dlm_send_fin(node, dlm_pas_fin_ack_rcv);
- srcu_read_unlock(&nodes_srcu, idx);
}
static void midcomms_node_release(struct rcu_head *rcu)
@@ -1395,9 +1383,31 @@ static void midcomms_node_release(struct rcu_head *rcu)
struct midcomms_node *node = container_of(rcu, struct midcomms_node, rcu);
WARN_ON_ONCE(atomic_read(&node->send_queue_cnt));
+ dlm_send_queue_flush(node);
kfree(node);
}
+void dlm_midcomms_version_wait(void)
+{
+ struct midcomms_node *node;
+ int i, idx, ret;
+
+ idx = srcu_read_lock(&nodes_srcu);
+ for (i = 0; i < CONN_HASH_SIZE; i++) {
+ hlist_for_each_entry_rcu(node, &node_hash[i], hlist) {
+ ret = wait_event_timeout(node->shutdown_wait,
+ node->version != DLM_VERSION_NOT_SET ||
+ node->state == DLM_CLOSED ||
+ test_bit(DLM_NODE_FLAG_CLOSE, &node->flags),
+ DLM_SHUTDOWN_TIMEOUT);
+ if (!ret || test_bit(DLM_NODE_FLAG_CLOSE, &node->flags))
+ pr_debug("version wait timed out for node %d with state %s\n",
+ node->nodeid, dlm_state_str(node->state));
+ }
+ }
+ srcu_read_unlock(&nodes_srcu, idx);
+}
+
static void midcomms_shutdown(struct midcomms_node *node)
{
int ret;
@@ -1418,11 +1428,11 @@ static void midcomms_shutdown(struct midcomms_node *node)
node->state = DLM_FIN_WAIT1;
pr_debug("switch node %d to state %s case 2\n",
node->nodeid, dlm_state_str(node->state));
+ dlm_send_fin(node, dlm_act_fin_ack_rcv);
break;
case DLM_CLOSED:
/* we have what we want */
- spin_unlock(&node->state_lock);
- return;
+ break;
default:
/* busy to enter DLM_FIN_WAIT1, wait until passive
* done in shutdown_wait to enter DLM_CLOSED.
@@ -1431,29 +1441,20 @@ static void midcomms_shutdown(struct midcomms_node *node)
}
spin_unlock(&node->state_lock);
- if (node->state == DLM_FIN_WAIT1) {
- dlm_send_fin(node, dlm_act_fin_ack_rcv);
-
- if (DLM_DEBUG_FENCE_TERMINATION)
- msleep(5000);
- }
+ if (DLM_DEBUG_FENCE_TERMINATION)
+ msleep(5000);
/* wait for other side dlm + fin */
ret = wait_event_timeout(node->shutdown_wait,
node->state == DLM_CLOSED ||
test_bit(DLM_NODE_FLAG_CLOSE, &node->flags),
DLM_SHUTDOWN_TIMEOUT);
- if (!ret || test_bit(DLM_NODE_FLAG_CLOSE, &node->flags)) {
+ if (!ret || test_bit(DLM_NODE_FLAG_CLOSE, &node->flags))
pr_debug("active shutdown timed out for node %d with state %s\n",
node->nodeid, dlm_state_str(node->state));
- midcomms_node_reset(node);
- dlm_lowcomms_shutdown_node(node->nodeid, true);
- return;
- }
-
- pr_debug("active shutdown done for node %d with state %s\n",
- node->nodeid, dlm_state_str(node->state));
- dlm_lowcomms_shutdown_node(node->nodeid, false);
+ else
+ pr_debug("active shutdown done for node %d with state %s\n",
+ node->nodeid, dlm_state_str(node->state));
}
void dlm_midcomms_shutdown(void)
@@ -1461,8 +1462,6 @@ void dlm_midcomms_shutdown(void)
struct midcomms_node *node;
int i, idx;
- dlm_lowcomms_shutdown();
-
mutex_lock(&close_lock);
idx = srcu_read_lock(&nodes_srcu);
for (i = 0; i < CONN_HASH_SIZE; i++) {
@@ -1480,6 +1479,8 @@ void dlm_midcomms_shutdown(void)
}
srcu_read_unlock(&nodes_srcu, idx);
mutex_unlock(&close_lock);
+
+ dlm_lowcomms_shutdown();
}
int dlm_midcomms_close(int nodeid)