0001 // SPDX-License-Identifier: GPL-2.0
0002 /*
0003  * Shared Memory Communications over RDMA (SMC-R) and RoCE
0004  *
0005  * Manage RMBE
0006  * copy new RMBE data into user space
0007  *
0008  * Copyright IBM Corp. 2016
0009  *
0010  * Author(s):  Ursula Braun <ubraun@linux.vnet.ibm.com>
0011  */
0012 
0013 #include <linux/net.h>
0014 #include <linux/rcupdate.h>
0015 #include <linux/sched/signal.h>
0016 
0017 #include <net/sock.h>
0018 
0019 #include "smc.h"
0020 #include "smc_core.h"
0021 #include "smc_cdc.h"
0022 #include "smc_tx.h" /* smc_tx_consumer_update() */
0023 #include "smc_rx.h"
0024 #include "smc_stats.h"
0025 #include "smc_tracepoint.h"
0026 
/* callback implementation to wakeup consumers blocked with smc_rx_wait().
 * indirectly called by smc_cdc_msg_recv_action().
 * Installed as sk->sk_data_ready in smc_rx_init().
 */
static void smc_rx_wake_up(struct sock *sk)
{
	struct socket_wq *wq;

	/* derived from sock_def_readable() */
	/* called already in smc_listen_work() */
	rcu_read_lock();
	/* sk_wq may change under us; dereference only inside the RCU
	 * read-side critical section
	 */
	wq = rcu_dereference(sk->sk_wq);
	if (skwq_has_sleeper(wq))
		wake_up_interruptible_sync_poll(&wq->wait, EPOLLIN | EPOLLPRI |
						EPOLLRDNORM | EPOLLRDBAND);
	sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
	/* fully shut down or closed sockets additionally signal hangup */
	if ((sk->sk_shutdown == SHUTDOWN_MASK) ||
	    (sk->sk_state == SMC_CLOSED))
		sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_HUP);
	rcu_read_unlock();
}
0047 
/* Update consumer cursor
 *   @smc    smc socket whose connection's consumer cursor is updated
 *   @cons   consumer cursor snapshot to advance
 *   @len    number of Bytes consumed
 *   Returns:
 *   1 if we should end our receive, 0 otherwise
 */
static int smc_rx_update_consumer(struct smc_sock *smc,
				  union smc_host_cursor cons, size_t len)
{
	struct smc_connection *conn = &smc->conn;
	struct sock *sk = &smc->sk;
	bool force = false;
	int diff, rc = 0;

	smc_curs_add(conn->rmb_desc->len, &cons, len);

	/* did we process urgent data? */
	if (conn->urg_state == SMC_URG_VALID || conn->urg_rx_skip_pend) {
		diff = smc_curs_comp(conn->rmb_desc->len, &cons,
				     &conn->urg_curs);
		if (sock_flag(sk, SOCK_URGINLINE)) {
			if (diff == 0) {
				/* cursor sits exactly on the urgent byte:
				 * stop this receive so the urgent byte is
				 * handed to user space separately
				 */
				force = true;
				rc = 1;
				conn->urg_state = SMC_URG_READ;
			}
		} else {
			if (diff == 1) {
				/* skip urgent byte */
				force = true;
				smc_curs_add(conn->rmb_desc->len, &cons, 1);
				conn->urg_rx_skip_pend = false;
			} else if (diff < -1)
				/* we read past urgent byte */
				conn->urg_state = SMC_URG_READ;
		}
	}

	/* publish the advanced cursor as our local consumer position */
	smc_curs_copy(&conn->local_tx_ctrl.cons, &cons, conn);

	/* send consumer cursor update if required */
	/* similar to advertising new TCP rcv_wnd if required */
	smc_tx_consumer_update(conn, force);

	return rc;
}
0095 
0096 static void smc_rx_update_cons(struct smc_sock *smc, size_t len)
0097 {
0098     struct smc_connection *conn = &smc->conn;
0099     union smc_host_cursor cons;
0100 
0101     smc_curs_copy(&cons, &conn->local_tx_ctrl.cons, conn);
0102     smc_rx_update_consumer(smc, cons, len);
0103 }
0104 
/* per pipe-buffer context for splice(); stored in pipe_buffer->private
 * by smc_rx_splice() and consumed/freed by smc_rx_pipe_buf_release()
 */
struct smc_spd_priv {
	struct smc_sock *smc;	/* socket the spliced data belongs to */
	size_t       len;	/* number of bytes covered by this buffer */
};
0109 
/* pipe_buf_operations release callback for spliced RMBE data.
 * Runs once the pipe reader is done with a buffer: advances the consumer
 * cursor by the bytes this buffer covered and drops the page and sock
 * references taken in smc_rx_splice().
 */
static void smc_rx_pipe_buf_release(struct pipe_inode_info *pipe,
				    struct pipe_buffer *buf)
{
	struct smc_spd_priv *priv = (struct smc_spd_priv *)buf->private;
	struct smc_sock *smc = priv->smc;
	struct smc_connection *conn;
	struct sock *sk = &smc->sk;

	/* no cursor update when the connection is already closing down */
	if (sk->sk_state == SMC_CLOSED ||
	    sk->sk_state == SMC_PEERFINCLOSEWAIT ||
	    sk->sk_state == SMC_APPFINCLOSEWAIT)
		goto out;
	conn = &smc->conn;
	lock_sock(sk);
	smc_rx_update_cons(smc, priv->len);
	release_sock(sk);
	/* wake blocked readers only when the last outstanding spliced
	 * byte has been consumed
	 */
	if (atomic_sub_and_test(priv->len, &conn->splice_pending))
		smc_rx_wake_up(sk);
out:
	kfree(priv);
	put_page(buf->page);	/* drop ref taken via get_page() in smc_rx_splice() */
	sock_put(sk);		/* pairs with sock_hold() in smc_rx_splice() */
}
0133 
/* operations for pipe buffers that wrap RMBE pages during splice() */
static const struct pipe_buf_operations smc_pipe_ops = {
	.release = smc_rx_pipe_buf_release,
	.get = generic_pipe_buf_get
};
0138 
0139 static void smc_rx_spd_release(struct splice_pipe_desc *spd,
0140                    unsigned int i)
0141 {
0142     put_page(spd->pages[i]);
0143 }
0144 
/* Hand RMBE data at [src, src+len) to a pipe without copying, by wrapping
 * the backing pages in a splice_pipe_desc.  The consumer cursor is only
 * advanced later, from smc_rx_pipe_buf_release(), once the pipe reader has
 * actually consumed the buffers.
 * Returns the number of bytes spliced, or -ENOMEM on allocation failure.
 */
static int smc_rx_splice(struct pipe_inode_info *pipe, char *src, size_t len,
			 struct smc_sock *smc)
{
	struct smc_link_group *lgr = smc->conn.lgr;
	int offset = offset_in_page(src);
	struct partial_page *partial;
	struct splice_pipe_desc spd;
	struct smc_spd_priv **priv;
	struct page **pages;
	int bytes, nr_pages;
	int i;

	/* physically contiguous RMBs need a single page entry; virtually
	 * contiguous (vmalloc'ed) SMC-R RMBs need one entry per page
	 * touched by [src, src+len)
	 */
	nr_pages = !lgr->is_smcd && smc->conn.rmb_desc->is_vm ?
		   PAGE_ALIGN(len + offset) / PAGE_SIZE : 1;

	pages = kcalloc(nr_pages, sizeof(*pages), GFP_KERNEL);
	if (!pages)
		goto out;
	partial = kcalloc(nr_pages, sizeof(*partial), GFP_KERNEL);
	if (!partial)
		goto out_page;
	priv = kcalloc(nr_pages, sizeof(*priv), GFP_KERNEL);
	if (!priv)
		goto out_part;
	for (i = 0; i < nr_pages; i++) {
		priv[i] = kzalloc(sizeof(**priv), GFP_KERNEL);
		if (!priv[i])
			goto out_priv;
	}

	if (lgr->is_smcd ||
	    (!lgr->is_smcd && !smc->conn.rmb_desc->is_vm)) {
		/* smcd or smcr that uses physically contiguous RMBs */
		priv[0]->len = len;
		priv[0]->smc = smc;
		partial[0].offset = src - (char *)smc->conn.rmb_desc->cpu_addr;
		partial[0].len = len;
		partial[0].private = (unsigned long)priv[0];
		pages[0] = smc->conn.rmb_desc->pages;
	} else {
		int size, left = len;
		void *buf = src;
		/* smcr that uses virtually contiguous RMBs*/
		for (i = 0; i < nr_pages; i++) {
			size = min_t(int, PAGE_SIZE - offset, left);
			priv[i]->len = size;
			priv[i]->smc = smc;
			pages[i] = vmalloc_to_page(buf);
			partial[i].offset = offset;
			partial[i].len = size;
			partial[i].private = (unsigned long)priv[i];
			/* void-pointer arithmetic (gcc extension):
			 * advances buf by 'size' bytes
			 */
			buf += size / sizeof(*buf);
			left -= size;
			offset = 0; /* only the first page starts mid-page */
		}
	}
	spd.nr_pages_max = nr_pages;
	spd.nr_pages = nr_pages;
	spd.pages = pages;
	spd.partial = partial;
	spd.ops = &smc_pipe_ops;
	spd.spd_release = smc_rx_spd_release;

	bytes = splice_to_pipe(pipe, &spd);
	if (bytes > 0) {
		/* keep the sock alive until the pipe buffers are released.
		 * NOTE(review): sock_hold() is taken once per splice call,
		 * while smc_rx_pipe_buf_release() issues one sock_put() per
		 * pipe buffer - verify the pairing for the multi-buffer
		 * (is_vm, nr_pages > 1) case.
		 */
		sock_hold(&smc->sk);
		if (!lgr->is_smcd && smc->conn.rmb_desc->is_vm) {
			/* one extra ref per page handed to the pipe;
			 * offset is 0 here after the loop above - page
			 * count derives from spliced bytes only
			 */
			for (i = 0; i < PAGE_ALIGN(bytes + offset) / PAGE_SIZE; i++)
				get_page(pages[i]);
		} else {
			get_page(smc->conn.rmb_desc->pages);
		}
		atomic_add(bytes, &smc->conn.splice_pending);
	}
	/* the temporary descriptor arrays are no longer needed; the
	 * per-buffer priv objects are freed in smc_rx_pipe_buf_release()
	 */
	kfree(priv);
	kfree(partial);
	kfree(pages);

	return bytes;

out_priv:
	for (i = (i - 1); i >= 0; i--)
		kfree(priv[i]);
	kfree(priv);
out_part:
	kfree(partial);
out_page:
	kfree(pages);
out:
	return -ENOMEM;
}
0236 
0237 static int smc_rx_data_available_and_no_splice_pend(struct smc_connection *conn)
0238 {
0239     return atomic_read(&conn->bytes_to_rcv) &&
0240            !atomic_read(&conn->splice_pending);
0241 }
0242 
/* blocks rcvbuf consumer until >=len bytes available or timeout or interrupted
 *   @smc    smc socket
 *   @timeo  pointer to max jiffies to wait, pointer to value 0 for no timeout
 *   @fcrit  add'l criterion to evaluate as function pointer
 * Returns:
 * 1 if at least 1 byte available in rcvbuf or if socket error/shutdown.
 * 0 otherwise (nothing in rcvbuf nor timeout, e.g. interrupted).
 */
int smc_rx_wait(struct smc_sock *smc, long *timeo,
		int (*fcrit)(struct smc_connection *conn))
{
	DEFINE_WAIT_FUNC(wait, woken_wake_function);
	struct smc_connection *conn = &smc->conn;
	struct smc_cdc_conn_state_flags *cflags =
					&conn->local_tx_ctrl.conn_state_flags;
	struct sock *sk = &smc->sk;
	int rc;

	/* fast path: criterion already satisfied, no need to sleep */
	if (fcrit(conn))
		return 1;
	sk_set_bit(SOCKWQ_ASYNC_WAITDATA, sk);
	add_wait_queue(sk_sleep(sk), &wait);
	/* wake on socket error, peer abort, receive shutdown, killed
	 * connection, or when the caller's criterion becomes true
	 */
	rc = sk_wait_event(sk, timeo,
			   sk->sk_err ||
			   cflags->peer_conn_abort ||
			   sk->sk_shutdown & RCV_SHUTDOWN ||
			   conn->killed ||
			   fcrit(conn),
			   &wait);
	remove_wait_queue(sk_sleep(sk), &wait);
	sk_clear_bit(SOCKWQ_ASYNC_WAITDATA, sk);
	return rc;
}
0276 
/* receive urgent (out-of-band) data - MSG_OOB path of smc_rx_recvmsg()
 *   @smc    smc socket
 *   @msg    message buffer the single urgent byte is copied into
 *   @len    number of bytes requested by user space
 *   @flags  recvmsg flags; MSG_PEEK and MSG_TRUNC are honoured
 * Returns the number of bytes delivered (at most 1), 0 on a closed or
 * receive-shutdown socket, or a negative error code.
 */
static int smc_rx_recv_urg(struct smc_sock *smc, struct msghdr *msg, int len,
			   int flags)
{
	struct smc_connection *conn = &smc->conn;
	union smc_host_cursor cons;
	struct sock *sk = &smc->sk;
	int rc = 0;

	/* OOB receive is only valid when urgent data is pending and not
	 * being delivered inline
	 */
	if (sock_flag(sk, SOCK_URGINLINE) ||
	    !(conn->urg_state == SMC_URG_VALID) ||
	    conn->urg_state == SMC_URG_READ)
		return -EINVAL;

	SMC_STAT_INC(smc, urg_data_cnt);
	/* urg_state is re-checked here; presumably it may change
	 * concurrently via the CDC receive path - TODO confirm
	 */
	if (conn->urg_state == SMC_URG_VALID) {
		if (!(flags & MSG_PEEK))
			smc->conn.urg_state = SMC_URG_READ;
		msg->msg_flags |= MSG_OOB;
		if (len > 0) {
			if (!(flags & MSG_TRUNC))
				rc = memcpy_to_msg(msg, &conn->urg_rx_byte, 1);
			len = 1;
			smc_curs_copy(&cons, &conn->local_tx_ctrl.cons, conn);
			/* urgent byte not yet reached by the consumer
			 * cursor: remember to skip it later
			 */
			if (smc_curs_diff(conn->rmb_desc->len, &cons,
					  &conn->urg_curs) > 1)
				conn->urg_rx_skip_pend = true;
			/* Urgent Byte was already accounted for, but trigger
			 * skipping the urgent byte in non-inline case
			 */
			if (!(flags & MSG_PEEK))
				smc_rx_update_consumer(smc, cons, 0);
		} else {
			msg->msg_flags |= MSG_TRUNC;
		}

		return rc ? -EFAULT : len;
	}

	if (sk->sk_state == SMC_CLOSED || sk->sk_shutdown & RCV_SHUTDOWN)
		return 0;

	return -EAGAIN;
}
0320 
0321 static bool smc_rx_recvmsg_data_available(struct smc_sock *smc)
0322 {
0323     struct smc_connection *conn = &smc->conn;
0324 
0325     if (smc_rx_data_available(conn))
0326         return true;
0327     else if (conn->urg_state == SMC_URG_VALID)
0328         /* we received a single urgent Byte - skip */
0329         smc_rx_update_cons(smc, 0);
0330     return false;
0331 }
0332 
/* smc_rx_recvmsg - receive data from RMBE
 * @smc:    smc socket
 * @msg:    copy data to receive buffer
 * @pipe:   copy data to pipe if set - indicates splice() call
 * @len:    maximum number of bytes to deliver
 * @flags:  recvmsg flags (MSG_DONTWAIT, MSG_WAITALL, MSG_PEEK, MSG_TRUNC,
 *          MSG_OOB are handled)
 *
 * rcvbuf consumer: main API called by socket layer.
 * Called under sk lock.
 * Returns bytes delivered or a negative error code.
 */
int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg,
		   struct pipe_inode_info *pipe, size_t len, int flags)
{
	size_t copylen, read_done = 0, read_remaining = len;
	size_t chunk_len, chunk_off, chunk_len_sum;
	struct smc_connection *conn = &smc->conn;
	int (*func)(struct smc_connection *conn);
	union smc_host_cursor cons;
	int readable, chunk;
	char *rcvbuf_base;
	struct sock *sk;
	int splbytes;
	long timeo;
	int target;		/* Read at least these many bytes */
	int rc;

	if (unlikely(flags & MSG_ERRQUEUE))
		return -EINVAL; /* future work for sk.sk_family == AF_SMC */

	sk = &smc->sk;
	if (sk->sk_state == SMC_LISTEN)
		return -ENOTCONN;
	/* out-of-band data takes a dedicated path */
	if (flags & MSG_OOB)
		return smc_rx_recv_urg(smc, msg, len, flags);
	timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
	target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);

	/* receive-buffer fill-level statistics */
	readable = atomic_read(&conn->bytes_to_rcv);
	if (readable >= conn->rmb_desc->len)
		SMC_STAT_RMB_RX_FULL(smc, !conn->lnk);

	if (len < readable)
		SMC_STAT_RMB_RX_SIZE_SMALL(smc, !conn->lnk);
	/* we currently use 1 RMBE per RMB, so RMBE == RMB base addr */
	rcvbuf_base = conn->rx_off + conn->rmb_desc->cpu_addr;

	do { /* while (read_remaining) */
		/* done when low-water mark reached; splice() delivers at
		 * most one batch per call
		 */
		if (read_done >= target || (pipe && read_done))
			break;

		if (conn->killed)
			break;

		if (smc_rx_recvmsg_data_available(smc))
			goto copy;

		if (sk->sk_shutdown & RCV_SHUTDOWN) {
			/* smc_cdc_msg_recv_action() could have run after
			 * above smc_rx_recvmsg_data_available()
			 */
			if (smc_rx_recvmsg_data_available(smc))
				goto copy;
			break;
		}

		if (read_done) {
			/* partial result: return it rather than an error */
			if (sk->sk_err ||
			    sk->sk_state == SMC_CLOSED ||
			    !timeo ||
			    signal_pending(current))
				break;
		} else {
			if (sk->sk_err) {
				read_done = sock_error(sk);
				break;
			}
			if (sk->sk_state == SMC_CLOSED) {
				if (!sock_flag(sk, SOCK_DONE)) {
					/* This occurs when user tries to read
					 * from never connected socket.
					 */
					read_done = -ENOTCONN;
					break;
				}
				break;
			}
			if (!timeo)
				return -EAGAIN;
			if (signal_pending(current)) {
				read_done = sock_intr_errno(timeo);
				break;
			}
		}

		if (!smc_rx_data_available(conn)) {
			smc_rx_wait(smc, &timeo, smc_rx_data_available);
			continue;
		}

copy:
		/* initialize variables for 1st iteration of subsequent loop */
		/* could be just 1 byte, even after waiting on data above */
		readable = atomic_read(&conn->bytes_to_rcv);
		splbytes = atomic_read(&conn->splice_pending);
		if (!readable || (msg && splbytes)) {
			/* recvmsg must not overtake an in-flight splice */
			if (splbytes)
				func = smc_rx_data_available_and_no_splice_pend;
			else
				func = smc_rx_data_available;
			smc_rx_wait(smc, &timeo, func);
			continue;
		}

		smc_curs_copy(&cons, &conn->local_tx_ctrl.cons, conn);
		/* subsequent splice() calls pick up where previous left */
		if (splbytes)
			smc_curs_add(conn->rmb_desc->len, &cons, splbytes);
		if (conn->urg_state == SMC_URG_VALID &&
		    sock_flag(&smc->sk, SOCK_URGINLINE) &&
		    readable > 1)
			readable--;	/* always stop at urgent Byte */
		/* not more than what user space asked for */
		copylen = min_t(size_t, read_remaining, readable);
		/* determine chunks where to read from rcvbuf */
		/* either unwrapped case, or 1st chunk of wrapped case */
		chunk_len = min_t(size_t, copylen, conn->rmb_desc->len -
				  cons.count);
		chunk_len_sum = chunk_len;
		chunk_off = cons.count;
		smc_rmb_sync_sg_for_cpu(conn);
		/* the ring buffer may wrap: at most two chunks per pass */
		for (chunk = 0; chunk < 2; chunk++) {
			if (!(flags & MSG_TRUNC)) {
				if (msg) {
					rc = memcpy_to_msg(msg, rcvbuf_base +
							   chunk_off,
							   chunk_len);
				} else {
					rc = smc_rx_splice(pipe, rcvbuf_base +
							chunk_off, chunk_len,
							smc);
				}
				if (rc < 0) {
					if (!read_done)
						read_done = -EFAULT;
					goto out;
				}
			}
			read_remaining -= chunk_len;
			read_done += chunk_len;

			if (chunk_len_sum == copylen)
				break; /* either on 1st or 2nd iteration */
			/* prepare next (== 2nd) iteration */
			chunk_len = copylen - chunk_len; /* remainder */
			chunk_len_sum += chunk_len;
			chunk_off = 0; /* modulo offset in recv ring buffer */
		}

		/* update cursors */
		if (!(flags & MSG_PEEK)) {
			/* increased in recv tasklet smc_cdc_msg_rcv() */
			smp_mb__before_atomic();
			atomic_sub(copylen, &conn->bytes_to_rcv);
			/* guarantee 0 <= bytes_to_rcv <= rmb_desc->len */
			smp_mb__after_atomic();
			/* for splice(), the cursor is advanced from
			 * smc_rx_pipe_buf_release() instead
			 */
			if (msg && smc_rx_update_consumer(smc, cons, copylen))
				goto out;
		}

		trace_smc_rx_recvmsg(smc, copylen);
	} while (read_remaining);
out:
	return read_done;
}
0504 
0505 /* Initialize receive properties on connection establishment. NB: not __init! */
0506 void smc_rx_init(struct smc_sock *smc)
0507 {
0508     smc->sk.sk_data_ready = smc_rx_wake_up;
0509     atomic_set(&smc->conn.splice_pending, 0);
0510     smc->conn.urg_state = SMC_URG_READ;
0511 }