0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016 #include <linux/net.h>
0017 #include <linux/rcupdate.h>
0018 #include <linux/workqueue.h>
0019 #include <linux/sched/signal.h>
0020
0021 #include <net/sock.h>
0022 #include <net/tcp.h>
0023
0024 #include "smc.h"
0025 #include "smc_wr.h"
0026 #include "smc_cdc.h"
0027 #include "smc_close.h"
0028 #include "smc_ism.h"
0029 #include "smc_tx.h"
0030 #include "smc_stats.h"
0031 #include "smc_tracepoint.h"
0032
/* Delay argument handed to mod_delayed_work()/queue_delayed_work() whenever a
 * connection's tx_work is (re)armed in this file.
 */
0033 #define SMC_TX_WORK_DELAY 0
0034
0035
0036
0037
0038
0039
0040
0041 static void smc_tx_write_space(struct sock *sk)
0042 {
0043 struct socket *sock = sk->sk_socket;
0044 struct smc_sock *smc = smc_sk(sk);
0045 struct socket_wq *wq;
0046
0047
0048 if (atomic_read(&smc->conn.sndbuf_space) && sock) {
0049 if (test_bit(SOCK_NOSPACE, &sock->flags))
0050 SMC_STAT_RMB_TX_FULL(smc, !smc->conn.lnk);
0051 clear_bit(SOCK_NOSPACE, &sock->flags);
0052 rcu_read_lock();
0053 wq = rcu_dereference(sk->sk_wq);
0054 if (skwq_has_sleeper(wq))
0055 wake_up_interruptible_poll(&wq->wait,
0056 EPOLLOUT | EPOLLWRNORM |
0057 EPOLLWRBAND);
0058 if (wq && wq->fasync_list && !(sk->sk_shutdown & SEND_SHUTDOWN))
0059 sock_wake_async(wq, SOCK_WAKE_SPACE, POLL_OUT);
0060 rcu_read_unlock();
0061 }
0062 }
0063
0064
0065
0066
0067 void smc_tx_sndbuf_nonfull(struct smc_sock *smc)
0068 {
0069 if (smc->sk.sk_socket &&
0070 test_bit(SOCK_NOSPACE, &smc->sk.sk_socket->flags))
0071 smc->sk.sk_write_space(&smc->sk);
0072 }
0073
0074
0075
0076
0077 static int smc_tx_wait(struct smc_sock *smc, int flags)
0078 {
0079 DEFINE_WAIT_FUNC(wait, woken_wake_function);
0080 struct smc_connection *conn = &smc->conn;
0081 struct sock *sk = &smc->sk;
0082 long timeo;
0083 int rc = 0;
0084
0085
0086 timeo = sock_sndtimeo(sk, flags & MSG_DONTWAIT);
0087 add_wait_queue(sk_sleep(sk), &wait);
0088 while (1) {
0089 sk_set_bit(SOCKWQ_ASYNC_NOSPACE, sk);
0090 if (sk->sk_err ||
0091 (sk->sk_shutdown & SEND_SHUTDOWN) ||
0092 conn->killed ||
0093 conn->local_tx_ctrl.conn_state_flags.peer_done_writing) {
0094 rc = -EPIPE;
0095 break;
0096 }
0097 if (smc_cdc_rxed_any_close(conn)) {
0098 rc = -ECONNRESET;
0099 break;
0100 }
0101 if (!timeo) {
0102
0103 set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
0104 rc = -EAGAIN;
0105 break;
0106 }
0107 if (signal_pending(current)) {
0108 rc = sock_intr_errno(timeo);
0109 break;
0110 }
0111 sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);
0112 if (atomic_read(&conn->sndbuf_space) && !conn->urg_tx_pend)
0113 break;
0114 set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
0115 sk_wait_event(sk, &timeo,
0116 sk->sk_err ||
0117 (sk->sk_shutdown & SEND_SHUTDOWN) ||
0118 smc_cdc_rxed_any_close(conn) ||
0119 (atomic_read(&conn->sndbuf_space) &&
0120 !conn->urg_tx_pend),
0121 &wait);
0122 }
0123 remove_wait_queue(sk_sleep(sk), &wait);
0124 return rc;
0125 }
0126
0127 static bool smc_tx_is_corked(struct smc_sock *smc)
0128 {
0129 struct tcp_sock *tp = tcp_sk(smc->clcsock->sk);
0130
0131 return (tp->nonagle & TCP_NAGLE_CORK) ? true : false;
0132 }
0133
0134
0135
0136
0137
0138
0139
0140
0141
0142
0143
0144
0145 static bool smc_should_autocork(struct smc_sock *smc)
0146 {
0147 struct smc_connection *conn = &smc->conn;
0148 int corking_size;
0149
0150 corking_size = min_t(unsigned int, conn->sndbuf_desc->len >> 1,
0151 sock_net(&smc->sk)->smc.sysctl_autocorking_size);
0152
0153 if (atomic_read(&conn->cdc_pend_tx_wr) == 0 ||
0154 smc_tx_prepared_sends(conn) > corking_size)
0155 return false;
0156 return true;
0157 }
0158
0159 static bool smc_tx_should_cork(struct smc_sock *smc, struct msghdr *msg)
0160 {
0161 struct smc_connection *conn = &smc->conn;
0162
0163 if (smc_should_autocork(smc))
0164 return true;
0165
0166
0167
0168
0169
0170 if ((msg->msg_flags & MSG_MORE ||
0171 smc_tx_is_corked(smc) ||
0172 msg->msg_flags & MSG_SENDPAGE_NOTLAST) &&
0173 atomic_read(&conn->sndbuf_space))
0174 return true;
0175
0176 return false;
0177 }
0178
0179
0180
0181
0182 int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len)
0183 {
0184 size_t copylen, send_done = 0, send_remaining = len;
0185 size_t chunk_len, chunk_off, chunk_len_sum;
0186 struct smc_connection *conn = &smc->conn;
0187 union smc_host_cursor prep;
0188 struct sock *sk = &smc->sk;
0189 char *sndbuf_base;
0190 int tx_cnt_prep;
0191 int writespace;
0192 int rc, chunk;
0193
0194
0195 sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);
0196
0197 if (sk->sk_err || (sk->sk_shutdown & SEND_SHUTDOWN)) {
0198 rc = -EPIPE;
0199 goto out_err;
0200 }
0201
0202 if (sk->sk_state == SMC_INIT)
0203 return -ENOTCONN;
0204
0205 if (len > conn->sndbuf_desc->len)
0206 SMC_STAT_RMB_TX_SIZE_SMALL(smc, !conn->lnk);
0207
0208 if (len > conn->peer_rmbe_size)
0209 SMC_STAT_RMB_TX_PEER_SIZE_SMALL(smc, !conn->lnk);
0210
0211 if (msg->msg_flags & MSG_OOB)
0212 SMC_STAT_INC(smc, urg_data_cnt);
0213
0214 while (msg_data_left(msg)) {
0215 if (smc->sk.sk_shutdown & SEND_SHUTDOWN ||
0216 (smc->sk.sk_err == ECONNABORTED) ||
0217 conn->killed)
0218 return -EPIPE;
0219 if (smc_cdc_rxed_any_close(conn))
0220 return send_done ?: -ECONNRESET;
0221
0222 if (msg->msg_flags & MSG_OOB)
0223 conn->local_tx_ctrl.prod_flags.urg_data_pending = 1;
0224
0225 if (!atomic_read(&conn->sndbuf_space) || conn->urg_tx_pend) {
0226 if (send_done)
0227 return send_done;
0228 rc = smc_tx_wait(smc, msg->msg_flags);
0229 if (rc)
0230 goto out_err;
0231 continue;
0232 }
0233
0234
0235
0236 writespace = atomic_read(&conn->sndbuf_space);
0237
0238 copylen = min_t(size_t, send_remaining, writespace);
0239
0240 sndbuf_base = conn->sndbuf_desc->cpu_addr;
0241 smc_curs_copy(&prep, &conn->tx_curs_prep, conn);
0242 tx_cnt_prep = prep.count;
0243
0244
0245 chunk_len = min_t(size_t, copylen, conn->sndbuf_desc->len -
0246 tx_cnt_prep);
0247 chunk_len_sum = chunk_len;
0248 chunk_off = tx_cnt_prep;
0249 for (chunk = 0; chunk < 2; chunk++) {
0250 rc = memcpy_from_msg(sndbuf_base + chunk_off,
0251 msg, chunk_len);
0252 if (rc) {
0253 smc_sndbuf_sync_sg_for_device(conn);
0254 if (send_done)
0255 return send_done;
0256 goto out_err;
0257 }
0258 send_done += chunk_len;
0259 send_remaining -= chunk_len;
0260
0261 if (chunk_len_sum == copylen)
0262 break;
0263
0264 chunk_len = copylen - chunk_len;
0265 chunk_len_sum += chunk_len;
0266 chunk_off = 0;
0267 }
0268 smc_sndbuf_sync_sg_for_device(conn);
0269
0270 smc_curs_add(conn->sndbuf_desc->len, &prep, copylen);
0271 smc_curs_copy(&conn->tx_curs_prep, &prep, conn);
0272
0273 smp_mb__before_atomic();
0274 atomic_sub(copylen, &conn->sndbuf_space);
0275
0276 smp_mb__after_atomic();
0277
0278
0279
0280 if ((msg->msg_flags & MSG_OOB) && !send_remaining)
0281 conn->urg_tx_pend = true;
0282
0283
0284
0285 if (!smc_tx_should_cork(smc, msg))
0286 smc_tx_sndbuf_nonempty(conn);
0287
0288 trace_smc_tx_sendmsg(smc, copylen);
0289 }
0290
0291 return send_done;
0292
0293 out_err:
0294 rc = sk_stream_error(sk, msg->msg_flags, rc);
0295
0296 if (unlikely(rc == -EAGAIN))
0297 sk->sk_write_space(sk);
0298 return rc;
0299 }
0300
0301 int smc_tx_sendpage(struct smc_sock *smc, struct page *page, int offset,
0302 size_t size, int flags)
0303 {
0304 struct msghdr msg = {.msg_flags = flags};
0305 char *kaddr = kmap(page);
0306 struct kvec iov;
0307 int rc;
0308
0309 iov.iov_base = kaddr + offset;
0310 iov.iov_len = size;
0311 iov_iter_kvec(&msg.msg_iter, WRITE, &iov, 1, size);
0312 rc = smc_tx_sendmsg(smc, &msg, size);
0313 kunmap(page);
0314 return rc;
0315 }
0316
0317
0318
0319
0320 int smcd_tx_ism_write(struct smc_connection *conn, void *data, size_t len,
0321 u32 offset, int signal)
0322 {
0323 int rc;
0324
0325 rc = smc_ism_write(conn->lgr->smcd, conn->peer_token,
0326 conn->peer_rmbe_idx, signal, conn->tx_off + offset,
0327 data, len);
0328 if (rc)
0329 conn->local_tx_ctrl.conn_state_flags.peer_conn_abort = 1;
0330 return rc;
0331 }
0332
0333
0334 static int smc_tx_rdma_write(struct smc_connection *conn, int peer_rmbe_offset,
0335 int num_sges, struct ib_rdma_wr *rdma_wr)
0336 {
0337 struct smc_link_group *lgr = conn->lgr;
0338 struct smc_link *link = conn->lnk;
0339 int rc;
0340
0341 rdma_wr->wr.wr_id = smc_wr_tx_get_next_wr_id(link);
0342 rdma_wr->wr.num_sge = num_sges;
0343 rdma_wr->remote_addr =
0344 lgr->rtokens[conn->rtoken_idx][link->link_idx].dma_addr +
0345
0346 conn->tx_off +
0347
0348 peer_rmbe_offset;
0349 rdma_wr->rkey = lgr->rtokens[conn->rtoken_idx][link->link_idx].rkey;
0350 rc = ib_post_send(link->roce_qp, &rdma_wr->wr, NULL);
0351 if (rc)
0352 smcr_link_down_cond_sched(link);
0353 return rc;
0354 }
0355
0356
0357 static inline void smc_tx_advance_cursors(struct smc_connection *conn,
0358 union smc_host_cursor *prod,
0359 union smc_host_cursor *sent,
0360 size_t len)
0361 {
0362 smc_curs_add(conn->peer_rmbe_size, prod, len);
0363
0364 smp_mb__before_atomic();
0365
0366 atomic_sub(len, &conn->peer_rmbe_space);
0367
0368 smp_mb__after_atomic();
0369 smc_curs_add(conn->sndbuf_desc->len, sent, len);
0370 }
0371
0372
0373 static int smcr_tx_rdma_writes(struct smc_connection *conn, size_t len,
0374 size_t src_off, size_t src_len,
0375 size_t dst_off, size_t dst_len,
0376 struct smc_rdma_wr *wr_rdma_buf)
0377 {
0378 struct smc_link *link = conn->lnk;
0379
0380 dma_addr_t dma_addr =
0381 sg_dma_address(conn->sndbuf_desc->sgt[link->link_idx].sgl);
0382 u64 virt_addr = (uintptr_t)conn->sndbuf_desc->cpu_addr;
0383 int src_len_sum = src_len, dst_len_sum = dst_len;
0384 int sent_count = src_off;
0385 int srcchunk, dstchunk;
0386 int num_sges;
0387 int rc;
0388
0389 for (dstchunk = 0; dstchunk < 2; dstchunk++) {
0390 struct ib_rdma_wr *wr = &wr_rdma_buf->wr_tx_rdma[dstchunk];
0391 struct ib_sge *sge = wr->wr.sg_list;
0392 u64 base_addr = dma_addr;
0393
0394 if (dst_len < link->qp_attr.cap.max_inline_data) {
0395 base_addr = virt_addr;
0396 wr->wr.send_flags |= IB_SEND_INLINE;
0397 } else {
0398 wr->wr.send_flags &= ~IB_SEND_INLINE;
0399 }
0400
0401 num_sges = 0;
0402 for (srcchunk = 0; srcchunk < 2; srcchunk++) {
0403 sge[srcchunk].addr = conn->sndbuf_desc->is_vm ?
0404 (virt_addr + src_off) : (base_addr + src_off);
0405 sge[srcchunk].length = src_len;
0406 if (conn->sndbuf_desc->is_vm)
0407 sge[srcchunk].lkey =
0408 conn->sndbuf_desc->mr[link->link_idx]->lkey;
0409 num_sges++;
0410
0411 src_off += src_len;
0412 if (src_off >= conn->sndbuf_desc->len)
0413 src_off -= conn->sndbuf_desc->len;
0414
0415 if (src_len_sum == dst_len)
0416 break;
0417
0418 src_len = dst_len - src_len;
0419 src_len_sum += src_len;
0420 }
0421 rc = smc_tx_rdma_write(conn, dst_off, num_sges, wr);
0422 if (rc)
0423 return rc;
0424 if (dst_len_sum == len)
0425 break;
0426
0427 dst_off = 0;
0428 dst_len = len - dst_len;
0429 dst_len_sum += dst_len;
0430 src_len = min_t(int, dst_len, conn->sndbuf_desc->len -
0431 sent_count);
0432 src_len_sum = src_len;
0433 }
0434 return 0;
0435 }
0436
0437
0438 static int smcd_tx_rdma_writes(struct smc_connection *conn, size_t len,
0439 size_t src_off, size_t src_len,
0440 size_t dst_off, size_t dst_len)
0441 {
0442 int src_len_sum = src_len, dst_len_sum = dst_len;
0443 int srcchunk, dstchunk;
0444 int rc;
0445
0446 for (dstchunk = 0; dstchunk < 2; dstchunk++) {
0447 for (srcchunk = 0; srcchunk < 2; srcchunk++) {
0448 void *data = conn->sndbuf_desc->cpu_addr + src_off;
0449
0450 rc = smcd_tx_ism_write(conn, data, src_len, dst_off +
0451 sizeof(struct smcd_cdc_msg), 0);
0452 if (rc)
0453 return rc;
0454 dst_off += src_len;
0455 src_off += src_len;
0456 if (src_off >= conn->sndbuf_desc->len)
0457 src_off -= conn->sndbuf_desc->len;
0458
0459 if (src_len_sum == dst_len)
0460 break;
0461
0462 src_len = dst_len - src_len;
0463 src_len_sum += src_len;
0464 }
0465 if (dst_len_sum == len)
0466 break;
0467
0468 dst_off = 0;
0469 dst_len = len - dst_len;
0470 dst_len_sum += dst_len;
0471 src_len = min_t(int, dst_len, conn->sndbuf_desc->len - src_off);
0472 src_len_sum = src_len;
0473 }
0474 return 0;
0475 }
0476
0477
0478
0479
0480 static int smc_tx_rdma_writes(struct smc_connection *conn,
0481 struct smc_rdma_wr *wr_rdma_buf)
0482 {
0483 size_t len, src_len, dst_off, dst_len;
0484 union smc_host_cursor sent, prep, prod, cons;
0485 struct smc_cdc_producer_flags *pflags;
0486 int to_send, rmbespace;
0487 int rc;
0488
0489
0490 smc_curs_copy(&sent, &conn->tx_curs_sent, conn);
0491 smc_curs_copy(&prep, &conn->tx_curs_prep, conn);
0492
0493 to_send = smc_curs_diff(conn->sndbuf_desc->len, &sent, &prep);
0494 if (to_send <= 0)
0495 return 0;
0496
0497
0498
0499 rmbespace = atomic_read(&conn->peer_rmbe_space);
0500 if (rmbespace <= 0) {
0501 struct smc_sock *smc = container_of(conn, struct smc_sock,
0502 conn);
0503 SMC_STAT_RMB_TX_PEER_FULL(smc, !conn->lnk);
0504 return 0;
0505 }
0506 smc_curs_copy(&prod, &conn->local_tx_ctrl.prod, conn);
0507 smc_curs_copy(&cons, &conn->local_rx_ctrl.cons, conn);
0508
0509
0510 pflags = &conn->local_tx_ctrl.prod_flags;
0511 pflags->write_blocked = (to_send >= rmbespace);
0512
0513 len = min(to_send, rmbespace);
0514
0515
0516 dst_off = prod.count;
0517 if (prod.wrap == cons.wrap) {
0518
0519
0520
0521
0522
0523 dst_len = min_t(size_t,
0524 conn->peer_rmbe_size - prod.count, len);
0525 } else {
0526
0527
0528
0529
0530 dst_len = len;
0531 }
0532
0533 if (sent.count + dst_len <= conn->sndbuf_desc->len) {
0534
0535 src_len = dst_len;
0536 } else {
0537
0538 src_len = conn->sndbuf_desc->len - sent.count;
0539 }
0540
0541 if (conn->lgr->is_smcd)
0542 rc = smcd_tx_rdma_writes(conn, len, sent.count, src_len,
0543 dst_off, dst_len);
0544 else
0545 rc = smcr_tx_rdma_writes(conn, len, sent.count, src_len,
0546 dst_off, dst_len, wr_rdma_buf);
0547 if (rc)
0548 return rc;
0549
0550 if (conn->urg_tx_pend && len == to_send)
0551 pflags->urg_data_present = 1;
0552 smc_tx_advance_cursors(conn, &prod, &sent, len);
0553
0554 smc_curs_copy(&conn->local_tx_ctrl.prod, &prod, conn);
0555
0556 smc_curs_copy(&conn->tx_curs_sent, &sent, conn);
0557
0558 return 0;
0559 }
0560
0561
0562
0563
0564 static int smcr_tx_sndbuf_nonempty(struct smc_connection *conn)
0565 {
0566 struct smc_cdc_producer_flags *pflags = &conn->local_tx_ctrl.prod_flags;
0567 struct smc_link *link = conn->lnk;
0568 struct smc_rdma_wr *wr_rdma_buf;
0569 struct smc_cdc_tx_pend *pend;
0570 struct smc_wr_buf *wr_buf;
0571 int rc;
0572
0573 if (!link || !smc_wr_tx_link_hold(link))
0574 return -ENOLINK;
0575 rc = smc_cdc_get_free_slot(conn, link, &wr_buf, &wr_rdma_buf, &pend);
0576 if (rc < 0) {
0577 smc_wr_tx_link_put(link);
0578 if (rc == -EBUSY) {
0579 struct smc_sock *smc =
0580 container_of(conn, struct smc_sock, conn);
0581
0582 if (smc->sk.sk_err == ECONNABORTED)
0583 return sock_error(&smc->sk);
0584 if (conn->killed)
0585 return -EPIPE;
0586 rc = 0;
0587 mod_delayed_work(conn->lgr->tx_wq, &conn->tx_work,
0588 SMC_TX_WORK_DELAY);
0589 }
0590 return rc;
0591 }
0592
0593 spin_lock_bh(&conn->send_lock);
0594 if (link != conn->lnk) {
0595
0596 smc_wr_tx_put_slot(link,
0597 (struct smc_wr_tx_pend_priv *)pend);
0598 rc = -ENOLINK;
0599 goto out_unlock;
0600 }
0601 if (!pflags->urg_data_present) {
0602 rc = smc_tx_rdma_writes(conn, wr_rdma_buf);
0603 if (rc) {
0604 smc_wr_tx_put_slot(link,
0605 (struct smc_wr_tx_pend_priv *)pend);
0606 goto out_unlock;
0607 }
0608 }
0609
0610 rc = smc_cdc_msg_send(conn, wr_buf, pend);
0611 if (!rc && pflags->urg_data_present) {
0612 pflags->urg_data_pending = 0;
0613 pflags->urg_data_present = 0;
0614 }
0615
0616 out_unlock:
0617 spin_unlock_bh(&conn->send_lock);
0618 smc_wr_tx_link_put(link);
0619 return rc;
0620 }
0621
0622 static int smcd_tx_sndbuf_nonempty(struct smc_connection *conn)
0623 {
0624 struct smc_cdc_producer_flags *pflags = &conn->local_tx_ctrl.prod_flags;
0625 int rc = 0;
0626
0627 spin_lock_bh(&conn->send_lock);
0628 if (!pflags->urg_data_present)
0629 rc = smc_tx_rdma_writes(conn, NULL);
0630 if (!rc)
0631 rc = smcd_cdc_msg_send(conn);
0632
0633 if (!rc && pflags->urg_data_present) {
0634 pflags->urg_data_pending = 0;
0635 pflags->urg_data_present = 0;
0636 }
0637 spin_unlock_bh(&conn->send_lock);
0638 return rc;
0639 }
0640
0641 static int __smc_tx_sndbuf_nonempty(struct smc_connection *conn)
0642 {
0643 struct smc_sock *smc = container_of(conn, struct smc_sock, conn);
0644 int rc = 0;
0645
0646
0647 if (unlikely(smc_tx_prepared_sends(conn) <= 0))
0648 goto out;
0649
0650
0651 if (unlikely(atomic_read(&conn->peer_rmbe_space) <= 0)) {
0652 SMC_STAT_RMB_TX_PEER_FULL(smc, !conn->lnk);
0653 goto out;
0654 }
0655
0656 if (conn->killed ||
0657 conn->local_rx_ctrl.conn_state_flags.peer_conn_abort) {
0658 rc = -EPIPE;
0659 goto out;
0660 }
0661 if (conn->lgr->is_smcd)
0662 rc = smcd_tx_sndbuf_nonempty(conn);
0663 else
0664 rc = smcr_tx_sndbuf_nonempty(conn);
0665
0666 if (!rc) {
0667
0668 smc_close_wake_tx_prepared(smc);
0669 }
0670
0671 out:
0672 return rc;
0673 }
0674
0675 int smc_tx_sndbuf_nonempty(struct smc_connection *conn)
0676 {
0677 int rc;
0678
0679
0680
0681
0682
0683 if (atomic_inc_return(&conn->tx_pushing) > 1)
0684 return 0;
0685
0686 again:
0687 atomic_set(&conn->tx_pushing, 1);
0688 smp_wmb();
0689 rc = __smc_tx_sndbuf_nonempty(conn);
0690
0691
0692
0693
0694
0695
0696
0697 if (unlikely(!atomic_dec_and_test(&conn->tx_pushing)))
0698 goto again;
0699
0700 return rc;
0701 }
0702
0703
0704
0705
0706
0707 void smc_tx_pending(struct smc_connection *conn)
0708 {
0709 struct smc_sock *smc = container_of(conn, struct smc_sock, conn);
0710 int rc;
0711
0712 if (smc->sk.sk_err)
0713 return;
0714
0715 rc = smc_tx_sndbuf_nonempty(conn);
0716 if (!rc && conn->local_rx_ctrl.prod_flags.write_blocked &&
0717 !atomic_read(&conn->bytes_to_rcv))
0718 conn->local_rx_ctrl.prod_flags.write_blocked = 0;
0719 }
0720
0721
0722
0723
0724
0725 void smc_tx_work(struct work_struct *work)
0726 {
0727 struct smc_connection *conn = container_of(to_delayed_work(work),
0728 struct smc_connection,
0729 tx_work);
0730 struct smc_sock *smc = container_of(conn, struct smc_sock, conn);
0731
0732 lock_sock(&smc->sk);
0733 smc_tx_pending(conn);
0734 release_sock(&smc->sk);
0735 }
0736
0737 void smc_tx_consumer_update(struct smc_connection *conn, bool force)
0738 {
0739 union smc_host_cursor cfed, cons, prod;
0740 int sender_free = conn->rmb_desc->len;
0741 int to_confirm;
0742
0743 smc_curs_copy(&cons, &conn->local_tx_ctrl.cons, conn);
0744 smc_curs_copy(&cfed, &conn->rx_curs_confirmed, conn);
0745 to_confirm = smc_curs_diff(conn->rmb_desc->len, &cfed, &cons);
0746 if (to_confirm > conn->rmbe_update_limit) {
0747 smc_curs_copy(&prod, &conn->local_rx_ctrl.prod, conn);
0748 sender_free = conn->rmb_desc->len -
0749 smc_curs_diff_large(conn->rmb_desc->len,
0750 &cfed, &prod);
0751 }
0752
0753 if (conn->local_rx_ctrl.prod_flags.cons_curs_upd_req ||
0754 force ||
0755 ((to_confirm > conn->rmbe_update_limit) &&
0756 ((sender_free <= (conn->rmb_desc->len / 2)) ||
0757 conn->local_rx_ctrl.prod_flags.write_blocked))) {
0758 if (conn->killed ||
0759 conn->local_rx_ctrl.conn_state_flags.peer_conn_abort)
0760 return;
0761 if ((smc_cdc_get_slot_and_msg_send(conn) < 0) &&
0762 !conn->killed) {
0763 queue_delayed_work(conn->lgr->tx_wq, &conn->tx_work,
0764 SMC_TX_WORK_DELAY);
0765 return;
0766 }
0767 }
0768 if (conn->local_rx_ctrl.prod_flags.write_blocked &&
0769 !atomic_read(&conn->bytes_to_rcv))
0770 conn->local_rx_ctrl.prod_flags.write_blocked = 0;
0771 }
0772
0773
0774
0775
0776 void smc_tx_init(struct smc_sock *smc)
0777 {
0778 smc->sk.sk_write_space = smc_tx_write_space;
0779 }