Back to home page

OSCL-LXR

 
 

    


0001 // SPDX-License-Identifier: GPL-2.0
0002 /*
0003  * Shared Memory Communications over RDMA (SMC-R) and RoCE
0004  *
0005  * Manage send buffer.
0006  * Producer:
0007  * Copy user space data into send buffer, if send buffer space available.
0008  * Consumer:
0009  * Trigger RDMA write into RMBE of peer and send CDC, if RMBE space available.
0010  *
0011  * Copyright IBM Corp. 2016
0012  *
0013  * Author(s):  Ursula Braun <ubraun@linux.vnet.ibm.com>
0014  */
0015 
0016 #include <linux/net.h>
0017 #include <linux/rcupdate.h>
0018 #include <linux/workqueue.h>
0019 #include <linux/sched/signal.h>
0020 
0021 #include <net/sock.h>
0022 #include <net/tcp.h>
0023 
0024 #include "smc.h"
0025 #include "smc_wr.h"
0026 #include "smc_cdc.h"
0027 #include "smc_close.h"
0028 #include "smc_ism.h"
0029 #include "smc_tx.h"
0030 #include "smc_stats.h"
0031 #include "smc_tracepoint.h"
0032 
0033 #define SMC_TX_WORK_DELAY   0 /* delay argument used in this file when (re)queueing conn->tx_work */
0034 
0035 /***************************** sndbuf producer *******************************/
0036 
0037 /* callback implementation for sk.sk_write_space()
0038  * to wakeup sndbuf producers that blocked with smc_tx_wait().
0039  * called under sk_socket lock.
0040  */
0041 static void smc_tx_write_space(struct sock *sk)
0042 {
0043     struct socket *sock = sk->sk_socket;
0044     struct smc_sock *smc = smc_sk(sk);
0045     struct socket_wq *wq;
0046 
0047     /* similar to sk_stream_write_space */
0048     if (atomic_read(&smc->conn.sndbuf_space) && sock) {
0049         if (test_bit(SOCK_NOSPACE, &sock->flags))
0050             SMC_STAT_RMB_TX_FULL(smc, !smc->conn.lnk);
0051         clear_bit(SOCK_NOSPACE, &sock->flags);
0052         rcu_read_lock();
0053         wq = rcu_dereference(sk->sk_wq);
0054         if (skwq_has_sleeper(wq))
0055             wake_up_interruptible_poll(&wq->wait,
0056                            EPOLLOUT | EPOLLWRNORM |
0057                            EPOLLWRBAND);
0058         if (wq && wq->fasync_list && !(sk->sk_shutdown & SEND_SHUTDOWN))
0059             sock_wake_async(wq, SOCK_WAKE_SPACE, POLL_OUT);
0060         rcu_read_unlock();
0061     }
0062 }
0063 
0064 /* Wakeup sndbuf producers that blocked with smc_tx_wait().
0065  * Cf. tcp_data_snd_check()=>tcp_check_space()=>tcp_new_space().
0066  */
0067 void smc_tx_sndbuf_nonfull(struct smc_sock *smc)
0068 {
0069     if (smc->sk.sk_socket &&
0070         test_bit(SOCK_NOSPACE, &smc->sk.sk_socket->flags))
0071         smc->sk.sk_write_space(&smc->sk);
0072 }
0073 
0074 /* blocks sndbuf producer until at least one byte of free space available
0075  * or urgent Byte was consumed
      *
      * @smc:   SMC socket whose send buffer the caller wants to fill
      * @flags: msg_flags of the send call; only MSG_DONTWAIT is evaluated here,
      *         it is handed to sock_sndtimeo() to derive the wait timeout
      *
      * Return: 0 when sndbuf_space is non-zero and no urgent data is pending,
      *         -EPIPE on socket error, send shutdown, killed connection or
      *         peer_done_writing being set in local_tx_ctrl,
      *         -ECONNRESET when smc_cdc_rxed_any_close() is true,
      *         -EAGAIN when the timeout is zero (SOCK_NOSPACE is set first),
      *         sock_intr_errno(timeo) when a signal is pending.
0076  */
0077 static int smc_tx_wait(struct smc_sock *smc, int flags)
0078 {
0079     DEFINE_WAIT_FUNC(wait, woken_wake_function);
0080     struct smc_connection *conn = &smc->conn;
0081     struct sock *sk = &smc->sk;
0082     long timeo;
0083     int rc = 0;
0084 
0085     /* similar to sk_stream_wait_memory */
0086     timeo = sock_sndtimeo(sk, flags & MSG_DONTWAIT);
0087     add_wait_queue(sk_sleep(sk), &wait);
0088     while (1) {
0089         sk_set_bit(SOCKWQ_ASYNC_NOSPACE, sk);
             /* conditions under which sending can never succeed anymore */
0090         if (sk->sk_err ||
0091             (sk->sk_shutdown & SEND_SHUTDOWN) ||
0092             conn->killed ||
0093             conn->local_tx_ctrl.conn_state_flags.peer_done_writing) {
0094             rc = -EPIPE;
0095             break;
0096         }
0097         if (smc_cdc_rxed_any_close(conn)) {
0098             rc = -ECONNRESET;
0099             break;
0100         }
0101         if (!timeo) {
0102             /* ensure EPOLLOUT is subsequently generated */
0103             set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
0104             rc = -EAGAIN;
0105             break;
0106         }
0107         if (signal_pending(current)) {
0108             rc = sock_intr_errno(timeo);
0109             break;
0110         }
0111         sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);
0112         if (atomic_read(&conn->sndbuf_space) && !conn->urg_tx_pend)
0113             break; /* at least 1 byte of free & no urgent data */
0114         set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
             /* NOTE(review): conn->killed and peer_done_writing are tested at
              * the top of the loop only; they are not part of the wake-up
              * condition below, so they are re-evaluated on the next pass.
              */
0115         sk_wait_event(sk, &timeo,
0116                   sk->sk_err ||
0117                   (sk->sk_shutdown & SEND_SHUTDOWN) ||
0118                   smc_cdc_rxed_any_close(conn) ||
0119                   (atomic_read(&conn->sndbuf_space) &&
0120                    !conn->urg_tx_pend),
0121                   &wait);
0122     }
0123     remove_wait_queue(sk_sleep(sk), &wait);
0124     return rc;
0125 }
0126 
0127 static bool smc_tx_is_corked(struct smc_sock *smc)
0128 {
0129     struct tcp_sock *tp = tcp_sk(smc->clcsock->sk);
0130 
0131     return (tp->nonagle & TCP_NAGLE_CORK) ? true : false;
0132 }
0133 
0134 /* If we have pending CDC messages, do not send:
0135  * Because CQE of this CDC message will happen shortly, it gives
0136  * a chance to coalesce future sendmsg() payload in to one RDMA Write,
0137  * without need for a timer, and with no latency trade off.
0138  * Algorithm here:
0139  *  1. First message should never cork
0140  *  2. If we have pending Tx CDC messages, wait for the first CDC
0141  *     message's completion
0142  *  3. Don't cork to much data in a single RDMA Write to prevent burst
0143  *     traffic, total corked message should not exceed sendbuf/2
0144  */
0145 static bool smc_should_autocork(struct smc_sock *smc)
0146 {
0147     struct smc_connection *conn = &smc->conn;
0148     int corking_size;
0149 
0150     corking_size = min_t(unsigned int, conn->sndbuf_desc->len >> 1,
0151                  sock_net(&smc->sk)->smc.sysctl_autocorking_size);
0152 
0153     if (atomic_read(&conn->cdc_pend_tx_wr) == 0 ||
0154         smc_tx_prepared_sends(conn) > corking_size)
0155         return false;
0156     return true;
0157 }
0158 
0159 static bool smc_tx_should_cork(struct smc_sock *smc, struct msghdr *msg)
0160 {
0161     struct smc_connection *conn = &smc->conn;
0162 
0163     if (smc_should_autocork(smc))
0164         return true;
0165 
0166     /* for a corked socket defer the RDMA writes if
0167      * sndbuf_space is still available. The applications
0168      * should known how/when to uncork it.
0169      */
0170     if ((msg->msg_flags & MSG_MORE ||
0171          smc_tx_is_corked(smc) ||
0172          msg->msg_flags & MSG_SENDPAGE_NOTLAST) &&
0173         atomic_read(&conn->sndbuf_space))
0174         return true;
0175 
0176     return false;
0177 }
0178 
0179 /* sndbuf producer: main API called by socket layer.
0180  * called under sock lock.
      *
      * Copies data from @msg into the connection's send ring buffer, in at most
      * two chunks per pass (the ring may wrap), advances tx_curs_prep, reduces
      * sndbuf_space and, unless corking applies, triggers the sndbuf consumer.
      *
      * @smc: SMC socket to send on
      * @msg: data source; MSG_OOB, MSG_MORE, MSG_SENDPAGE_NOTLAST and
      *       MSG_DONTWAIT in msg_flags are evaluated here or in helpers
      * @len: number of bytes requested; initial value of send_remaining
      *       (NOTE(review): the loop itself runs on msg_data_left(msg), so
      *       callers presumably pass len == msg_data_left(msg) - confirm)
      *
      * Return: number of bytes copied into the sndbuf (may be less than @len),
      *         or a negative errno. -ENOTCONN and the -EPIPE/-ECONNRESET
      *         returned from inside the loop are returned directly; all other
      *         errors pass through sk_stream_error() at out_err.
0181  */
0182 int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len)
0183 {
0184     size_t copylen, send_done = 0, send_remaining = len;
0185     size_t chunk_len, chunk_off, chunk_len_sum;
0186     struct smc_connection *conn = &smc->conn;
0187     union smc_host_cursor prep;
0188     struct sock *sk = &smc->sk;
0189     char *sndbuf_base;
0190     int tx_cnt_prep;
0191     int writespace;
0192     int rc, chunk;
0193 
0194     /* This should be in poll */
0195     sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);
0196 
0197     if (sk->sk_err || (sk->sk_shutdown & SEND_SHUTDOWN)) {
0198         rc = -EPIPE;
0199         goto out_err;
0200     }
0201 
0202     if (sk->sk_state == SMC_INIT)
0203         return -ENOTCONN;
0204 
         /* statistics only: request larger than local sndbuf / peer RMBE */
0205     if (len > conn->sndbuf_desc->len)
0206         SMC_STAT_RMB_TX_SIZE_SMALL(smc, !conn->lnk);
0207 
0208     if (len > conn->peer_rmbe_size)
0209         SMC_STAT_RMB_TX_PEER_SIZE_SMALL(smc, !conn->lnk);
0210 
0211     if (msg->msg_flags & MSG_OOB)
0212         SMC_STAT_INC(smc, urg_data_cnt);
0213 
0214     while (msg_data_left(msg)) {
             /* these exits bypass out_err, i.e. sk_stream_error() is not
              * called for them
              */
0215         if (smc->sk.sk_shutdown & SEND_SHUTDOWN ||
0216             (smc->sk.sk_err == ECONNABORTED) ||
0217             conn->killed)
0218             return -EPIPE;
0219         if (smc_cdc_rxed_any_close(conn))
0220             return send_done ?: -ECONNRESET;
0221 
0222         if (msg->msg_flags & MSG_OOB)
0223             conn->local_tx_ctrl.prod_flags.urg_data_pending = 1;
0224 
             /* no room, or earlier urgent data not yet sent: report a partial
              * send if anything was copied, otherwise wait and retry
              */
0225         if (!atomic_read(&conn->sndbuf_space) || conn->urg_tx_pend) {
0226             if (send_done)
0227                 return send_done;
0228             rc = smc_tx_wait(smc, msg->msg_flags);
0229             if (rc)
0230                 goto out_err;
0231             continue;
0232         }
0233 
0234         /* initialize variables for 1st iteration of subsequent loop */
0235         /* could be just 1 byte, even after smc_tx_wait above */
0236         writespace = atomic_read(&conn->sndbuf_space);
0237         /* not more than what user space asked for */
0238         copylen = min_t(size_t, send_remaining, writespace);
0239         /* determine start of sndbuf */
0240         sndbuf_base = conn->sndbuf_desc->cpu_addr;
0241         smc_curs_copy(&prep, &conn->tx_curs_prep, conn);
0242         tx_cnt_prep = prep.count;
0243         /* determine chunks where to write into sndbuf */
0244         /* either unwrapped case, or 1st chunk of wrapped case */
0245         chunk_len = min_t(size_t, copylen, conn->sndbuf_desc->len -
0246                   tx_cnt_prep);
0247         chunk_len_sum = chunk_len;
0248         chunk_off = tx_cnt_prep;
0249         for (chunk = 0; chunk < 2; chunk++) {
0250             rc = memcpy_from_msg(sndbuf_base + chunk_off,
0251                          msg, chunk_len);
0252             if (rc) {
                     /* copy failed: sync the sndbuf for the device, then
                      * prefer reporting what has been copied so far
                      */
0253                 smc_sndbuf_sync_sg_for_device(conn);
0254                 if (send_done)
0255                     return send_done;
0256                 goto out_err;
0257             }
0258             send_done += chunk_len;
0259             send_remaining -= chunk_len;
0260 
0261             if (chunk_len_sum == copylen)
0262                 break; /* either on 1st or 2nd iteration */
0263             /* prepare next (== 2nd) iteration */
0264             chunk_len = copylen - chunk_len; /* remainder */
0265             chunk_len_sum += chunk_len;
0266             chunk_off = 0; /* modulo offset in send ring buffer */
0267         }
0268         smc_sndbuf_sync_sg_for_device(conn);
0269         /* update cursors */
0270         smc_curs_add(conn->sndbuf_desc->len, &prep, copylen);
0271         smc_curs_copy(&conn->tx_curs_prep, &prep, conn);
0272         /* increased in send tasklet smc_cdc_tx_handler() */
0273         smp_mb__before_atomic();
0274         atomic_sub(copylen, &conn->sndbuf_space);
0275         /* guarantee 0 <= sndbuf_space <= sndbuf_desc->len */
0276         smp_mb__after_atomic();
0277         /* since we just produced more new data into sndbuf,
0278          * trigger sndbuf consumer: RDMA write into peer RMBE and CDC
0279          */
             /* urgent data counts as pending only once the whole request
              * has been copied
              */
0280         if ((msg->msg_flags & MSG_OOB) && !send_remaining)
0281             conn->urg_tx_pend = true;
0282         /* If we need to cork, do nothing and wait for the next
0283          * sendmsg() call or push on tx completion
0284          */
0285         if (!smc_tx_should_cork(smc, msg))
0286             smc_tx_sndbuf_nonempty(conn);
0287 
0288         trace_smc_tx_sendmsg(smc, copylen);
0289     } /* while (msg_data_left(msg)) */
0290 
0291     return send_done;
0292 
0293 out_err:
0294     rc = sk_stream_error(sk, msg->msg_flags, rc);
0295     /* make sure we wake any epoll edge trigger waiter */
0296     if (unlikely(rc == -EAGAIN))
0297         sk->sk_write_space(sk);
0298     return rc;
0299 }
0300 
0301 int smc_tx_sendpage(struct smc_sock *smc, struct page *page, int offset,
0302             size_t size, int flags)
0303 {
0304     struct msghdr msg = {.msg_flags = flags};
0305     char *kaddr = kmap(page);
0306     struct kvec iov;
0307     int rc;
0308 
0309     iov.iov_base = kaddr + offset;
0310     iov.iov_len = size;
0311     iov_iter_kvec(&msg.msg_iter, WRITE, &iov, 1, size);
0312     rc = smc_tx_sendmsg(smc, &msg, size);
0313     kunmap(page);
0314     return rc;
0315 }
0316 
0317 /***************************** sndbuf consumer *******************************/
0318 
0319 /* sndbuf consumer: actual data transfer of one target chunk with ISM write */
0320 int smcd_tx_ism_write(struct smc_connection *conn, void *data, size_t len,
0321               u32 offset, int signal)
0322 {
0323     int rc;
0324 
0325     rc = smc_ism_write(conn->lgr->smcd, conn->peer_token,
0326                conn->peer_rmbe_idx, signal, conn->tx_off + offset,
0327                data, len);
0328     if (rc)
0329         conn->local_tx_ctrl.conn_state_flags.peer_conn_abort = 1;
0330     return rc;
0331 }
0332 
0333 /* sndbuf consumer: actual data transfer of one target chunk with RDMA write */
0334 static int smc_tx_rdma_write(struct smc_connection *conn, int peer_rmbe_offset,
0335                  int num_sges, struct ib_rdma_wr *rdma_wr)
0336 {
0337     struct smc_link_group *lgr = conn->lgr;
0338     struct smc_link *link = conn->lnk;
0339     int rc;
0340 
0341     rdma_wr->wr.wr_id = smc_wr_tx_get_next_wr_id(link);
0342     rdma_wr->wr.num_sge = num_sges;
0343     rdma_wr->remote_addr =
0344         lgr->rtokens[conn->rtoken_idx][link->link_idx].dma_addr +
0345         /* RMBE within RMB */
0346         conn->tx_off +
0347         /* offset within RMBE */
0348         peer_rmbe_offset;
0349     rdma_wr->rkey = lgr->rtokens[conn->rtoken_idx][link->link_idx].rkey;
0350     rc = ib_post_send(link->roce_qp, &rdma_wr->wr, NULL);
0351     if (rc)
0352         smcr_link_down_cond_sched(link);
0353     return rc;
0354 }
0355 
0356 /* sndbuf consumer: account for @len bytes that were just handed over for
      * transmission.
      * @prod is advanced modulo conn->peer_rmbe_size, @sent modulo the sndbuf
      * length, and peer_rmbe_space is reduced by @len. Only the cursors passed
      * in are modified; the caller decides where to store them.
      */
0357 static inline void smc_tx_advance_cursors(struct smc_connection *conn,
0358                       union smc_host_cursor *prod,
0359                       union smc_host_cursor *sent,
0360                       size_t len)
0361 {
0362     smc_curs_add(conn->peer_rmbe_size, prod, len);
0363     /* increased in recv tasklet smc_cdc_msg_rcv() */
0364     smp_mb__before_atomic();
0365     /* data in flight reduces usable snd_wnd */
0366     atomic_sub(len, &conn->peer_rmbe_space);
0367     /* guarantee 0 <= peer_rmbe_space <= peer_rmbe_size */
0368     smp_mb__after_atomic();
0369     smc_curs_add(conn->sndbuf_desc->len, sent, len);
0370 }
0371 
0372 /* SMC-R helper for smc_tx_rdma_writes() */
0373 static int smcr_tx_rdma_writes(struct smc_connection *conn, size_t len,
0374                    size_t src_off, size_t src_len,
0375                    size_t dst_off, size_t dst_len,
0376                    struct smc_rdma_wr *wr_rdma_buf)
0377 {
0378     struct smc_link *link = conn->lnk;
0379 
0380     dma_addr_t dma_addr =
0381         sg_dma_address(conn->sndbuf_desc->sgt[link->link_idx].sgl);
0382     u64 virt_addr = (uintptr_t)conn->sndbuf_desc->cpu_addr;
0383     int src_len_sum = src_len, dst_len_sum = dst_len;
0384     int sent_count = src_off;
0385     int srcchunk, dstchunk;
0386     int num_sges;
0387     int rc;
0388 
0389     for (dstchunk = 0; dstchunk < 2; dstchunk++) {
0390         struct ib_rdma_wr *wr = &wr_rdma_buf->wr_tx_rdma[dstchunk];
0391         struct ib_sge *sge = wr->wr.sg_list;
0392         u64 base_addr = dma_addr;
0393 
0394         if (dst_len < link->qp_attr.cap.max_inline_data) {
0395             base_addr = virt_addr;
0396             wr->wr.send_flags |= IB_SEND_INLINE;
0397         } else {
0398             wr->wr.send_flags &= ~IB_SEND_INLINE;
0399         }
0400 
0401         num_sges = 0;
0402         for (srcchunk = 0; srcchunk < 2; srcchunk++) {
0403             sge[srcchunk].addr = conn->sndbuf_desc->is_vm ?
0404                 (virt_addr + src_off) : (base_addr + src_off);
0405             sge[srcchunk].length = src_len;
0406             if (conn->sndbuf_desc->is_vm)
0407                 sge[srcchunk].lkey =
0408                     conn->sndbuf_desc->mr[link->link_idx]->lkey;
0409             num_sges++;
0410 
0411             src_off += src_len;
0412             if (src_off >= conn->sndbuf_desc->len)
0413                 src_off -= conn->sndbuf_desc->len;
0414                         /* modulo in send ring */
0415             if (src_len_sum == dst_len)
0416                 break; /* either on 1st or 2nd iteration */
0417             /* prepare next (== 2nd) iteration */
0418             src_len = dst_len - src_len; /* remainder */
0419             src_len_sum += src_len;
0420         }
0421         rc = smc_tx_rdma_write(conn, dst_off, num_sges, wr);
0422         if (rc)
0423             return rc;
0424         if (dst_len_sum == len)
0425             break; /* either on 1st or 2nd iteration */
0426         /* prepare next (== 2nd) iteration */
0427         dst_off = 0; /* modulo offset in RMBE ring buffer */
0428         dst_len = len - dst_len; /* remainder */
0429         dst_len_sum += dst_len;
0430         src_len = min_t(int, dst_len, conn->sndbuf_desc->len -
0431                 sent_count);
0432         src_len_sum = src_len;
0433     }
0434     return 0;
0435 }
0436 
0437 /* SMC-D helper for smc_tx_rdma_writes()
      *
      * Transfers @len bytes from the send ring buffer with one
      * smcd_tx_ism_write() call per source chunk. The outer loop handles up to
      * two destination chunks, the inner loop up to two source chunks per
      * destination chunk.
      *
      * @conn:    connection to send on
      * @len:     total number of bytes to transfer
      * @src_off: start offset of the data inside the sndbuf
      * @src_len: length of the 1st source chunk of the 1st destination chunk
      * @dst_off: start offset inside the destination buffer
      * @dst_len: length of the 1st destination chunk
      *
      * Return: 0 on success, otherwise the error from smcd_tx_ism_write().
      */
0438 static int smcd_tx_rdma_writes(struct smc_connection *conn, size_t len,
0439                    size_t src_off, size_t src_len,
0440                    size_t dst_off, size_t dst_len)
0441 {
0442     int src_len_sum = src_len, dst_len_sum = dst_len;
0443     int srcchunk, dstchunk;
0444     int rc;
0445 
0446     for (dstchunk = 0; dstchunk < 2; dstchunk++) {
0447         for (srcchunk = 0; srcchunk < 2; srcchunk++) {
0448             void *data = conn->sndbuf_desc->cpu_addr + src_off;
0449 
                 /* target offset is shifted by sizeof(struct smcd_cdc_msg);
                  * presumably the CDC message sits in front of the data
                  * area - TODO confirm
                  */
0450             rc = smcd_tx_ism_write(conn, data, src_len, dst_off +
0451                            sizeof(struct smcd_cdc_msg), 0);
0452             if (rc)
0453                 return rc;
0454             dst_off += src_len;
0455             src_off += src_len;
0456             if (src_off >= conn->sndbuf_desc->len)
0457                 src_off -= conn->sndbuf_desc->len;
0458                         /* modulo in send ring */
0459             if (src_len_sum == dst_len)
0460                 break; /* either on 1st or 2nd iteration */
0461             /* prepare next (== 2nd) iteration */
0462             src_len = dst_len - src_len; /* remainder */
0463             src_len_sum += src_len;
0464         }
0465         if (dst_len_sum == len)
0466             break; /* either on 1st or 2nd iteration */
0467         /* prepare next (== 2nd) iteration */
0468         dst_off = 0; /* modulo offset in RMBE ring buffer */
0469         dst_len = len - dst_len; /* remainder */
0470         dst_len_sum += dst_len;
             /* 1st source chunk: up to the end of the sndbuf, counted from
              * the current source offset
              */
0471         src_len = min_t(int, dst_len, conn->sndbuf_desc->len - src_off);
0472         src_len_sum = src_len;
0473     }
0474     return 0;
0475 }
0476 
0477 /* sndbuf consumer: prepare all necessary (src&dst) chunks of data transmit;
0478  * usable snd_wnd as max transmit
      *
      * Computes how much can be sent (bytes between tx_curs_sent and
      * tx_curs_prep, limited by peer_rmbe_space), derives the first source and
      * destination chunk, lets the SMC-D or SMC-R helper do the transfer and
      * finally advances local_tx_ctrl.prod and tx_curs_sent.
      *
      * @conn:        connection to send on
      * @wr_rdma_buf: RDMA work request buffer, only handed to the SMC-R helper
      *
      * Return: 0 if the data was handed over or if there was nothing to send /
      *         no peer space; otherwise the error of the transfer helper, in
      *         which case no cursor is advanced.
0479  */
0480 static int smc_tx_rdma_writes(struct smc_connection *conn,
0481                   struct smc_rdma_wr *wr_rdma_buf)
0482 {
0483     size_t len, src_len, dst_off, dst_len; /* current chunk values */
0484     union smc_host_cursor sent, prep, prod, cons;
0485     struct smc_cdc_producer_flags *pflags;
0486     int to_send, rmbespace;
0487     int rc;
0488 
0489     /* source: sndbuf */
0490     smc_curs_copy(&sent, &conn->tx_curs_sent, conn);
0491     smc_curs_copy(&prep, &conn->tx_curs_prep, conn);
0492     /* cf. wmem_alloc - (snd_max - snd_una) */
0493     to_send = smc_curs_diff(conn->sndbuf_desc->len, &sent, &prep);
0494     if (to_send <= 0)
0495         return 0;
0496 
0497     /* destination: RMBE */
0498     /* cf. snd_wnd */
0499     rmbespace = atomic_read(&conn->peer_rmbe_space);
0500     if (rmbespace <= 0) {
0501         struct smc_sock *smc = container_of(conn, struct smc_sock,
0502                             conn);
0503         SMC_STAT_RMB_TX_PEER_FULL(smc, !conn->lnk);
0504         return 0;
0505     }
0506     smc_curs_copy(&prod, &conn->local_tx_ctrl.prod, conn);
0507     smc_curs_copy(&cons, &conn->local_rx_ctrl.cons, conn);
0508 
0509     /* if usable snd_wnd closes ask peer to advertise once it opens again */
0510     pflags = &conn->local_tx_ctrl.prod_flags;
0511     pflags->write_blocked = (to_send >= rmbespace);
0512     /* cf. usable snd_wnd */
0513     len = min(to_send, rmbespace);
0514 
0515     /* initialize variables for first iteration of subsequent nested loop */
0516     dst_off = prod.count;
0517     if (prod.wrap == cons.wrap) {
0518         /* the filled destination area is unwrapped,
0519          * hence the available free destination space is wrapped
0520          * and we need 2 destination chunks of sum len; start with 1st
0521          * which is limited by what's available in sndbuf
0522          */
0523         dst_len = min_t(size_t,
0524                 conn->peer_rmbe_size - prod.count, len);
0525     } else {
0526         /* the filled destination area is wrapped,
0527          * hence the available free destination space is unwrapped
0528          * and we need a single destination chunk of entire len
0529          */
0530         dst_len = len;
0531     }
0532     /* dst_len determines the maximum src_len */
0533     if (sent.count + dst_len <= conn->sndbuf_desc->len) {
0534         /* unwrapped src case: single chunk of entire dst_len */
0535         src_len = dst_len;
0536     } else {
0537         /* wrapped src case: 2 chunks of sum dst_len; start with 1st: */
0538         src_len = conn->sndbuf_desc->len - sent.count;
0539     }
0540 
0541     if (conn->lgr->is_smcd)
0542         rc = smcd_tx_rdma_writes(conn, len, sent.count, src_len,
0543                      dst_off, dst_len);
0544     else
0545         rc = smcr_tx_rdma_writes(conn, len, sent.count, src_len,
0546                      dst_off, dst_len, wr_rdma_buf);
0547     if (rc)
0548         return rc;
0549 
         /* urgent data is flagged as present only when everything prepared
          * so far went out with this transfer
          */
0550     if (conn->urg_tx_pend && len == to_send)
0551         pflags->urg_data_present = 1;
0552     smc_tx_advance_cursors(conn, &prod, &sent, len);
0553     /* update connection's cursors with advanced local cursors */
0554     smc_curs_copy(&conn->local_tx_ctrl.prod, &prod, conn);
0555                             /* dst: peer RMBE */
0556     smc_curs_copy(&conn->tx_curs_sent, &sent, conn);/* src: local sndbuf */
0557 
0558     return 0;
0559 }
0560 
0561 /* Wakeup sndbuf consumers from any context (IRQ or process)
0562  * since there is more data to transmit; usable snd_wnd as max transmit
      *
      * SMC-R variant: takes a hold on the connection's link, obtains a free
      * CDC send slot, performs the RDMA writes under conn->send_lock (skipped
      * while urg_data_present is already set) and sends the CDC message.
      *
      * Return: 0 on success, or when no slot was free (-EBUSY) and tx_work was
      *         (re)scheduled instead; -ENOLINK if there is no usable link or
      *         the link changed meanwhile; -EPIPE if the connection was killed
      *         while waiting for a slot; otherwise the error of the failing
      *         helper.
0563  */
0564 static int smcr_tx_sndbuf_nonempty(struct smc_connection *conn)
0565 {
0566     struct smc_cdc_producer_flags *pflags = &conn->local_tx_ctrl.prod_flags;
0567     struct smc_link *link = conn->lnk;
0568     struct smc_rdma_wr *wr_rdma_buf;
0569     struct smc_cdc_tx_pend *pend;
0570     struct smc_wr_buf *wr_buf;
0571     int rc;
0572 
0573     if (!link || !smc_wr_tx_link_hold(link))
0574         return -ENOLINK;
0575     rc = smc_cdc_get_free_slot(conn, link, &wr_buf, &wr_rdma_buf, &pend);
0576     if (rc < 0) {
0577         smc_wr_tx_link_put(link);
0578         if (rc == -EBUSY) {
0579             struct smc_sock *smc =
0580                 container_of(conn, struct smc_sock, conn);
0581 
0582             if (smc->sk.sk_err == ECONNABORTED)
0583                 return sock_error(&smc->sk);
0584             if (conn->killed)
0585                 return -EPIPE;
                 /* no slot right now: not an error, retry from tx_work */
0586             rc = 0;
0587             mod_delayed_work(conn->lgr->tx_wq, &conn->tx_work,
0588                      SMC_TX_WORK_DELAY);
0589         }
0590         return rc;
0591     }
0592 
0593     spin_lock_bh(&conn->send_lock);
0594     if (link != conn->lnk) {
0595         /* link of connection changed, tx_work will restart */
0596         smc_wr_tx_put_slot(link,
0597                    (struct smc_wr_tx_pend_priv *)pend);
0598         rc = -ENOLINK;
0599         goto out_unlock;
0600     }
0601     if (!pflags->urg_data_present) {
0602         rc = smc_tx_rdma_writes(conn, wr_rdma_buf);
0603         if (rc) {
                 /* give the unused slot back before bailing out */
0604             smc_wr_tx_put_slot(link,
0605                        (struct smc_wr_tx_pend_priv *)pend);
0606             goto out_unlock;
0607         }
0608     }
0609 
0610     rc = smc_cdc_msg_send(conn, wr_buf, pend);
         /* urgent data went out with this CDC message: reset both flags */
0611     if (!rc && pflags->urg_data_present) {
0612         pflags->urg_data_pending = 0;
0613         pflags->urg_data_present = 0;
0614     }
0615 
0616 out_unlock:
0617     spin_unlock_bh(&conn->send_lock);
0618     smc_wr_tx_link_put(link);
0619     return rc;
0620 }
0621 
0622 static int smcd_tx_sndbuf_nonempty(struct smc_connection *conn)
0623 {
0624     struct smc_cdc_producer_flags *pflags = &conn->local_tx_ctrl.prod_flags;
0625     int rc = 0;
0626 
0627     spin_lock_bh(&conn->send_lock);
0628     if (!pflags->urg_data_present)
0629         rc = smc_tx_rdma_writes(conn, NULL);
0630     if (!rc)
0631         rc = smcd_cdc_msg_send(conn);
0632 
0633     if (!rc && pflags->urg_data_present) {
0634         pflags->urg_data_pending = 0;
0635         pflags->urg_data_present = 0;
0636     }
0637     spin_unlock_bh(&conn->send_lock);
0638     return rc;
0639 }
0640 
0641 static int __smc_tx_sndbuf_nonempty(struct smc_connection *conn)
0642 {
0643     struct smc_sock *smc = container_of(conn, struct smc_sock, conn);
0644     int rc = 0;
0645 
0646     /* No data in the send queue */
0647     if (unlikely(smc_tx_prepared_sends(conn) <= 0))
0648         goto out;
0649 
0650     /* Peer don't have RMBE space */
0651     if (unlikely(atomic_read(&conn->peer_rmbe_space) <= 0)) {
0652         SMC_STAT_RMB_TX_PEER_FULL(smc, !conn->lnk);
0653         goto out;
0654     }
0655 
0656     if (conn->killed ||
0657         conn->local_rx_ctrl.conn_state_flags.peer_conn_abort) {
0658         rc = -EPIPE;    /* connection being aborted */
0659         goto out;
0660     }
0661     if (conn->lgr->is_smcd)
0662         rc = smcd_tx_sndbuf_nonempty(conn);
0663     else
0664         rc = smcr_tx_sndbuf_nonempty(conn);
0665 
0666     if (!rc) {
0667         /* trigger socket release if connection is closing */
0668         smc_close_wake_tx_prepared(smc);
0669     }
0670 
0671 out:
0672     return rc;
0673 }
0674 
0675 int smc_tx_sndbuf_nonempty(struct smc_connection *conn)
0676 {
0677     int rc;
0678 
0679     /* This makes sure only one caller can send at a time, to prevent
0680      * wasting CPU and CDC slots.
0681      * Record whether someone has tried to push while we are pushing.
          * A caller that finds tx_pushing already raised leaves its increment
          * behind as that record and returns 0 without sending.
0682      */
0683     if (atomic_inc_return(&conn->tx_pushing) > 1)
0684         return 0;
0685 
0686 again:
         /* collapse all attempts recorded so far into this one push */
0687     atomic_set(&conn->tx_pushing, 1);
0688     smp_wmb(); /* Make sure tx_pushing is 1 before real send */
0689     rc = __smc_tx_sndbuf_nonempty(conn);
0690 
0691     /* We need to check whether someone else has added some data into
0692      * the send queue and tried to push but failed after the atomic_set()
0693      * while we were pushing.
0694      * If so, we need to push again to prevent that data from hanging in
0695      * the send queue.
          * Note that rc of the previous attempt is overwritten by the retry.
0696      */
0697     if (unlikely(!atomic_dec_and_test(&conn->tx_pushing)))
0698         goto again;
0699 
0700     return rc;
0701 }
0702 
0703 /* Wakeup sndbuf consumers from process context
0704  * since there is more data to transmit. The caller
0705  * must hold sock lock.
0706  */
0707 void smc_tx_pending(struct smc_connection *conn)
0708 {
0709     struct smc_sock *smc = container_of(conn, struct smc_sock, conn);
0710     int rc;
0711 
0712     if (smc->sk.sk_err)
0713         return;
0714 
0715     rc = smc_tx_sndbuf_nonempty(conn);
0716     if (!rc && conn->local_rx_ctrl.prod_flags.write_blocked &&
0717         !atomic_read(&conn->bytes_to_rcv))
0718         conn->local_rx_ctrl.prod_flags.write_blocked = 0;
0719 }
0720 
0721 /* Wakeup sndbuf consumers from process context
0722  * since there is more data to transmit in locked
0723  * sock.
0724  */
0725 void smc_tx_work(struct work_struct *work)
0726 {
0727     struct smc_connection *conn = container_of(to_delayed_work(work),
0728                            struct smc_connection,
0729                            tx_work);
0730     struct smc_sock *smc = container_of(conn, struct smc_sock, conn);
0731 
0732     lock_sock(&smc->sk);
0733     smc_tx_pending(conn);
0734     release_sock(&smc->sk);
0735 }
0736 
0737 void smc_tx_consumer_update(struct smc_connection *conn, bool force)
0738 {
0739     union smc_host_cursor cfed, cons, prod;
0740     int sender_free = conn->rmb_desc->len;
0741     int to_confirm;
0742 
0743     smc_curs_copy(&cons, &conn->local_tx_ctrl.cons, conn);
0744     smc_curs_copy(&cfed, &conn->rx_curs_confirmed, conn);
0745     to_confirm = smc_curs_diff(conn->rmb_desc->len, &cfed, &cons);
0746     if (to_confirm > conn->rmbe_update_limit) {
0747         smc_curs_copy(&prod, &conn->local_rx_ctrl.prod, conn);
0748         sender_free = conn->rmb_desc->len -
0749                   smc_curs_diff_large(conn->rmb_desc->len,
0750                           &cfed, &prod);
0751     }
0752 
0753     if (conn->local_rx_ctrl.prod_flags.cons_curs_upd_req ||
0754         force ||
0755         ((to_confirm > conn->rmbe_update_limit) &&
0756          ((sender_free <= (conn->rmb_desc->len / 2)) ||
0757           conn->local_rx_ctrl.prod_flags.write_blocked))) {
0758         if (conn->killed ||
0759             conn->local_rx_ctrl.conn_state_flags.peer_conn_abort)
0760             return;
0761         if ((smc_cdc_get_slot_and_msg_send(conn) < 0) &&
0762             !conn->killed) {
0763             queue_delayed_work(conn->lgr->tx_wq, &conn->tx_work,
0764                        SMC_TX_WORK_DELAY);
0765             return;
0766         }
0767     }
0768     if (conn->local_rx_ctrl.prod_flags.write_blocked &&
0769         !atomic_read(&conn->bytes_to_rcv))
0770         conn->local_rx_ctrl.prod_flags.write_blocked = 0;
0771 }
0772 
0773 /***************************** send initialize *******************************/
0774 
0775 /* Initialize send properties on connection establishment. NB: not __init! */
0776 void smc_tx_init(struct smc_sock *smc)
0777 {
0778     smc->sk.sk_write_space = smc_tx_write_space;
0779 }