0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013 #include <linux/net.h>
0014 #include <linux/rcupdate.h>
0015 #include <linux/sched/signal.h>
0016
0017 #include <net/sock.h>
0018
0019 #include "smc.h"
0020 #include "smc_core.h"
0021 #include "smc_cdc.h"
0022 #include "smc_tx.h" /* smc_tx_consumer_update() */
0023 #include "smc_rx.h"
0024 #include "smc_stats.h"
0025 #include "smc_tracepoint.h"
0026
0027
0028
0029
0030 static void smc_rx_wake_up(struct sock *sk)
0031 {
0032 struct socket_wq *wq;
0033
0034
0035
0036 rcu_read_lock();
0037 wq = rcu_dereference(sk->sk_wq);
0038 if (skwq_has_sleeper(wq))
0039 wake_up_interruptible_sync_poll(&wq->wait, EPOLLIN | EPOLLPRI |
0040 EPOLLRDNORM | EPOLLRDBAND);
0041 sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
0042 if ((sk->sk_shutdown == SHUTDOWN_MASK) ||
0043 (sk->sk_state == SMC_CLOSED))
0044 sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_HUP);
0045 rcu_read_unlock();
0046 }
0047
0048
0049
0050
0051
0052
0053
0054
/* Advance the local consumer cursor by @len consumed bytes and handle
 * urgent-data bookkeeping on the way.
 * @smc:  receiving socket
 * @cons: current consumer cursor (passed by value, advanced locally)
 * @len:  number of bytes just consumed by the receiver
 * Returns 1 when the inline urgent byte was reached (SOCK_URGINLINE set),
 * 0 otherwise.  May trigger a consumer cursor update towards the peer
 * via smc_tx_consumer_update().
 */
static int smc_rx_update_consumer(struct smc_sock *smc,
				  union smc_host_cursor cons, size_t len)
{
	struct smc_connection *conn = &smc->conn;
	struct sock *sk = &smc->sk;
	bool force = false;
	int diff, rc = 0;

	smc_curs_add(conn->rmb_desc->len, &cons, len);

	/* did we process urgent data? */
	if (conn->urg_state == SMC_URG_VALID || conn->urg_rx_skip_pend) {
		diff = smc_curs_comp(conn->rmb_desc->len, &cons,
				     &conn->urg_curs);
		if (sock_flag(sk, SOCK_URGINLINE)) {
			if (diff == 0) {
				/* cursor sits exactly on the urgent byte */
				force = true;
				rc = 1;
				conn->urg_state = SMC_URG_READ;
			}
		} else {
			if (diff == 1) {
				/* skip the urgent byte itself */
				force = true;
				smc_curs_add(conn->rmb_desc->len, &cons, 1);
				conn->urg_rx_skip_pend = false;
			} else if (diff < -1)
				/* we read past the urgent byte */
				conn->urg_state = SMC_URG_READ;
		}
	}

	smc_curs_copy(&conn->local_tx_ctrl.cons, &cons, conn);

	/* send consumer cursor update if required;
	 * similar to advertising a new TCP rcv_wnd if required
	 */
	smc_tx_consumer_update(conn, force);

	return rc;
}
0095
0096 static void smc_rx_update_cons(struct smc_sock *smc, size_t len)
0097 {
0098 struct smc_connection *conn = &smc->conn;
0099 union smc_host_cursor cons;
0100
0101 smc_curs_copy(&cons, &conn->local_tx_ctrl.cons, conn);
0102 smc_rx_update_consumer(smc, cons, len);
0103 }
0104
/* Per-pipe-buffer context for spliced receive data; stashed in
 * pipe_buffer->private and consumed by smc_rx_pipe_buf_release().
 */
struct smc_spd_priv {
	struct smc_sock *smc;	/* socket the spliced bytes belong to */
	size_t len;		/* byte count covered by this pipe buffer */
};
0109
/* Pipe buffer release callback, invoked once user space has consumed the
 * spliced bytes.  Advances the consumer cursor (unless the connection is
 * already closing down) and drops the page and socket references taken
 * in smc_rx_splice().
 */
static void smc_rx_pipe_buf_release(struct pipe_inode_info *pipe,
				    struct pipe_buffer *buf)
{
	struct smc_spd_priv *priv = (struct smc_spd_priv *)buf->private;
	struct smc_sock *smc = priv->smc;
	struct smc_connection *conn;
	struct sock *sk = &smc->sk;

	/* do not touch cursors on a dying connection */
	if (sk->sk_state == SMC_CLOSED ||
	    sk->sk_state == SMC_PEERFINCLOSEWAIT ||
	    sk->sk_state == SMC_APPFINCLOSEWAIT)
		goto out;
	conn = &smc->conn;
	lock_sock(sk);
	smc_rx_update_cons(smc, priv->len);
	release_sock(sk);
	/* wake readers once the last pending splice completed */
	if (atomic_sub_and_test(priv->len, &conn->splice_pending))
		smc_rx_wake_up(sk);
out:
	kfree(priv);
	put_page(buf->page);	/* ref taken in smc_rx_splice() */
	sock_put(sk);		/* matches sock_hold() in smc_rx_splice() */
}
0133
/* pipe buffer operations for spliced RMB data; release advances the
 * consumer cursor once user space is done with the buffer
 */
static const struct pipe_buf_operations smc_pipe_ops = {
	.release = smc_rx_pipe_buf_release,
	.get = generic_pipe_buf_get
};
0138
0139 static void smc_rx_spd_release(struct splice_pipe_desc *spd,
0140 unsigned int i)
0141 {
0142 put_page(spd->pages[i]);
0143 }
0144
/* Splice up to @len bytes starting at @src (inside the receive buffer)
 * into @pipe without copying.  Page and socket references are taken here
 * and dropped in smc_rx_pipe_buf_release() when the pipe consumer is
 * done, which is also when the consumer cursor may be advanced.
 * Returns the number of bytes spliced, or -ENOMEM on allocation failure.
 */
static int smc_rx_splice(struct pipe_inode_info *pipe, char *src, size_t len,
			 struct smc_sock *smc)
{
	struct smc_link_group *lgr = smc->conn.lgr;
	int offset = offset_in_page(src);
	struct partial_page *partial;
	struct splice_pipe_desc spd;
	struct smc_spd_priv **priv;
	struct page **pages;
	int bytes, nr_pages;
	int i;

	/* a virtually contiguous RMB may span several physical pages;
	 * otherwise the whole region lives in one contiguous allocation
	 */
	nr_pages = !lgr->is_smcd && smc->conn.rmb_desc->is_vm ?
		   PAGE_ALIGN(len + offset) / PAGE_SIZE : 1;

	pages = kcalloc(nr_pages, sizeof(*pages), GFP_KERNEL);
	if (!pages)
		goto out;
	partial = kcalloc(nr_pages, sizeof(*partial), GFP_KERNEL);
	if (!partial)
		goto out_page;
	priv = kcalloc(nr_pages, sizeof(*priv), GFP_KERNEL);
	if (!priv)
		goto out_part;
	for (i = 0; i < nr_pages; i++) {
		priv[i] = kzalloc(sizeof(**priv), GFP_KERNEL);
		if (!priv[i])
			goto out_priv;
	}

	if (lgr->is_smcd ||
	    (!lgr->is_smcd && !smc->conn.rmb_desc->is_vm)) {
		/* smcd or smcr that uses physically contiguous RMBs */
		priv[0]->len = len;
		priv[0]->smc = smc;
		partial[0].offset = src - (char *)smc->conn.rmb_desc->cpu_addr;
		partial[0].len = len;
		partial[0].private = (unsigned long)priv[0];
		pages[0] = smc->conn.rmb_desc->pages;
	} else {
		int size, left = len;
		void *buf = src;
		/* smcr that uses virtually contiguous RMBs */

		for (i = 0; i < nr_pages; i++) {
			size = min_t(int, PAGE_SIZE - offset, left);
			priv[i]->len = size;
			priv[i]->smc = smc;
			pages[i] = vmalloc_to_page(buf);
			partial[i].offset = offset;
			partial[i].len = size;
			partial[i].private = (unsigned long)priv[i];
			buf += size / sizeof(*buf);
			left -= size;
			offset = 0;	/* only the first page is partial */
		}
	}
	spd.nr_pages_max = nr_pages;
	spd.nr_pages = nr_pages;
	spd.pages = pages;
	spd.partial = partial;
	spd.ops = &smc_pipe_ops;
	spd.spd_release = smc_rx_spd_release;

	bytes = splice_to_pipe(pipe, &spd);
	if (bytes > 0) {
		/* hold refs until smc_rx_pipe_buf_release() runs */
		sock_hold(&smc->sk);
		if (!lgr->is_smcd && smc->conn.rmb_desc->is_vm) {
			/* NOTE(review): in the is_vm case the fill loop above
			 * has reset offset to 0, so for a buffer starting
			 * mid-page this may take fewer page refs than pipe
			 * buffers were created - verify against upstream
			 */
			for (i = 0; i < PAGE_ALIGN(bytes + offset) / PAGE_SIZE; i++)
				get_page(pages[i]);
		} else {
			get_page(smc->conn.rmb_desc->pages);
		}
		atomic_add(bytes, &smc->conn.splice_pending);
	}
	kfree(priv);
	kfree(partial);
	kfree(pages);

	return bytes;

out_priv:
	/* free only the priv entries allocated so far */
	for (i = (i - 1); i >= 0; i--)
		kfree(priv[i]);
	kfree(priv);
out_part:
	kfree(partial);
out_page:
	kfree(pages);
out:
	return -ENOMEM;
}
0236
0237 static int smc_rx_data_available_and_no_splice_pend(struct smc_connection *conn)
0238 {
0239 return atomic_read(&conn->bytes_to_rcv) &&
0240 !atomic_read(&conn->splice_pending);
0241 }
0242
0243
0244
0245
0246
0247
0248
0249
0250
/* smc_rx_wait - wait for data, shutdown, abort, signal, or timeout
 * @smc:   smc socket
 * @timeo: pointer to remaining wait time, updated by sk_wait_event()
 * @fcrit: additional wake-up criterion (e.g. data available, or data
 *         available with no splice pending)
 * Returns non-zero when a wake-up condition was met, 0 on timeout
 * (see sk_wait_event()).
 */
int smc_rx_wait(struct smc_sock *smc, long *timeo,
		int (*fcrit)(struct smc_connection *conn))
{
	DEFINE_WAIT_FUNC(wait, woken_wake_function);
	struct smc_connection *conn = &smc->conn;
	struct smc_cdc_conn_state_flags *cflags =
		&conn->local_tx_ctrl.conn_state_flags;
	struct sock *sk = &smc->sk;
	int rc;

	/* fast path: criterion already satisfied, no need to sleep */
	if (fcrit(conn))
		return 1;
	sk_set_bit(SOCKWQ_ASYNC_WAITDATA, sk);
	add_wait_queue(sk_sleep(sk), &wait);
	rc = sk_wait_event(sk, timeo,
			   sk->sk_err ||
			   cflags->peer_conn_abort ||
			   sk->sk_shutdown & RCV_SHUTDOWN ||
			   conn->killed ||
			   fcrit(conn),
			   &wait);
	remove_wait_queue(sk_sleep(sk), &wait);
	sk_clear_bit(SOCKWQ_ASYNC_WAITDATA, sk);
	return rc;
}
0276
0277 static int smc_rx_recv_urg(struct smc_sock *smc, struct msghdr *msg, int len,
0278 int flags)
0279 {
0280 struct smc_connection *conn = &smc->conn;
0281 union smc_host_cursor cons;
0282 struct sock *sk = &smc->sk;
0283 int rc = 0;
0284
0285 if (sock_flag(sk, SOCK_URGINLINE) ||
0286 !(conn->urg_state == SMC_URG_VALID) ||
0287 conn->urg_state == SMC_URG_READ)
0288 return -EINVAL;
0289
0290 SMC_STAT_INC(smc, urg_data_cnt);
0291 if (conn->urg_state == SMC_URG_VALID) {
0292 if (!(flags & MSG_PEEK))
0293 smc->conn.urg_state = SMC_URG_READ;
0294 msg->msg_flags |= MSG_OOB;
0295 if (len > 0) {
0296 if (!(flags & MSG_TRUNC))
0297 rc = memcpy_to_msg(msg, &conn->urg_rx_byte, 1);
0298 len = 1;
0299 smc_curs_copy(&cons, &conn->local_tx_ctrl.cons, conn);
0300 if (smc_curs_diff(conn->rmb_desc->len, &cons,
0301 &conn->urg_curs) > 1)
0302 conn->urg_rx_skip_pend = true;
0303
0304
0305
0306 if (!(flags & MSG_PEEK))
0307 smc_rx_update_consumer(smc, cons, 0);
0308 } else {
0309 msg->msg_flags |= MSG_TRUNC;
0310 }
0311
0312 return rc ? -EFAULT : len;
0313 }
0314
0315 if (sk->sk_state == SMC_CLOSED || sk->sk_shutdown & RCV_SHUTDOWN)
0316 return 0;
0317
0318 return -EAGAIN;
0319 }
0320
0321 static bool smc_rx_recvmsg_data_available(struct smc_sock *smc)
0322 {
0323 struct smc_connection *conn = &smc->conn;
0324
0325 if (smc_rx_data_available(conn))
0326 return true;
0327 else if (conn->urg_state == SMC_URG_VALID)
0328
0329 smc_rx_update_cons(smc, 0);
0330 return false;
0331 }
0332
0333
0334
0335
0336
0337
0338
0339
/* smc_rx_recvmsg - receive data from the RMB (receive buffer)
 * @smc:  receiving socket
 * @msg:  copy data to receive buffer when set
 * @pipe: copy data to pipe when set - indicates a splice() call
 * @len:  maximum number of bytes to deliver
 * @flags: recvmsg flags
 * Main receive entry point, called by the socket layer under the sock
 * lock.  Returns bytes delivered or a negative error code.
 */
int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg,
		   struct pipe_inode_info *pipe, size_t len, int flags)
{
	size_t copylen, read_done = 0, read_remaining = len;
	size_t chunk_len, chunk_off, chunk_len_sum;
	struct smc_connection *conn = &smc->conn;
	int (*func)(struct smc_connection *conn);
	union smc_host_cursor cons;
	int readable, chunk;
	char *rcvbuf_base;
	struct sock *sk;
	int splbytes;
	long timeo;
	int target;		/* Read at least these many bytes */
	int rc;

	if (unlikely(flags & MSG_ERRQUEUE))
		return -EINVAL; /* future work for sk.sk_family == AF_SMC */

	sk = &smc->sk;
	if (sk->sk_state == SMC_LISTEN)
		return -ENOTCONN;
	if (flags & MSG_OOB)
		return smc_rx_recv_urg(smc, msg, len, flags);
	timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
	target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);

	readable = atomic_read(&conn->bytes_to_rcv);
	if (readable >= conn->rmb_desc->len)
		SMC_STAT_RMB_RX_FULL(smc, !conn->lnk);

	if (len < readable)
		SMC_STAT_RMB_RX_SIZE_SMALL(smc, !conn->lnk);
	/* receive area starts rx_off bytes into the RMB */
	rcvbuf_base = conn->rx_off + conn->rmb_desc->cpu_addr;

	do { /* while (read_remaining) */
		if (read_done >= target || (pipe && read_done))
			break;

		if (conn->killed)
			break;

		if (smc_rx_recvmsg_data_available(smc))
			goto copy;

		if (sk->sk_shutdown & RCV_SHUTDOWN) {
			/* data could have arrived between the availability
			 * check above and reading sk_shutdown - re-check
			 */
			if (smc_rx_recvmsg_data_available(smc))
				goto copy;
			break;
		}

		if (read_done) {
			/* partial result already - stop on any obstacle */
			if (sk->sk_err ||
			    sk->sk_state == SMC_CLOSED ||
			    !timeo ||
			    signal_pending(current))
				break;
		} else {
			if (sk->sk_err) {
				read_done = sock_error(sk);
				break;
			}
			if (sk->sk_state == SMC_CLOSED) {
				if (!sock_flag(sk, SOCK_DONE)) {
					/* This occurs when user tries to read
					 * from never connected socket.
					 */
					read_done = -ENOTCONN;
					break;
				}
				break;
			}
			if (!timeo)
				return -EAGAIN;
			if (signal_pending(current)) {
				read_done = sock_intr_errno(timeo);
				break;
			}
		}

		if (!smc_rx_data_available(conn)) {
			smc_rx_wait(smc, &timeo, smc_rx_data_available);
			continue;
		}

copy:
		/* initialize variables for 1st iteration of subsequent loop */
		/* could be just 1 byte, even after waiting on data above */
		readable = atomic_read(&conn->bytes_to_rcv);
		splbytes = atomic_read(&conn->splice_pending);
		if (!readable || (msg && splbytes)) {
			if (splbytes)
				func = smc_rx_data_available_and_no_splice_pend;
			else
				func = smc_rx_data_available;
			smc_rx_wait(smc, &timeo, func);
			continue;
		}

		smc_curs_copy(&cons, &conn->local_tx_ctrl.cons, conn);
		/* subsequent splice() calls pick up where previous left */
		if (splbytes)
			smc_curs_add(conn->rmb_desc->len, &cons, splbytes);
		if (conn->urg_state == SMC_URG_VALID &&
		    sock_flag(&smc->sk, SOCK_URGINLINE) &&
		    readable > 1)
			readable--;	/* always stop at urgent Byte */
		/* not more than what user space asked for */
		copylen = min_t(size_t, read_remaining, readable);
		/* determine chunks where to read from rcvbuf */
		/* either unwrapped case, or 1st chunk of wrapped case */
		chunk_len = min_t(size_t, copylen, conn->rmb_desc->len -
				  cons.count);
		chunk_len_sum = chunk_len;
		chunk_off = cons.count;
		smc_rmb_sync_sg_for_cpu(conn);
		for (chunk = 0; chunk < 2; chunk++) {
			if (!(flags & MSG_TRUNC)) {
				if (msg) {
					rc = memcpy_to_msg(msg, rcvbuf_base +
							   chunk_off,
							   chunk_len);
				} else {
					rc = smc_rx_splice(pipe, rcvbuf_base +
							   chunk_off, chunk_len,
							   smc);
				}
				if (rc < 0) {
					if (!read_done)
						read_done = -EFAULT;
					goto out;
				}
			}
			read_remaining -= chunk_len;
			read_done += chunk_len;

			if (chunk_len_sum == copylen)
				break; /* either on 1st or 2nd iteration */
			/* prepare next (== 2nd) iteration */
			chunk_len = copylen - chunk_len; /* remainder */
			chunk_len_sum += chunk_len;
			chunk_off = 0; /* modulo offset in recv ring buffer */
		}

		/* update cursors */
		if (!(flags & MSG_PEEK)) {
			/* increased in recv tasklet smc_cdc_msg_rcv() */
			smp_mb__before_atomic();
			atomic_sub(copylen, &conn->bytes_to_rcv);
			/* guarantee 0 <= bytes_to_rcv <= rmb_desc->len */
			smp_mb__after_atomic();
			if (msg && smc_rx_update_consumer(smc, cons, copylen))
				goto out;
		}

		trace_smc_rx_recvmsg(smc, copylen);
	} while (read_remaining);
out:
	return read_done;
}
0504
0505
0506 void smc_rx_init(struct smc_sock *smc)
0507 {
0508 smc->sk.sk_data_ready = smc_rx_wake_up;
0509 atomic_set(&smc->conn.splice_pending, 0);
0510 smc->conn.urg_state = SMC_URG_READ;
0511 }