summaryrefslogtreecommitdiff
path: root/net
diff options
context:
space:
mode:
authorDavid S. Miller <davem@davemloft.net>2022-03-01 17:25:12 +0300
committerDavid S. Miller <davem@davemloft.net>2022-03-01 17:25:12 +0300
commit7282c126f7688f697d33f3b965c29bba67fb4eba (patch)
tree2da8de087c96a637611ed2b31add76f48dd03e89 /net
parent1e385c08249e4822e0f425efde1896d3933d1471 (diff)
parent6b88af839d204c9283ae09357555e5c4f56c6da5 (diff)
downloadlinux-7282c126f7688f697d33f3b965c29bba67fb4eba.tar.xz
Merge branch 'smc-datapath-opts'
Dust Li says: ==================== net/smc: some datapath performance optimizations This series tries to improve the performance of SMC in datapath. - patch #1, add sysctl interface to support tuning the behaviour of SMC in container environment. - patch #2/#3, add autocorking support which is very efficient for small messages without trade-off for latency. - patch #4, send directly on setting TCP_NODELAY, without wake up the TX worker, this make it consistent with clearing TCP_CORK. - patch #5, this correct the setting of RMB window update limit, so we don't send CDC messages to update peer's RMB window too frequently in some cases. - patch #6, implemented something like NAPI in SMC, decrease the number of hardirq when busy. - patch #7, this moves TX work doing in the BH to the user context when sock_lock is hold by user. With this patchset applied, we can get a good performance gain: - qperf tcp_bw test has shown a great improvement. Other benchmarks like 'netperf TCP_STREAM' or 'sockperf throughput' has similar result. - In my testing environment, running qperf tcp_bw and tcp_lat, SMC behaves better then TCP in most all message size. Here are some test results with the following testing command: client: smc_run taskset -c 1 qperf smc-server -oo msg_size:1:64K:*2 \ -t 30 -vu tcp_{bw|lat} server: smc_run taskset -c 1 qperf ==== Bandwidth ==== MsgSize Origin SMC TCP SMC with patches 1 0.578 MB/s 2.392 MB/s(313.57%) 2.561 MB/s(342.83%) 2 1.159 MB/s 4.780 MB/s(312.53%) 5.162 MB/s(345.46%) 4 2.283 MB/s 10.266 MB/s(349.77%) 10.122 MB/s(343.46%) 8 4.668 MB/s 19.040 MB/s(307.86%) 20.521 MB/s(339.59%) 16 9.147 MB/s 38.904 MB/s(325.31%) 40.823 MB/s(346.29%) 32 18.369 MB/s 79.587 MB/s(333.25%) 80.535 MB/s(338.42%) 64 36.562 MB/s 148.668 MB/s(306.61%) 158.170 MB/s(332.60%) 128 72.961 MB/s 274.913 MB/s(276.80%) 316.217 MB/s(333.41%) 256 144.705 MB/s 512.059 MB/s(253.86%) 626.019 MB/s(332.62%) 512 288.873 MB/s 884.977 MB/s(206.35%) 1221.596 MB/s(322.88%) 1024 574.180 MB/s 1337.736 MB/s(132.98%) 2203.156 MB/s(283.70%) 2048 1095.192 MB/s 1865.952 MB/s( 70.38%) 3036.448 MB/s(177.25%) 4096 2066.157 MB/s 2380.337 MB/s( 15.21%) 3834.271 MB/s( 85.58%) 8192 3717.198 MB/s 2733.073 MB/s(-26.47%) 4904.910 MB/s( 31.95%) 16384 4742.221 MB/s 2958.693 MB/s(-37.61%) 5220.272 MB/s( 10.08%) 32768 5349.550 MB/s 3061.285 MB/s(-42.77%) 5321.865 MB/s( -0.52%) 65536 5162.919 MB/s 3731.408 MB/s(-27.73%) 5245.021 MB/s( 1.59%) ==== Latency ==== MsgSize Origin SMC TCP SMC with patches 1 10.540 us 11.938 us( 13.26%) 10.356 us( -1.75%) 2 10.996 us 11.992 us( 9.06%) 10.073 us( -8.39%) 4 10.229 us 11.687 us( 14.25%) 9.996 us( -2.28%) 8 10.203 us 11.653 us( 14.21%) 10.063 us( -1.37%) 16 10.530 us 11.313 us( 7.44%) 10.013 us( -4.91%) 32 10.241 us 11.586 us( 13.13%) 10.081 us( -1.56%) 64 10.693 us 11.652 us( 8.97%) 9.986 us( -6.61%) 128 10.597 us 11.579 us( 9.27%) 10.262 us( -3.16%) 256 10.409 us 11.957 us( 14.87%) 10.148 us( -2.51%) 512 11.088 us 12.505 us( 12.78%) 10.206 us( -7.95%) 1024 11.240 us 12.255 us( 9.03%) 10.631 us( -5.42%) 2048 11.485 us 16.970 us( 47.76%) 10.981 us( -4.39%) 4096 12.077 us 13.948 us( 15.49%) 11.847 us( -1.90%) 8192 13.683 us 16.693 us( 22.00%) 13.336 us( -2.54%) 16384 16.470 us 23.615 us( 43.38%) 16.519 us( 0.30%) 32768 22.540 us 40.966 us( 81.75%) 22.452 us( -0.39%) 65536 34.192 us 73.003 us(113.51%) 33.916 us( -0.81%) ------------ Test environment notes: 1. Testing is run on 2 VMs within the same physical host 2. The NIC is ConnectX-4Lx, using SRIOV, and passing through 2 VFs to the 2 VMs respectively. 3. To decrease jitter, VM's vCPU are binded to each physical CPU, and those physical CPUs are all isolated using boot parameter `isolcpus=xxx` 4. The queue number are set to 1, and interrupt from the queue is binded to CPU0 in the guest ==================== Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net')
-rw-r--r--net/smc/Makefile2
-rw-r--r--net/smc/af_smc.c30
-rw-r--r--net/smc/smc.h6
-rw-r--r--net/smc/smc_cdc.c24
-rw-r--r--net/smc/smc_core.c2
-rw-r--r--net/smc/smc_sysctl.c80
-rw-r--r--net/smc/smc_sysctl.h32
-rw-r--r--net/smc/smc_tx.c107
-rw-r--r--net/smc/smc_wr.c49
9 files changed, 290 insertions, 42 deletions
diff --git a/net/smc/Makefile b/net/smc/Makefile
index 196fb6f01b14..640af9a39f9c 100644
--- a/net/smc/Makefile
+++ b/net/smc/Makefile
@@ -4,4 +4,4 @@ obj-$(CONFIG_SMC) += smc.o
obj-$(CONFIG_SMC_DIAG) += smc_diag.o
smc-y := af_smc.o smc_pnet.o smc_ib.o smc_clc.o smc_core.o smc_wr.o smc_llc.o
smc-y += smc_cdc.o smc_tx.o smc_rx.o smc_close.o smc_ism.o smc_netlink.o smc_stats.o
-smc-y += smc_tracepoint.o
+smc-y += smc_tracepoint.o smc_sysctl.o
diff --git a/net/smc/af_smc.c b/net/smc/af_smc.c
index 30acc31b2c45..6447607675fa 100644
--- a/net/smc/af_smc.c
+++ b/net/smc/af_smc.c
@@ -51,6 +51,7 @@
#include "smc_close.h"
#include "smc_stats.h"
#include "smc_tracepoint.h"
+#include "smc_sysctl.h"
static DEFINE_MUTEX(smc_server_lgr_pending); /* serialize link group
* creation on server
@@ -192,12 +193,27 @@ void smc_unhash_sk(struct sock *sk)
}
EXPORT_SYMBOL_GPL(smc_unhash_sk);
+/* This will be called before user really release sock_lock. So do the
+ * work which we didn't do because of user hold the sock_lock in the
+ * BH context
+ */
+static void smc_release_cb(struct sock *sk)
+{
+ struct smc_sock *smc = smc_sk(sk);
+
+ if (smc->conn.tx_in_release_sock) {
+ smc_tx_pending(&smc->conn);
+ smc->conn.tx_in_release_sock = false;
+ }
+}
+
struct proto smc_proto = {
.name = "SMC",
.owner = THIS_MODULE,
.keepalive = smc_set_keepalive,
.hash = smc_hash_sk,
.unhash = smc_unhash_sk,
+ .release_cb = smc_release_cb,
.obj_size = sizeof(struct smc_sock),
.h.smc_hash = &smc_v4_hashinfo,
.slab_flags = SLAB_TYPESAFE_BY_RCU,
@@ -210,6 +226,7 @@ struct proto smc_proto6 = {
.keepalive = smc_set_keepalive,
.hash = smc_hash_sk,
.unhash = smc_unhash_sk,
+ .release_cb = smc_release_cb,
.obj_size = sizeof(struct smc_sock),
.h.smc_hash = &smc_v6_hashinfo,
.slab_flags = SLAB_TYPESAFE_BY_RCU,
@@ -2795,8 +2812,8 @@ static int smc_setsockopt(struct socket *sock, int level, int optname,
sk->sk_state != SMC_CLOSED) {
if (val) {
SMC_STAT_INC(smc, ndly_cnt);
- mod_delayed_work(smc->conn.lgr->tx_wq,
- &smc->conn.tx_work, 0);
+ smc_tx_pending(&smc->conn);
+ cancel_delayed_work(&smc->conn.tx_work);
}
}
break;
@@ -3273,9 +3290,17 @@ static int __init smc_init(void)
goto out_sock;
}
+ rc = smc_sysctl_init();
+ if (rc) {
+ pr_err("%s: sysctl_init fails with %d\n", __func__, rc);
+ goto out_ulp;
+ }
+
static_branch_enable(&tcp_have_smc);
return 0;
+out_ulp:
+ tcp_unregister_ulp(&smc_ulp_ops);
out_sock:
sock_unregister(PF_SMC);
out_proto6:
@@ -3303,6 +3328,7 @@ out_pernet_subsys:
static void __exit smc_exit(void)
{
static_branch_disable(&tcp_have_smc);
+ smc_sysctl_exit();
tcp_unregister_ulp(&smc_ulp_ops);
sock_unregister(PF_SMC);
smc_core_exit();
diff --git a/net/smc/smc.h b/net/smc/smc.h
index a096d8af21a0..ea0620529ebe 100644
--- a/net/smc/smc.h
+++ b/net/smc/smc.h
@@ -29,6 +29,7 @@
#define SMC_MAX_ISM_DEVS 8 /* max # of proposed non-native ISM
* devices
*/
+#define SMC_AUTOCORKING_DEFAULT_SIZE 0x10000 /* 64K by default */
extern struct proto smc_proto;
extern struct proto smc_proto6;
@@ -192,6 +193,7 @@ struct smc_connection {
* - dec on polled tx cqe
*/
wait_queue_head_t cdc_pend_tx_wq; /* wakeup on no cdc_pend_tx_wr*/
+ atomic_t tx_pushing; /* nr_threads trying tx push */
struct delayed_work tx_work; /* retry of smc_cdc_msg_send */
u32 tx_off; /* base offset in peer rmb */
@@ -211,6 +213,10 @@ struct smc_connection {
* data still pending
*/
char urg_rx_byte; /* urgent byte */
+ bool tx_in_release_sock;
+ /* flush pending tx data in
+ * sock release_cb()
+ */
atomic_t bytes_to_rcv; /* arrived data,
* not yet received
*/
diff --git a/net/smc/smc_cdc.c b/net/smc/smc_cdc.c
index 9d5a97168969..5c731f27996e 100644
--- a/net/smc/smc_cdc.c
+++ b/net/smc/smc_cdc.c
@@ -48,9 +48,19 @@ static void smc_cdc_tx_handler(struct smc_wr_tx_pend_priv *pnd_snd,
conn->tx_cdc_seq_fin = cdcpend->ctrl_seq;
}
- if (atomic_dec_and_test(&conn->cdc_pend_tx_wr) &&
- unlikely(wq_has_sleeper(&conn->cdc_pend_tx_wq)))
- wake_up(&conn->cdc_pend_tx_wq);
+ if (atomic_dec_and_test(&conn->cdc_pend_tx_wr)) {
+ /* If user owns the sock_lock, mark the connection need sending.
+ * User context will later try to send when it release sock_lock
+ * in smc_release_cb()
+ */
+ if (sock_owned_by_user(&smc->sk))
+ conn->tx_in_release_sock = true;
+ else
+ smc_tx_pending(conn);
+
+ if (unlikely(wq_has_sleeper(&conn->cdc_pend_tx_wq)))
+ wake_up(&conn->cdc_pend_tx_wq);
+ }
WARN_ON(atomic_read(&conn->cdc_pend_tx_wr) < 0);
smc_tx_sndbuf_nonfull(smc);
@@ -350,8 +360,12 @@ static void smc_cdc_msg_recv_action(struct smc_sock *smc,
/* trigger sndbuf consumer: RDMA write into peer RMBE and CDC */
if ((diff_cons && smc_tx_prepared_sends(conn)) ||
conn->local_rx_ctrl.prod_flags.cons_curs_upd_req ||
- conn->local_rx_ctrl.prod_flags.urg_data_pending)
- smc_tx_sndbuf_nonempty(conn);
+ conn->local_rx_ctrl.prod_flags.urg_data_pending) {
+ if (!sock_owned_by_user(&smc->sk))
+ smc_tx_pending(conn);
+ else
+ conn->tx_in_release_sock = true;
+ }
if (diff_cons && conn->urg_tx_pend &&
atomic_read(&conn->peer_rmbe_space) == conn->peer_rmbe_size) {
diff --git a/net/smc/smc_core.c b/net/smc/smc_core.c
index 29525d03b253..1674b2549f8b 100644
--- a/net/smc/smc_core.c
+++ b/net/smc/smc_core.c
@@ -1988,7 +1988,7 @@ static struct smc_buf_desc *smc_buf_get_slot(int compressed_bufsize,
*/
static inline int smc_rmb_wnd_update_limit(int rmbe_size)
{
- return min_t(int, rmbe_size / 10, SOCK_MIN_SNDBUF / 2);
+ return max_t(int, rmbe_size / 10, SOCK_MIN_SNDBUF / 2);
}
/* map an rmb buf to a link */
diff --git a/net/smc/smc_sysctl.c b/net/smc/smc_sysctl.c
new file mode 100644
index 000000000000..3b59876aaac9
--- /dev/null
+++ b/net/smc/smc_sysctl.c
@@ -0,0 +1,80 @@
+// SPDX-License-Identifier: GPL-2.0
+/*
+ * Shared Memory Communications over RDMA (SMC-R) and RoCE
+ *
+ * smc_sysctl.c: sysctl interface to SMC subsystem.
+ *
+ * Copyright (c) 2022, Alibaba Inc.
+ *
+ * Author: Tony Lu <tonylu@linux.alibaba.com>
+ *
+ */
+
+#include <linux/init.h>
+#include <linux/sysctl.h>
+#include <net/net_namespace.h>
+
+#include "smc.h"
+#include "smc_sysctl.h"
+
+static struct ctl_table smc_table[] = {
+ {
+ .procname = "autocorking_size",
+ .data = &init_net.smc.sysctl_autocorking_size,
+ .maxlen = sizeof(unsigned int),
+ .mode = 0644,
+ .proc_handler = proc_douintvec,
+ },
+ { }
+};
+
+static __net_init int smc_sysctl_init_net(struct net *net)
+{
+ struct ctl_table *table;
+
+ table = smc_table;
+ if (!net_eq(net, &init_net)) {
+ int i;
+
+ table = kmemdup(table, sizeof(smc_table), GFP_KERNEL);
+ if (!table)
+ goto err_alloc;
+
+ for (i = 0; i < ARRAY_SIZE(smc_table) - 1; i++)
+ table[i].data += (void *)net - (void *)&init_net;
+ }
+
+ net->smc.smc_hdr = register_net_sysctl(net, "net/smc", table);
+ if (!net->smc.smc_hdr)
+ goto err_reg;
+
+ net->smc.sysctl_autocorking_size = SMC_AUTOCORKING_DEFAULT_SIZE;
+
+ return 0;
+
+err_reg:
+ if (!net_eq(net, &init_net))
+ kfree(table);
+err_alloc:
+ return -ENOMEM;
+}
+
+static __net_exit void smc_sysctl_exit_net(struct net *net)
+{
+ unregister_net_sysctl_table(net->smc.smc_hdr);
+}
+
+static struct pernet_operations smc_sysctl_ops __net_initdata = {
+ .init = smc_sysctl_init_net,
+ .exit = smc_sysctl_exit_net,
+};
+
+int __init smc_sysctl_init(void)
+{
+ return register_pernet_subsys(&smc_sysctl_ops);
+}
+
+void smc_sysctl_exit(void)
+{
+ unregister_pernet_subsys(&smc_sysctl_ops);
+}
diff --git a/net/smc/smc_sysctl.h b/net/smc/smc_sysctl.h
new file mode 100644
index 000000000000..49553ac236b6
--- /dev/null
+++ b/net/smc/smc_sysctl.h
@@ -0,0 +1,32 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+/*
+ * Shared Memory Communications over RDMA (SMC-R) and RoCE
+ *
+ * smc_sysctl.c: sysctl interface to SMC subsystem.
+ *
+ * Copyright (c) 2022, Alibaba Inc.
+ *
+ * Author: Tony Lu <tonylu@linux.alibaba.com>
+ *
+ */
+
+#ifndef _SMC_SYSCTL_H
+#define _SMC_SYSCTL_H
+
+#ifdef CONFIG_SYSCTL
+
+int smc_sysctl_init(void);
+void smc_sysctl_exit(void);
+
+#else
+
+int smc_sysctl_init(void)
+{
+ return 0;
+}
+
+void smc_sysctl_exit(void) { }
+
+#endif /* CONFIG_SYSCTL */
+
+#endif /* _SMC_SYSCTL_H */
diff --git a/net/smc/smc_tx.c b/net/smc/smc_tx.c
index 436ac836f363..257dc0d0aeb1 100644
--- a/net/smc/smc_tx.c
+++ b/net/smc/smc_tx.c
@@ -131,6 +131,51 @@ static bool smc_tx_is_corked(struct smc_sock *smc)
return (tp->nonagle & TCP_NAGLE_CORK) ? true : false;
}
+/* If we have pending CDC messages, do not send:
+ * Because CQE of this CDC message will happen shortly, it gives
+ * a chance to coalesce future sendmsg() payload in to one RDMA Write,
+ * without need for a timer, and with no latency trade off.
+ * Algorithm here:
+ * 1. First message should never cork
+ * 2. If we have pending Tx CDC messages, wait for the first CDC
+ * message's completion
+ * 3. Don't cork to much data in a single RDMA Write to prevent burst
+ * traffic, total corked message should not exceed sendbuf/2
+ */
+static bool smc_should_autocork(struct smc_sock *smc)
+{
+ struct smc_connection *conn = &smc->conn;
+ int corking_size;
+
+ corking_size = min(sock_net(&smc->sk)->smc.sysctl_autocorking_size,
+ conn->sndbuf_desc->len >> 1);
+
+ if (atomic_read(&conn->cdc_pend_tx_wr) == 0 ||
+ smc_tx_prepared_sends(conn) > corking_size)
+ return false;
+ return true;
+}
+
+static bool smc_tx_should_cork(struct smc_sock *smc, struct msghdr *msg)
+{
+ struct smc_connection *conn = &smc->conn;
+
+ if (smc_should_autocork(smc))
+ return true;
+
+ /* for a corked socket defer the RDMA writes if
+ * sndbuf_space is still available. The applications
+ * should known how/when to uncork it.
+ */
+ if ((msg->msg_flags & MSG_MORE ||
+ smc_tx_is_corked(smc) ||
+ msg->msg_flags & MSG_SENDPAGE_NOTLAST) &&
+ atomic_read(&conn->sndbuf_space))
+ return true;
+
+ return false;
+}
+
/* sndbuf producer: main API called by socket layer.
* called under sock lock.
*/
@@ -235,13 +280,10 @@ int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len)
*/
if ((msg->msg_flags & MSG_OOB) && !send_remaining)
conn->urg_tx_pend = true;
- /* for a corked socket defer the RDMA writes if
- * sndbuf_space is still available. The applications
- * should known how/when to uncork it.
+ /* If we need to cork, do nothing and wait for the next
+ * sendmsg() call or push on tx completion
*/
- if (!((msg->msg_flags & MSG_MORE || smc_tx_is_corked(smc) ||
- msg->msg_flags & MSG_SENDPAGE_NOTLAST) &&
- atomic_read(&conn->sndbuf_space)))
+ if (!smc_tx_should_cork(smc, msg))
smc_tx_sndbuf_nonempty(conn);
trace_smc_tx_sendmsg(smc, copylen);
@@ -589,13 +631,26 @@ static int smcd_tx_sndbuf_nonempty(struct smc_connection *conn)
return rc;
}
-int smc_tx_sndbuf_nonempty(struct smc_connection *conn)
+static int __smc_tx_sndbuf_nonempty(struct smc_connection *conn)
{
- int rc;
+ struct smc_sock *smc = container_of(conn, struct smc_sock, conn);
+ int rc = 0;
+
+ /* No data in the send queue */
+ if (unlikely(smc_tx_prepared_sends(conn) <= 0))
+ goto out;
+
+ /* Peer don't have RMBE space */
+ if (unlikely(atomic_read(&conn->peer_rmbe_space) <= 0)) {
+ SMC_STAT_RMB_TX_PEER_FULL(smc, !conn->lnk);
+ goto out;
+ }
if (conn->killed ||
- conn->local_rx_ctrl.conn_state_flags.peer_conn_abort)
- return -EPIPE; /* connection being aborted */
+ conn->local_rx_ctrl.conn_state_flags.peer_conn_abort) {
+ rc = -EPIPE; /* connection being aborted */
+ goto out;
+ }
if (conn->lgr->is_smcd)
rc = smcd_tx_sndbuf_nonempty(conn);
else
@@ -603,10 +658,38 @@ int smc_tx_sndbuf_nonempty(struct smc_connection *conn)
if (!rc) {
/* trigger socket release if connection is closing */
- struct smc_sock *smc = container_of(conn, struct smc_sock,
- conn);
smc_close_wake_tx_prepared(smc);
}
+
+out:
+ return rc;
+}
+
+int smc_tx_sndbuf_nonempty(struct smc_connection *conn)
+{
+ int rc;
+
+ /* This make sure only one can send simultaneously to prevent wasting
+ * of CPU and CDC slot.
+ * Record whether someone has tried to push while we are pushing.
+ */
+ if (atomic_inc_return(&conn->tx_pushing) > 1)
+ return 0;
+
+again:
+ atomic_set(&conn->tx_pushing, 1);
+ smp_wmb(); /* Make sure tx_pushing is 1 before real send */
+ rc = __smc_tx_sndbuf_nonempty(conn);
+
+ /* We need to check whether someone else have added some data into
+ * the send queue and tried to push but failed after the atomic_set()
+ * when we are pushing.
+ * If so, we need to push again to prevent those data hang in the send
+ * queue.
+ */
+ if (unlikely(!atomic_dec_and_test(&conn->tx_pushing)))
+ goto again;
+
return rc;
}
diff --git a/net/smc/smc_wr.c b/net/smc/smc_wr.c
index 24be1d03fef9..34d616406d51 100644
--- a/net/smc/smc_wr.c
+++ b/net/smc/smc_wr.c
@@ -137,25 +137,28 @@ static void smc_wr_tx_tasklet_fn(struct tasklet_struct *t)
{
struct smc_ib_device *dev = from_tasklet(dev, t, send_tasklet);
struct ib_wc wc[SMC_WR_MAX_POLL_CQE];
- int i = 0, rc;
- int polled = 0;
+ int i, rc;
again:
- polled++;
do {
memset(&wc, 0, sizeof(wc));
rc = ib_poll_cq(dev->roce_cq_send, SMC_WR_MAX_POLL_CQE, wc);
- if (polled == 1) {
- ib_req_notify_cq(dev->roce_cq_send,
- IB_CQ_NEXT_COMP |
- IB_CQ_REPORT_MISSED_EVENTS);
- }
- if (!rc)
- break;
for (i = 0; i < rc; i++)
smc_wr_tx_process_cqe(&wc[i]);
+ if (rc < SMC_WR_MAX_POLL_CQE)
+ /* If < SMC_WR_MAX_POLL_CQE, the CQ should have been
+ * drained, no need to poll again. --Guangguan Wang
+ */
+ break;
} while (rc > 0);
- if (polled == 1)
+
+ /* IB_CQ_REPORT_MISSED_EVENTS make sure if ib_req_notify_cq() returns
+ * 0, it is safe to wait for the next event.
+ * Else we must poll the CQ again to make sure we won't miss any event
+ */
+ if (ib_req_notify_cq(dev->roce_cq_send,
+ IB_CQ_NEXT_COMP |
+ IB_CQ_REPORT_MISSED_EVENTS))
goto again;
}
@@ -478,24 +481,28 @@ static void smc_wr_rx_tasklet_fn(struct tasklet_struct *t)
{
struct smc_ib_device *dev = from_tasklet(dev, t, recv_tasklet);
struct ib_wc wc[SMC_WR_MAX_POLL_CQE];
- int polled = 0;
int rc;
again:
- polled++;
do {
memset(&wc, 0, sizeof(wc));
rc = ib_poll_cq(dev->roce_cq_recv, SMC_WR_MAX_POLL_CQE, wc);
- if (polled == 1) {
- ib_req_notify_cq(dev->roce_cq_recv,
- IB_CQ_SOLICITED_MASK
- | IB_CQ_REPORT_MISSED_EVENTS);
- }
- if (!rc)
+ if (rc > 0)
+ smc_wr_rx_process_cqes(&wc[0], rc);
+ if (rc < SMC_WR_MAX_POLL_CQE)
+ /* If < SMC_WR_MAX_POLL_CQE, the CQ should have been
+ * drained, no need to poll again. --Guangguan Wang
+ */
break;
- smc_wr_rx_process_cqes(&wc[0], rc);
} while (rc > 0);
- if (polled == 1)
+
+ /* IB_CQ_REPORT_MISSED_EVENTS make sure if ib_req_notify_cq() returns
+ * 0, it is safe to wait for the next event.
+ * Else we must poll the CQ again to make sure we won't miss any event
+ */
+ if (ib_req_notify_cq(dev->roce_cq_recv,
+ IB_CQ_SOLICITED_MASK |
+ IB_CQ_REPORT_MISSED_EVENTS))
goto again;
}