0001 // SPDX-License-Identifier: GPL-2.0 or BSD-3-Clause
0002 
0003 /* Authors: Bernard Metzler <bmt@zurich.ibm.com> */
0004 /* Copyright (c) 2008-2019, IBM Corporation */
0005 
0006 #include <linux/errno.h>
0007 #include <linux/types.h>
0008 #include <linux/net.h>
0009 #include <linux/scatterlist.h>
0010 #include <linux/highmem.h>
0011 #include <net/tcp.h>
0012 
0013 #include <rdma/iw_cm.h>
0014 #include <rdma/ib_verbs.h>
0015 #include <rdma/ib_user_verbs.h>
0016 
0017 #include "siw.h"
0018 #include "siw_verbs.h"
0019 #include "siw_mem.h"
0020 
0021 #define MAX_HDR_INLINE                  \
0022     (((uint32_t)(sizeof(struct siw_rreq_pkt) -  \
0023              sizeof(struct iwarp_send))) & 0xF8)
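
/*
 * Note on the mask above: "& 0xF8" rounds the spare header room down to
 * a multiple of 8 bytes. For illustration only (the byte count is an
 * assumption, not taken from the struct definitions): a difference of
 * 29 bytes between the two headers would give 29 & 0xF8 = 24 bytes of
 * MAX_HDR_INLINE space.
 */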
0024 
0025 static struct page *siw_get_pblpage(struct siw_mem *mem, u64 addr, int *idx)
0026 {
0027     struct siw_pbl *pbl = mem->pbl;
0028     u64 offset = addr - mem->va;
0029     dma_addr_t paddr = siw_pbl_get_buffer(pbl, offset, NULL, idx);
0030 
0031     if (paddr)
0032         return virt_to_page((void *)paddr);
0033 
0034     return NULL;
0035 }
0036 
0037 /*
0038  * Copy short payload at provided destination payload address
0039  */
0040 static int siw_try_1seg(struct siw_iwarp_tx *c_tx, void *paddr)
0041 {
0042     struct siw_wqe *wqe = &c_tx->wqe_active;
0043     struct siw_sge *sge = &wqe->sqe.sge[0];
0044     u32 bytes = sge->length;
0045 
0046     if (bytes > MAX_HDR_INLINE || wqe->sqe.num_sge != 1)
0047         return MAX_HDR_INLINE + 1;
0048 
0049     if (!bytes)
0050         return 0;
0051 
0052     if (tx_flags(wqe) & SIW_WQE_INLINE) {
0053         memcpy(paddr, &wqe->sqe.sge[1], bytes);
0054     } else {
0055         struct siw_mem *mem = wqe->mem[0];
0056 
0057         if (!mem->mem_obj) {
0058             /* Kernel client using kva */
0059             memcpy(paddr,
0060                    (const void *)(uintptr_t)sge->laddr, bytes);
0061         } else if (c_tx->in_syscall) {
0062             if (copy_from_user(paddr, u64_to_user_ptr(sge->laddr),
0063                        bytes))
0064                 return -EFAULT;
0065         } else {
0066             unsigned int off = sge->laddr & ~PAGE_MASK;
0067             struct page *p;
0068             char *buffer;
0069             int pbl_idx = 0;
0070 
0071             if (!mem->is_pbl)
0072                 p = siw_get_upage(mem->umem, sge->laddr);
0073             else
0074                 p = siw_get_pblpage(mem, sge->laddr, &pbl_idx);
0075 
0076             if (unlikely(!p))
0077                 return -EFAULT;
0078 
0079             buffer = kmap_local_page(p);
0080 
0081             if (likely(PAGE_SIZE - off >= bytes)) {
0082                 memcpy(paddr, buffer + off, bytes);
0083             } else {
0084                 unsigned long part = bytes - (PAGE_SIZE - off);
0085 
0086                 memcpy(paddr, buffer + off, part);
0087                 kunmap_local(buffer);
0088 
0089                 if (!mem->is_pbl)
0090                     p = siw_get_upage(mem->umem,
0091                               sge->laddr + part);
0092                 else
0093                     p = siw_get_pblpage(mem,
0094                                 sge->laddr + part,
0095                                 &pbl_idx);
0096                 if (unlikely(!p))
0097                     return -EFAULT;
0098 
0099                 buffer = kmap_local_page(p);
0100                 memcpy(paddr + part, buffer, bytes - part);
0101             }
0102             kunmap_local(buffer);
0103         }
0104     }
0105     return (int)bytes;
0106 }
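
/*
 * The page-crossing branch above follows the usual kmap_local_page() /
 * kunmap_local() pattern: map, copy what fits into the first page,
 * unmap, then map the following page for the remainder. Below is a
 * minimal, self-contained sketch of that pattern; the helper name and
 * its two-page interface are illustrative only and not part of the
 * driver.
 */
static inline void siw_copy_across_pages_sketch(void *dst, struct page *p0,
                                                struct page *p1,
                                                unsigned int off, size_t len)
{
    /* bytes that still fit into the first page */
    size_t first = min_t(size_t, PAGE_SIZE - off, len);
    char *kaddr = kmap_local_page(p0);

    memcpy(dst, kaddr + off, first);
    kunmap_local(kaddr);

    if (len > first) {
        /* remainder starts at offset 0 of the next page */
        kaddr = kmap_local_page(p1);
        memcpy((char *)dst + first, kaddr, len - first);
        kunmap_local(kaddr);
    }
}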
0107 
0108 #define PKT_FRAGMENTED 1
0109 #define PKT_COMPLETE 0
0110 
0111 /*
0112  * siw_qp_prepare_tx()
0113  *
0114  * Prepare tx state for sending out one FPDU. Builds a complete packet
0115  * if no user data or only immediate data are present.
0116  *
0117  * Returns PKT_COMPLETE if a complete packet was built, else PKT_FRAGMENTED.
0118  */
0119 static int siw_qp_prepare_tx(struct siw_iwarp_tx *c_tx)
0120 {
0121     struct siw_wqe *wqe = &c_tx->wqe_active;
0122     char *crc = NULL;
0123     int data = 0;
0124 
0125     switch (tx_type(wqe)) {
0126     case SIW_OP_READ:
0127     case SIW_OP_READ_LOCAL_INV:
0128         memcpy(&c_tx->pkt.ctrl,
0129                &iwarp_pktinfo[RDMAP_RDMA_READ_REQ].ctrl,
0130                sizeof(struct iwarp_ctrl));
0131 
0132         c_tx->pkt.rreq.rsvd = 0;
0133         c_tx->pkt.rreq.ddp_qn = htonl(RDMAP_UNTAGGED_QN_RDMA_READ);
0134         c_tx->pkt.rreq.ddp_msn =
0135             htonl(++c_tx->ddp_msn[RDMAP_UNTAGGED_QN_RDMA_READ]);
0136         c_tx->pkt.rreq.ddp_mo = 0;
0137         c_tx->pkt.rreq.sink_stag = htonl(wqe->sqe.sge[0].lkey);
0138         c_tx->pkt.rreq.sink_to =
0139             cpu_to_be64(wqe->sqe.sge[0].laddr);
0140         c_tx->pkt.rreq.source_stag = htonl(wqe->sqe.rkey);
0141         c_tx->pkt.rreq.source_to = cpu_to_be64(wqe->sqe.raddr);
0142         c_tx->pkt.rreq.read_size = htonl(wqe->sqe.sge[0].length);
0143 
0144         c_tx->ctrl_len = sizeof(struct iwarp_rdma_rreq);
0145         crc = (char *)&c_tx->pkt.rreq_pkt.crc;
0146         break;
0147 
0148     case SIW_OP_SEND:
0149         if (tx_flags(wqe) & SIW_WQE_SOLICITED)
0150             memcpy(&c_tx->pkt.ctrl,
0151                    &iwarp_pktinfo[RDMAP_SEND_SE].ctrl,
0152                    sizeof(struct iwarp_ctrl));
0153         else
0154             memcpy(&c_tx->pkt.ctrl, &iwarp_pktinfo[RDMAP_SEND].ctrl,
0155                    sizeof(struct iwarp_ctrl));
0156 
0157         c_tx->pkt.send.ddp_qn = RDMAP_UNTAGGED_QN_SEND;
0158         c_tx->pkt.send.ddp_msn =
0159             htonl(++c_tx->ddp_msn[RDMAP_UNTAGGED_QN_SEND]);
0160         c_tx->pkt.send.ddp_mo = 0;
0161 
0162         c_tx->pkt.send_inv.inval_stag = 0;
0163 
0164         c_tx->ctrl_len = sizeof(struct iwarp_send);
0165 
0166         crc = (char *)&c_tx->pkt.send_pkt.crc;
0167         data = siw_try_1seg(c_tx, crc);
0168         break;
0169 
0170     case SIW_OP_SEND_REMOTE_INV:
0171         if (tx_flags(wqe) & SIW_WQE_SOLICITED)
0172             memcpy(&c_tx->pkt.ctrl,
0173                    &iwarp_pktinfo[RDMAP_SEND_SE_INVAL].ctrl,
0174                    sizeof(struct iwarp_ctrl));
0175         else
0176             memcpy(&c_tx->pkt.ctrl,
0177                    &iwarp_pktinfo[RDMAP_SEND_INVAL].ctrl,
0178                    sizeof(struct iwarp_ctrl));
0179 
0180         c_tx->pkt.send.ddp_qn = RDMAP_UNTAGGED_QN_SEND;
0181         c_tx->pkt.send.ddp_msn =
0182             htonl(++c_tx->ddp_msn[RDMAP_UNTAGGED_QN_SEND]);
0183         c_tx->pkt.send.ddp_mo = 0;
0184 
0185         c_tx->pkt.send_inv.inval_stag = cpu_to_be32(wqe->sqe.rkey);
0186 
0187         c_tx->ctrl_len = sizeof(struct iwarp_send_inv);
0188 
0189         crc = (char *)&c_tx->pkt.send_pkt.crc;
0190         data = siw_try_1seg(c_tx, crc);
0191         break;
0192 
0193     case SIW_OP_WRITE:
0194         memcpy(&c_tx->pkt.ctrl, &iwarp_pktinfo[RDMAP_RDMA_WRITE].ctrl,
0195                sizeof(struct iwarp_ctrl));
0196 
0197         c_tx->pkt.rwrite.sink_stag = htonl(wqe->sqe.rkey);
0198         c_tx->pkt.rwrite.sink_to = cpu_to_be64(wqe->sqe.raddr);
0199         c_tx->ctrl_len = sizeof(struct iwarp_rdma_write);
0200 
0201         crc = (char *)&c_tx->pkt.write_pkt.crc;
0202         data = siw_try_1seg(c_tx, crc);
0203         break;
0204 
0205     case SIW_OP_READ_RESPONSE:
0206         memcpy(&c_tx->pkt.ctrl,
0207                &iwarp_pktinfo[RDMAP_RDMA_READ_RESP].ctrl,
0208                sizeof(struct iwarp_ctrl));
0209 
0210         /* values below are in network byte order (NBO) */
0211         c_tx->pkt.rresp.sink_stag = cpu_to_be32(wqe->sqe.rkey);
0212         c_tx->pkt.rresp.sink_to = cpu_to_be64(wqe->sqe.raddr);
0213 
0214         c_tx->ctrl_len = sizeof(struct iwarp_rdma_rresp);
0215 
0216         crc = (char *)&c_tx->pkt.write_pkt.crc;
0217         data = siw_try_1seg(c_tx, crc);
0218         break;
0219 
0220     default:
0221         siw_dbg_qp(tx_qp(c_tx), "stale wqe type %d\n", tx_type(wqe));
0222         return -EOPNOTSUPP;
0223     }
0224     if (unlikely(data < 0))
0225         return data;
0226 
0227     c_tx->ctrl_sent = 0;
0228 
0229     if (data <= MAX_HDR_INLINE) {
0230         if (data) {
0231             wqe->processed = data;
0232 
0233             c_tx->pkt.ctrl.mpa_len =
0234                 htons(c_tx->ctrl_len + data - MPA_HDR_SIZE);
0235 
0236             /* Add pad, if needed */
0237             data += -(int)data & 0x3;
0238             /* advance CRC location after payload */
0239             crc += data;
0240             c_tx->ctrl_len += data;
0241 
0242             if (!(c_tx->pkt.ctrl.ddp_rdmap_ctrl & DDP_FLAG_TAGGED))
0243                 c_tx->pkt.c_untagged.ddp_mo = 0;
0244             else
0245                 c_tx->pkt.c_tagged.ddp_to =
0246                     cpu_to_be64(wqe->sqe.raddr);
0247         }
0248 
0249         *(u32 *)crc = 0;
0250         /*
0251          * Do complete CRC if enabled and short packet
0252          */
0253         if (c_tx->mpa_crc_hd) {
0254             crypto_shash_init(c_tx->mpa_crc_hd);
0255             if (crypto_shash_update(c_tx->mpa_crc_hd,
0256                         (u8 *)&c_tx->pkt,
0257                         c_tx->ctrl_len))
0258                 return -EINVAL;
0259             crypto_shash_final(c_tx->mpa_crc_hd, (u8 *)crc);
0260         }
0261         c_tx->ctrl_len += MPA_CRC_SIZE;
0262 
0263         return PKT_COMPLETE;
0264     }
0265     c_tx->ctrl_len += MPA_CRC_SIZE;
0266     c_tx->sge_idx = 0;
0267     c_tx->sge_off = 0;
0268     c_tx->pbl_idx = 0;
0269 
0270     /*
0271      * Allow sending directly out of the user buffer if the WR is not
0272      * signalled and the payload is over the threshold.
0273      * Per RDMA verbs, the application should not change the send buffer
0274      * until the work has completed. In iWARP, work completion is only
0275      * local delivery to TCP. TCP may reuse the buffer for
0276      * retransmission. Changing unsent data also breaks the CRC,
0277      * if applied.
0278      */
0279     if (c_tx->zcopy_tx && wqe->bytes >= SENDPAGE_THRESH &&
0280         !(tx_flags(wqe) & SIW_WQE_SIGNALLED))
0281         c_tx->use_sendpage = 1;
0282     else
0283         c_tx->use_sendpage = 0;
0284 
0285     return PKT_FRAGMENTED;
0286 }
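
/*
 * The padding step above ("data += -(int)data & 0x3") rounds a short
 * inline payload up to the next 4-byte boundary so that the MPA CRC
 * that follows stays 4-byte aligned. Worked examples: data = 13 gives
 * pad = 3 (16 bytes total), data = 8 gives pad = 0. A tiny stand-alone
 * helper expressing the same arithmetic (the name is illustrative
 * only):
 */
static inline unsigned int siw_mpa_pad_sketch(unsigned int payload)
{
    return -payload & 0x3;    /* 0..3 pad bytes before the CRC */
}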
0287 
0288 /*
0289  * Send out one complete control-type FPDU, or the header of an FPDU
0290  * carrying data. Used for fixed-size packets like Read.Requests or
0291  * zero-length SENDs, WRITEs, and READ.Responses, or for header only.
0292  */
0293 static int siw_tx_ctrl(struct siw_iwarp_tx *c_tx, struct socket *s,
0294                   int flags)
0295 {
0296     struct msghdr msg = { .msg_flags = flags };
0297     struct kvec iov = { .iov_base =
0298                     (char *)&c_tx->pkt.ctrl + c_tx->ctrl_sent,
0299                 .iov_len = c_tx->ctrl_len - c_tx->ctrl_sent };
0300 
0301     int rv = kernel_sendmsg(s, &msg, &iov, 1,
0302                 c_tx->ctrl_len - c_tx->ctrl_sent);
0303 
0304     if (rv >= 0) {
0305         c_tx->ctrl_sent += rv;
0306 
0307         if (c_tx->ctrl_sent == c_tx->ctrl_len)
0308             rv = 0;
0309         else
0310             rv = -EAGAIN;
0311     }
0312     return rv;
0313 }
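
/*
 * Return contract of siw_tx_ctrl() as relied upon by its callers:
 * 0 means the complete control/header data went out, -EAGAIN means a
 * partial write happened and c_tx->ctrl_sent records where to resume,
 * any other negative value is a socket error passed through from
 * kernel_sendmsg().
 */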
0314 
0315 /*
0316  * 0copy TCP transmit interface: Use do_tcp_sendpages.
0317  *
0318  * Using sendpage to push page by page appears to be less efficient
0319  * than using sendmsg, even if data are copied.
0320  *
0321  * A general performance limitation might be the extra four-byte
0322  * trailer (CRC) segment that must be pushed after the user data.
0323  */
0324 static int siw_tcp_sendpages(struct socket *s, struct page **page, int offset,
0325                  size_t size)
0326 {
0327     struct sock *sk = s->sk;
0328     int i = 0, rv = 0, sent = 0,
0329         flags = MSG_MORE | MSG_DONTWAIT | MSG_SENDPAGE_NOTLAST;
0330 
0331     while (size) {
0332         size_t bytes = min_t(size_t, PAGE_SIZE - offset, size);
0333 
0334         if (size + offset <= PAGE_SIZE)
0335             flags = MSG_MORE | MSG_DONTWAIT;
0336 
0337         tcp_rate_check_app_limited(sk);
0338 try_page_again:
0339         lock_sock(sk);
0340         rv = do_tcp_sendpages(sk, page[i], offset, bytes, flags);
0341         release_sock(sk);
0342 
0343         if (rv > 0) {
0344             size -= rv;
0345             sent += rv;
0346             if (rv != bytes) {
0347                 offset += rv;
0348                 bytes -= rv;
0349                 goto try_page_again;
0350             }
0351             offset = 0;
0352         } else {
0353             if (rv == -EAGAIN || rv == 0)
0354                 break;
0355             return rv;
0356         }
0357         i++;
0358     }
0359     return sent;
0360 }
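
/*
 * Flag handling above: as long as more pages (or a partial page)
 * remain, pages are pushed with MSG_MORE | MSG_DONTWAIT |
 * MSG_SENDPAGE_NOTLAST so TCP may keep aggregating; once the remaining
 * bytes fit into the current page, MSG_SENDPAGE_NOTLAST is dropped.
 * MSG_MORE stays set since the caller still sends the CRC trailer via
 * kernel_sendmsg().
 */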
0361 
0362 /*
0363  * siw_0copy_tx()
0364  *
0365  * Pushes a list of pages to the TCP socket. If the pages stem from
0366  * multiple SGEs, all referenced pages of each SGE are pushed in one
0367  * shot.
0368  */
0369 static int siw_0copy_tx(struct socket *s, struct page **page,
0370             struct siw_sge *sge, unsigned int offset,
0371             unsigned int size)
0372 {
0373     int i = 0, sent = 0, rv;
0374     int sge_bytes = min(sge->length - offset, size);
0375 
0376     offset = (sge->laddr + offset) & ~PAGE_MASK;
0377 
0378     while (sent != size) {
0379         rv = siw_tcp_sendpages(s, &page[i], offset, sge_bytes);
0380         if (rv >= 0) {
0381             sent += rv;
0382             if (size == sent || sge_bytes > rv)
0383                 break;
0384 
0385             i += PAGE_ALIGN(sge_bytes + offset) >> PAGE_SHIFT;
0386             sge++;
0387             sge_bytes = min(sge->length, size - sent);
0388             offset = sge->laddr & ~PAGE_MASK;
0389         } else {
0390             sent = rv;
0391             break;
0392         }
0393     }
0394     return sent;
0395 }
0396 
0397 #define MAX_TRAILER (MPA_CRC_SIZE + 4)
0398 
0399 static void siw_unmap_pages(struct kvec *iov, unsigned long kmap_mask, int len)
0400 {
0401     int i;
0402 
0403     /*
0404      * Work backwards through the array to honor the kmap_local_page()
0405      * ordering requirements.
0406      */
0407     for (i = (len-1); i >= 0; i--) {
0408         if (kmap_mask & BIT(i)) {
0409             unsigned long addr = (unsigned long)iov[i].iov_base;
0410 
0411             kunmap_local((void *)(addr & PAGE_MASK));
0412         }
0413     }
0414 }
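
/*
 * kmap_mask bookkeeping: siw_tx_hdt() sets BIT(seg) for every iovec
 * element whose page it mapped via kmap_local_page(), so this helper
 * can unmap exactly those elements, in reverse mapping order. Example:
 * with elements 1 and 3 mapped, kmap_mask == 0xa and only iov[3] and
 * iov[1] get kunmap_local()'ed.
 */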
0415 
0416 /*
0417  * siw_tx_hdt() tries to push a complete packet to TCP where all
0418  * packet fragments are referenced by the elements of one iovec.
0419  * For the data portion, each involved page must be referenced by
0420  * one extra element. All sge's data can be non-aligned to page
0421  * boundaries. Two more elements reference the iWARP header
0422  * and trailer:
0423  * MAX_ARRAY = 64KB/PAGE_SIZE + 1 + 2 * (SIW_MAX_SGE - 1) + HDR + TRL
0424  */
0425 #define MAX_ARRAY ((0xffff / PAGE_SIZE) + 1 + (2 * (SIW_MAX_SGE - 1) + 2))
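
/*
 * Worked example for the bound above, assuming 4 KiB pages and
 * SIW_MAX_SGE == 6 (both are assumptions to be checked against the
 * actual build): MAX_ARRAY = (0xffff / 4096) + 1 + (2 * 5 + 2)
 * = 15 + 1 + 12 = 28 iovec/page slots per FPDU.
 */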
0426 
0427 /*
0428  * Write out iov referencing hdr, data and trailer of current FPDU.
0429  * Update transmit state dependent on write return status
0430  */
0431 static int siw_tx_hdt(struct siw_iwarp_tx *c_tx, struct socket *s)
0432 {
0433     struct siw_wqe *wqe = &c_tx->wqe_active;
0434     struct siw_sge *sge = &wqe->sqe.sge[c_tx->sge_idx];
0435     struct kvec iov[MAX_ARRAY];
0436     struct page *page_array[MAX_ARRAY];
0437     struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_EOR };
0438 
0439     int seg = 0, do_crc = c_tx->do_crc, is_kva = 0, rv;
0440     unsigned int data_len = c_tx->bytes_unsent, hdr_len = 0, trl_len = 0,
0441              sge_off = c_tx->sge_off, sge_idx = c_tx->sge_idx,
0442              pbl_idx = c_tx->pbl_idx;
0443     unsigned long kmap_mask = 0L;
0444 
0445     if (c_tx->state == SIW_SEND_HDR) {
0446         if (c_tx->use_sendpage) {
0447             rv = siw_tx_ctrl(c_tx, s, MSG_DONTWAIT | MSG_MORE);
0448             if (rv)
0449                 goto done;
0450 
0451             c_tx->state = SIW_SEND_DATA;
0452         } else {
0453             iov[0].iov_base =
0454                 (char *)&c_tx->pkt.ctrl + c_tx->ctrl_sent;
0455             iov[0].iov_len = hdr_len =
0456                 c_tx->ctrl_len - c_tx->ctrl_sent;
0457             seg = 1;
0458         }
0459     }
0460 
0461     wqe->processed += data_len;
0462 
0463     while (data_len) { /* walk the list of SGE's */
0464         unsigned int sge_len = min(sge->length - sge_off, data_len);
0465         unsigned int fp_off = (sge->laddr + sge_off) & ~PAGE_MASK;
0466         struct siw_mem *mem;
0467 
0468         if (!(tx_flags(wqe) & SIW_WQE_INLINE)) {
0469             mem = wqe->mem[sge_idx];
0470             is_kva = mem->mem_obj == NULL ? 1 : 0;
0471         } else {
0472             is_kva = 1;
0473         }
0474         if (is_kva && !c_tx->use_sendpage) {
0475             /*
0476              * tx from kernel virtual address: either inline data
0477              * or memory region with assigned kernel buffer
0478              */
0479             iov[seg].iov_base =
0480                 (void *)(uintptr_t)(sge->laddr + sge_off);
0481             iov[seg].iov_len = sge_len;
0482 
0483             if (do_crc)
0484                 crypto_shash_update(c_tx->mpa_crc_hd,
0485                             iov[seg].iov_base,
0486                             sge_len);
0487             sge_off += sge_len;
0488             data_len -= sge_len;
0489             seg++;
0490             goto sge_done;
0491         }
0492 
0493         while (sge_len) {
0494             size_t plen = min((int)PAGE_SIZE - fp_off, sge_len);
0495             void *kaddr;
0496 
0497             if (!is_kva) {
0498                 struct page *p;
0499 
0500                 if (mem->is_pbl)
0501                     p = siw_get_pblpage(
0502                         mem, sge->laddr + sge_off,
0503                         &pbl_idx);
0504                 else
0505                     p = siw_get_upage(mem->umem,
0506                               sge->laddr + sge_off);
0507                 if (unlikely(!p)) {
0508                     siw_unmap_pages(iov, kmap_mask, seg);
0509                     wqe->processed -= c_tx->bytes_unsent;
0510                     rv = -EFAULT;
0511                     goto done_crc;
0512                 }
0513                 page_array[seg] = p;
0514 
0515                 if (!c_tx->use_sendpage) {
0516                     void *kaddr = kmap_local_page(p);
0517 
0518                     /* Remember for later kunmap() */
0519                     kmap_mask |= BIT(seg);
0520                     iov[seg].iov_base = kaddr + fp_off;
0521                     iov[seg].iov_len = plen;
0522 
0523                     if (do_crc)
0524                         crypto_shash_update(
0525                             c_tx->mpa_crc_hd,
0526                             iov[seg].iov_base,
0527                             plen);
0528                 } else if (do_crc) {
0529                     kaddr = kmap_local_page(p);
0530                     crypto_shash_update(c_tx->mpa_crc_hd,
0531                                 kaddr + fp_off,
0532                                 plen);
0533                     kunmap_local(kaddr);
0534                 }
0535             } else {
0536                 /*
0537                  * Cast to an uintptr_t to preserve all 64 bits
0538                  * in sge->laddr.
0539                  */
0540                 uintptr_t va = (uintptr_t)(sge->laddr + sge_off);
0541 
0542                 /*
0543                  * virt_to_page() takes a (void *) pointer
0544                  * so cast to a (void *) meaning it will be 64
0545                  * bits on a 64 bit platform and 32 bits on a
0546                  * 32 bit platform.
0547                  */
0548                 page_array[seg] = virt_to_page((void *)(va & PAGE_MASK));
0549                 if (do_crc)
0550                     crypto_shash_update(
0551                         c_tx->mpa_crc_hd,
0552                         (void *)va,
0553                         plen);
0554             }
0555 
0556             sge_len -= plen;
0557             sge_off += plen;
0558             data_len -= plen;
0559             fp_off = 0;
0560 
0561             if (++seg >= (int)MAX_ARRAY) {
0562                 siw_dbg_qp(tx_qp(c_tx), "too many fragments\n");
0563                 siw_unmap_pages(iov, kmap_mask, seg-1);
0564                 wqe->processed -= c_tx->bytes_unsent;
0565                 rv = -EMSGSIZE;
0566                 goto done_crc;
0567             }
0568         }
0569 sge_done:
0570         /* Update SGE variables at end of SGE */
0571         if (sge_off == sge->length &&
0572             (data_len != 0 || wqe->processed < wqe->bytes)) {
0573             sge_idx++;
0574             sge++;
0575             sge_off = 0;
0576         }
0577     }
0578     /* trailer */
0579     if (likely(c_tx->state != SIW_SEND_TRAILER)) {
0580         iov[seg].iov_base = &c_tx->trailer.pad[4 - c_tx->pad];
0581         iov[seg].iov_len = trl_len = MAX_TRAILER - (4 - c_tx->pad);
0582     } else {
0583         iov[seg].iov_base = &c_tx->trailer.pad[c_tx->ctrl_sent];
0584         iov[seg].iov_len = trl_len = MAX_TRAILER - c_tx->ctrl_sent;
0585     }
0586 
0587     if (c_tx->pad) {
0588         *(u32 *)c_tx->trailer.pad = 0;
0589         if (do_crc)
0590             crypto_shash_update(c_tx->mpa_crc_hd,
0591                 (u8 *)&c_tx->trailer.crc - c_tx->pad,
0592                 c_tx->pad);
0593     }
0594     if (!c_tx->mpa_crc_hd)
0595         c_tx->trailer.crc = 0;
0596     else if (do_crc)
0597         crypto_shash_final(c_tx->mpa_crc_hd, (u8 *)&c_tx->trailer.crc);
0598 
0599     data_len = c_tx->bytes_unsent;
0600 
0601     if (c_tx->use_sendpage) {
0602         rv = siw_0copy_tx(s, page_array, &wqe->sqe.sge[c_tx->sge_idx],
0603                   c_tx->sge_off, data_len);
0604         if (rv == data_len) {
0605             rv = kernel_sendmsg(s, &msg, &iov[seg], 1, trl_len);
0606             if (rv > 0)
0607                 rv += data_len;
0608             else
0609                 rv = data_len;
0610         }
0611     } else {
0612         rv = kernel_sendmsg(s, &msg, iov, seg + 1,
0613                     hdr_len + data_len + trl_len);
0614         siw_unmap_pages(iov, kmap_mask, seg);
0615     }
0616     if (rv < (int)hdr_len) {
0617         /* Not even complete hdr pushed or negative rv */
0618         wqe->processed -= data_len;
0619         if (rv >= 0) {
0620             c_tx->ctrl_sent += rv;
0621             rv = -EAGAIN;
0622         }
0623         goto done_crc;
0624     }
0625     rv -= hdr_len;
0626 
0627     if (rv >= (int)data_len) {
0628         /* all user data pushed to TCP or no data to push */
0629         if (data_len > 0 && wqe->processed < wqe->bytes) {
0630             /* Save the current state for next tx */
0631             c_tx->sge_idx = sge_idx;
0632             c_tx->sge_off = sge_off;
0633             c_tx->pbl_idx = pbl_idx;
0634         }
0635         rv -= data_len;
0636 
0637         if (rv == trl_len) /* all pushed */
0638             rv = 0;
0639         else {
0640             c_tx->state = SIW_SEND_TRAILER;
0641             c_tx->ctrl_len = MAX_TRAILER;
0642             c_tx->ctrl_sent = rv + 4 - c_tx->pad;
0643             c_tx->bytes_unsent = 0;
0644             rv = -EAGAIN;
0645         }
0646 
0647     } else if (data_len > 0) {
0648         /* Maybe some user data pushed to TCP */
0649         c_tx->state = SIW_SEND_DATA;
0650         wqe->processed -= data_len - rv;
0651 
0652         if (rv) {
0653             /*
0654              * Some bytes out. Recompute tx state based
0655              * on old state and bytes pushed
0656              */
0657             unsigned int sge_unsent;
0658 
0659             c_tx->bytes_unsent -= rv;
0660             sge = &wqe->sqe.sge[c_tx->sge_idx];
0661             sge_unsent = sge->length - c_tx->sge_off;
0662 
0663             while (sge_unsent <= rv) {
0664                 rv -= sge_unsent;
0665                 c_tx->sge_idx++;
0666                 c_tx->sge_off = 0;
0667                 sge++;
0668                 sge_unsent = sge->length;
0669             }
0670             c_tx->sge_off += rv;
0671         }
0672         rv = -EAGAIN;
0673     }
0674 done_crc:
0675     c_tx->do_crc = 0;
0676 done:
0677     return rv;
0678 }
0679 
0680 static void siw_update_tcpseg(struct siw_iwarp_tx *c_tx,
0681                      struct socket *s)
0682 {
0683     struct tcp_sock *tp = tcp_sk(s->sk);
0684 
0685     if (tp->gso_segs) {
0686         if (c_tx->gso_seg_limit == 0)
0687             c_tx->tcp_seglen = tp->mss_cache * tp->gso_segs;
0688         else
0689             c_tx->tcp_seglen =
0690                 tp->mss_cache *
0691                 min_t(u16, c_tx->gso_seg_limit, tp->gso_segs);
0692     } else {
0693         c_tx->tcp_seglen = tp->mss_cache;
0694     }
0695     /* Loopback may give odd numbers */
0696     c_tx->tcp_seglen &= 0xfffffff8;
0697 }
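
/*
 * Illustration of the segment sizing above (numbers are hypothetical):
 * with mss_cache = 1460, gso_segs = 4 and gso_seg_limit = 0, the target
 * becomes 4 * 1460 = 5840 bytes; the final "& 0xfffffff8" rounds down
 * to a multiple of 8, so 5840 stays 5840 while an odd loopback value
 * such as 21845 would become 21840.
 */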
0698 
0699 /*
0700  * siw_prepare_fpdu()
0701  *
0702  * Prepares the transmit context to send out one FPDU if the FPDU will
0703  * contain user data and the user data are not immediate data.
0704  * Computes the maximum FPDU length to fill up the TCP MSS if possible.
0705  *
0706  * @qp:     QP from which to transmit
0707  * @wqe:    Current WQE causing transmission
0708  *
0709  * TODO: Take into account real available sendspace on socket
0710  *       to avoid header misalignment due to send pausing within
0711  *       fpdu transmission
0712  */
0713 static void siw_prepare_fpdu(struct siw_qp *qp, struct siw_wqe *wqe)
0714 {
0715     struct siw_iwarp_tx *c_tx = &qp->tx_ctx;
0716     int data_len;
0717 
0718     c_tx->ctrl_len =
0719         iwarp_pktinfo[__rdmap_get_opcode(&c_tx->pkt.ctrl)].hdr_len;
0720     c_tx->ctrl_sent = 0;
0721 
0722     /*
0723      * Update target buffer offset if any
0724      */
0725     if (!(c_tx->pkt.ctrl.ddp_rdmap_ctrl & DDP_FLAG_TAGGED))
0726         /* Untagged message */
0727         c_tx->pkt.c_untagged.ddp_mo = cpu_to_be32(wqe->processed);
0728     else /* Tagged message */
0729         c_tx->pkt.c_tagged.ddp_to =
0730             cpu_to_be64(wqe->sqe.raddr + wqe->processed);
0731 
0732     data_len = wqe->bytes - wqe->processed;
0733     if (data_len + c_tx->ctrl_len + MPA_CRC_SIZE > c_tx->tcp_seglen) {
0734         /* Trim DDP payload to fit into current TCP segment */
0735         data_len = c_tx->tcp_seglen - (c_tx->ctrl_len + MPA_CRC_SIZE);
0736         c_tx->pkt.ctrl.ddp_rdmap_ctrl &= ~DDP_FLAG_LAST;
0737         c_tx->pad = 0;
0738     } else {
0739         c_tx->pkt.ctrl.ddp_rdmap_ctrl |= DDP_FLAG_LAST;
0740         c_tx->pad = -data_len & 0x3;
0741     }
0742     c_tx->bytes_unsent = data_len;
0743 
0744     c_tx->pkt.ctrl.mpa_len =
0745         htons(c_tx->ctrl_len + data_len - MPA_HDR_SIZE);
0746 
0747     /*
0748      * Init MPA CRC computation
0749      */
0750     if (c_tx->mpa_crc_hd) {
0751         crypto_shash_init(c_tx->mpa_crc_hd);
0752         crypto_shash_update(c_tx->mpa_crc_hd, (u8 *)&c_tx->pkt,
0753                     c_tx->ctrl_len);
0754         c_tx->do_crc = 1;
0755     }
0756 }
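
/*
 * Example of the trimming logic above (header and segment sizes are
 * illustrative): with tcp_seglen = 1448, ctrl_len = 28 and a 4-byte
 * MPA CRC, at most 1448 - 32 = 1416 payload bytes fit into this FPDU.
 * If more user data remain, DDP_FLAG_LAST is cleared and no pad is
 * needed; otherwise DDP_FLAG_LAST is set and the payload gets padded
 * up to a 4-byte boundary in front of the CRC.
 */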
0757 
0758 /*
0759  * siw_check_sgl_tx()
0760  *
0761  * Check permissions for a list of SGE's (SGL).
0762  * A successful check will have all memory referenced
0763  * for transmission resolved and assigned to the WQE.
0764  *
0765  * @pd:     Protection Domain SGL should belong to
0766  * @wqe:    WQE to be checked
0767  * @perms:  requested access permissions
0768  *
0769  */
0770 
0771 static int siw_check_sgl_tx(struct ib_pd *pd, struct siw_wqe *wqe,
0772                 enum ib_access_flags perms)
0773 {
0774     struct siw_sge *sge = &wqe->sqe.sge[0];
0775     int i, len, num_sge = wqe->sqe.num_sge;
0776 
0777     if (unlikely(num_sge > SIW_MAX_SGE))
0778         return -EINVAL;
0779 
0780     for (i = 0, len = 0; num_sge; num_sge--, i++, sge++) {
0781         /*
0782          * rdma verbs: do not check stag for a zero length sge
0783          */
0784         if (sge->length) {
0785             int rv = siw_check_sge(pd, sge, &wqe->mem[i], perms, 0,
0786                            sge->length);
0787 
0788             if (unlikely(rv != E_ACCESS_OK))
0789                 return rv;
0790         }
0791         len += sge->length;
0792     }
0793     return len;
0794 }
0795 
0796 /*
0797  * siw_qp_sq_proc_tx()
0798  *
0799  * Process one WQE which needs transmission on the wire.
0800  */
0801 static int siw_qp_sq_proc_tx(struct siw_qp *qp, struct siw_wqe *wqe)
0802 {
0803     struct siw_iwarp_tx *c_tx = &qp->tx_ctx;
0804     struct socket *s = qp->attrs.sk;
0805     int rv = 0, burst_len = qp->tx_ctx.burst;
0806     enum rdmap_ecode ecode = RDMAP_ECODE_CATASTROPHIC_STREAM;
0807 
0808     if (unlikely(wqe->wr_status == SIW_WR_IDLE))
0809         return 0;
0810 
0811     if (!burst_len)
0812         burst_len = SQ_USER_MAXBURST;
0813 
0814     if (wqe->wr_status == SIW_WR_QUEUED) {
0815         if (!(wqe->sqe.flags & SIW_WQE_INLINE)) {
0816             if (tx_type(wqe) == SIW_OP_READ_RESPONSE)
0817                 wqe->sqe.num_sge = 1;
0818 
0819             if (tx_type(wqe) != SIW_OP_READ &&
0820                 tx_type(wqe) != SIW_OP_READ_LOCAL_INV) {
0821                 /*
0822                  * Reference memory to be tx'd w/o checking
0823                  * access for LOCAL_READ permission, since
0824                  * not defined in RDMA core.
0825                  */
0826                 rv = siw_check_sgl_tx(qp->pd, wqe, 0);
0827                 if (rv < 0) {
0828                     if (tx_type(wqe) ==
0829                         SIW_OP_READ_RESPONSE)
0830                         ecode = siw_rdmap_error(-rv);
0831                     rv = -EINVAL;
0832                     goto tx_error;
0833                 }
0834                 wqe->bytes = rv;
0835             } else {
0836                 wqe->bytes = 0;
0837             }
0838         } else {
0839             wqe->bytes = wqe->sqe.sge[0].length;
0840             if (!rdma_is_kernel_res(&qp->base_qp.res)) {
0841                 if (wqe->bytes > SIW_MAX_INLINE) {
0842                     rv = -EINVAL;
0843                     goto tx_error;
0844                 }
0845                 wqe->sqe.sge[0].laddr =
0846                     (u64)(uintptr_t)&wqe->sqe.sge[1];
0847             }
0848         }
0849         wqe->wr_status = SIW_WR_INPROGRESS;
0850         wqe->processed = 0;
0851 
0852         siw_update_tcpseg(c_tx, s);
0853 
0854         rv = siw_qp_prepare_tx(c_tx);
0855         if (rv == PKT_FRAGMENTED) {
0856             c_tx->state = SIW_SEND_HDR;
0857             siw_prepare_fpdu(qp, wqe);
0858         } else if (rv == PKT_COMPLETE) {
0859             c_tx->state = SIW_SEND_SHORT_FPDU;
0860         } else {
0861             goto tx_error;
0862         }
0863     }
0864 
0865 next_segment:
0866     siw_dbg_qp(qp, "wr type %d, state %d, data %u, sent %u, id %llx\n",
0867            tx_type(wqe), wqe->wr_status, wqe->bytes, wqe->processed,
0868            wqe->sqe.id);
0869 
0870     if (--burst_len == 0) {
0871         rv = -EINPROGRESS;
0872         goto tx_done;
0873     }
0874     if (c_tx->state == SIW_SEND_SHORT_FPDU) {
0875         enum siw_opcode tx_type = tx_type(wqe);
0876         unsigned int msg_flags;
0877 
0878         if (siw_sq_empty(qp) || !siw_tcp_nagle || burst_len == 1)
0879             /*
0880              * End the current TCP segment if the SQ runs empty,
0881              * siw_tcp_nagle is not set, or we will bail out
0882              * soon because no burst credit is left.
0883              */
0884             msg_flags = MSG_DONTWAIT;
0885         else
0886             msg_flags = MSG_DONTWAIT | MSG_MORE;
0887 
0888         rv = siw_tx_ctrl(c_tx, s, msg_flags);
0889 
0890         if (!rv && tx_type != SIW_OP_READ &&
0891             tx_type != SIW_OP_READ_LOCAL_INV)
0892             wqe->processed = wqe->bytes;
0893 
0894         goto tx_done;
0895 
0896     } else {
0897         rv = siw_tx_hdt(c_tx, s);
0898     }
0899     if (!rv) {
0900         /*
0901          * One segment sent. Processing is completed if this was
0902          * the last segment; do the next segment otherwise.
0903          */
0904         if (unlikely(c_tx->tx_suspend)) {
0905             /*
0906              * Verbs, 6.4.: Try stopping sending after a full
0907              * DDP segment if the connection goes down
0908              * (== peer halfclose)
0909              */
0910             rv = -ECONNABORTED;
0911             goto tx_done;
0912         }
0913         if (c_tx->pkt.ctrl.ddp_rdmap_ctrl & DDP_FLAG_LAST) {
0914             siw_dbg_qp(qp, "WQE completed\n");
0915             goto tx_done;
0916         }
0917         c_tx->state = SIW_SEND_HDR;
0918 
0919         siw_update_tcpseg(c_tx, s);
0920 
0921         siw_prepare_fpdu(qp, wqe);
0922         goto next_segment;
0923     }
0924 tx_done:
0925     qp->tx_ctx.burst = burst_len;
0926     return rv;
0927 
0928 tx_error:
0929     if (ecode != RDMAP_ECODE_CATASTROPHIC_STREAM)
0930         siw_init_terminate(qp, TERM_ERROR_LAYER_RDMAP,
0931                    RDMAP_ETYPE_REMOTE_PROTECTION, ecode, 1);
0932     else
0933         siw_init_terminate(qp, TERM_ERROR_LAYER_RDMAP,
0934                    RDMAP_ETYPE_CATASTROPHIC,
0935                    RDMAP_ECODE_UNSPECIFIED, 1);
0936     return rv;
0937 }
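
/*
 * Return contract of siw_qp_sq_proc_tx() as consumed by
 * siw_qp_sq_process(): 0 means the WQE finished, -EAGAIN means TCP ran
 * out of send space and the partial tx state was saved for later
 * resumption, -EINPROGRESS means the burst credit is used up and the
 * SQ gets rescheduled via siw_sq_start(), any other error leads to
 * termination handling and connection drop.
 */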
0938 
0939 static int siw_fastreg_mr(struct ib_pd *pd, struct siw_sqe *sqe)
0940 {
0941     struct ib_mr *base_mr = (struct ib_mr *)(uintptr_t)sqe->base_mr;
0942     struct siw_device *sdev = to_siw_dev(pd->device);
0943     struct siw_mem *mem;
0944     int rv = 0;
0945 
0946     siw_dbg_pd(pd, "STag 0x%08x\n", sqe->rkey);
0947 
0948     if (unlikely(!base_mr)) {
0949         pr_warn("siw: fastreg: STag 0x%08x unknown\n", sqe->rkey);
0950         return -EINVAL;
0951     }
0952 
0953     if (unlikely(base_mr->rkey >> 8 != sqe->rkey  >> 8)) {
0954         pr_warn("siw: fastreg: STag 0x%08x: bad MR\n", sqe->rkey);
0955         return -EINVAL;
0956     }
0957 
0958     mem = siw_mem_id2obj(sdev, sqe->rkey  >> 8);
0959     if (unlikely(!mem)) {
0960         pr_warn("siw: fastreg: STag 0x%08x unknown\n", sqe->rkey);
0961         return -EINVAL;
0962     }
0963 
0964     if (unlikely(mem->pd != pd)) {
0965         pr_warn("siw: fastreg: PD mismatch\n");
0966         rv = -EINVAL;
0967         goto out;
0968     }
0969     if (unlikely(mem->stag_valid)) {
0970         pr_warn("siw: fastreg: STag 0x%08x already valid\n", sqe->rkey);
0971         rv = -EINVAL;
0972         goto out;
0973     }
0974     /* Refresh STag since user may have changed key part */
0975     mem->stag = sqe->rkey;
0976     mem->perms = sqe->access;
0977 
0978     siw_dbg_mem(mem, "STag 0x%08x now valid\n", sqe->rkey);
0979     mem->va = base_mr->iova;
0980     mem->stag_valid = 1;
0981 out:
0982     siw_mem_put(mem);
0983     return rv;
0984 }
0985 
0986 static int siw_qp_sq_proc_local(struct siw_qp *qp, struct siw_wqe *wqe)
0987 {
0988     int rv;
0989 
0990     switch (tx_type(wqe)) {
0991     case SIW_OP_REG_MR:
0992         rv = siw_fastreg_mr(qp->pd, &wqe->sqe);
0993         break;
0994 
0995     case SIW_OP_INVAL_STAG:
0996         rv = siw_invalidate_stag(qp->pd, wqe->sqe.rkey);
0997         break;
0998 
0999     default:
1000         rv = -EINVAL;
1001     }
1002     return rv;
1003 }
1004 
1005 /*
1006  * siw_qp_sq_process()
1007  *
1008  * Core TX path routine for RDMAP/DDP/MPA using a TCP kernel socket.
1009  * Sends RDMAP payload for the current SQ WR @wqe of @qp in one or more
1010  * MPA FPDUs, each containing a DDP segment.
1011  *
1012  * SQ processing may occur in user context as a result of posting
1013  * new WQE's or from siw_sq_work_handler() context. Processing in
1014  * user context is limited to non-kernel verbs users.
1015  *
1016  * SQ processing may get paused anytime, possibly in the middle of a WR
1017  * or FPDU, if insufficient send space is available. SQ processing
1018  * gets resumed from siw_sq_work_handler(), if send space becomes
1019  * available again.
1020  *
1021  * Must be called with the QP state read-locked.
1022  *
1023  * Note:
1024  * An outbound RREQ can be satisfied by the corresponding RRESP
1025  * _before_ it gets assigned to the ORQ. This happens regularly
1026  * in the RDMA READ via loopback case. Since both outbound RREQ and
1027  * inbound RRESP can be handled by the same CPU, locking the ORQ
1028  * is deadlock-prone and thus not an option. With that, the
1029  * RREQ gets assigned to the ORQ _before_ being sent - see
1030  * siw_activate_tx() - and pulled back in case of send failure.
1031  */
1032 int siw_qp_sq_process(struct siw_qp *qp)
1033 {
1034     struct siw_wqe *wqe = tx_wqe(qp);
1035     enum siw_opcode tx_type;
1036     unsigned long flags;
1037     int rv = 0;
1038 
1039     siw_dbg_qp(qp, "enter for type %d\n", tx_type(wqe));
1040 
1041 next_wqe:
1042     /*
1043      * Stop QP processing if SQ state changed
1044      */
1045     if (unlikely(qp->tx_ctx.tx_suspend)) {
1046         siw_dbg_qp(qp, "tx suspended\n");
1047         goto done;
1048     }
1049     tx_type = tx_type(wqe);
1050 
1051     if (tx_type <= SIW_OP_READ_RESPONSE)
1052         rv = siw_qp_sq_proc_tx(qp, wqe);
1053     else
1054         rv = siw_qp_sq_proc_local(qp, wqe);
1055 
1056     if (!rv) {
1057         /*
1058          * WQE processing done
1059          */
1060         switch (tx_type) {
1061         case SIW_OP_SEND:
1062         case SIW_OP_SEND_REMOTE_INV:
1063         case SIW_OP_WRITE:
1064             siw_wqe_put_mem(wqe, tx_type);
1065             fallthrough;
1066 
1067         case SIW_OP_INVAL_STAG:
1068         case SIW_OP_REG_MR:
1069             if (tx_flags(wqe) & SIW_WQE_SIGNALLED)
1070                 siw_sqe_complete(qp, &wqe->sqe, wqe->bytes,
1071                          SIW_WC_SUCCESS);
1072             break;
1073 
1074         case SIW_OP_READ:
1075         case SIW_OP_READ_LOCAL_INV:
1076             /*
1077              * already enqueued to ORQ queue
1078              */
1079             break;
1080 
1081         case SIW_OP_READ_RESPONSE:
1082             siw_wqe_put_mem(wqe, tx_type);
1083             break;
1084 
1085         default:
1086             WARN(1, "undefined WQE type %d\n", tx_type);
1087             rv = -EINVAL;
1088             goto done;
1089         }
1090 
1091         spin_lock_irqsave(&qp->sq_lock, flags);
1092         wqe->wr_status = SIW_WR_IDLE;
1093         rv = siw_activate_tx(qp);
1094         spin_unlock_irqrestore(&qp->sq_lock, flags);
1095 
1096         if (rv <= 0)
1097             goto done;
1098 
1099         goto next_wqe;
1100 
1101     } else if (rv == -EAGAIN) {
1102         siw_dbg_qp(qp, "sq paused: hd/tr %d of %d, data %d\n",
1103                qp->tx_ctx.ctrl_sent, qp->tx_ctx.ctrl_len,
1104                qp->tx_ctx.bytes_unsent);
1105         rv = 0;
1106         goto done;
1107     } else if (rv == -EINPROGRESS) {
1108         rv = siw_sq_start(qp);
1109         goto done;
1110     } else {
1111         /*
1112          * WQE processing failed.
1113          * Verbs 8.3.2:
1114          * o It turns any WQE into a signalled WQE.
1115          * o Local catastrophic error must be surfaced
1116          * o QP must be moved into Terminate state: done by code
1117          *   doing socket state change processing
1118          *
1119          * o TODO: Termination message must be sent.
1120          * o TODO: Implement more precise work completion errors,
1121          *         see enum ib_wc_status in ib_verbs.h
1122          */
1123         siw_dbg_qp(qp, "wqe type %d processing failed: %d\n",
1124                tx_type(wqe), rv);
1125 
1126         spin_lock_irqsave(&qp->sq_lock, flags);
1127         /*
1128          * RREQ may have already been completed by inbound RRESP!
1129          */
1130         if ((tx_type == SIW_OP_READ ||
1131              tx_type == SIW_OP_READ_LOCAL_INV) && qp->attrs.orq_size) {
1132             /* Cleanup pending entry in ORQ */
1133             qp->orq_put--;
1134             qp->orq[qp->orq_put % qp->attrs.orq_size].flags = 0;
1135         }
1136         spin_unlock_irqrestore(&qp->sq_lock, flags);
1137         /*
1138          * immediately suspends further TX processing
1139          */
1140         if (!qp->tx_ctx.tx_suspend)
1141             siw_qp_cm_drop(qp, 0);
1142 
1143         switch (tx_type) {
1144         case SIW_OP_SEND:
1145         case SIW_OP_SEND_REMOTE_INV:
1146         case SIW_OP_SEND_WITH_IMM:
1147         case SIW_OP_WRITE:
1148         case SIW_OP_READ:
1149         case SIW_OP_READ_LOCAL_INV:
1150             siw_wqe_put_mem(wqe, tx_type);
1151             fallthrough;
1152 
1153         case SIW_OP_INVAL_STAG:
1154         case SIW_OP_REG_MR:
1155             siw_sqe_complete(qp, &wqe->sqe, wqe->bytes,
1156                      SIW_WC_LOC_QP_OP_ERR);
1157 
1158             siw_qp_event(qp, IB_EVENT_QP_FATAL);
1159 
1160             break;
1161 
1162         case SIW_OP_READ_RESPONSE:
1163             siw_dbg_qp(qp, "proc. read.response failed: %d\n", rv);
1164 
1165             siw_qp_event(qp, IB_EVENT_QP_REQ_ERR);
1166 
1167             siw_wqe_put_mem(wqe, SIW_OP_READ_RESPONSE);
1168 
1169             break;
1170 
1171         default:
1172             WARN(1, "undefined WQE type %d\n", tx_type);
1173             rv = -EINVAL;
1174         }
1175         wqe->wr_status = SIW_WR_IDLE;
1176     }
1177 done:
1178     return rv;
1179 }
1180 
1181 static void siw_sq_resume(struct siw_qp *qp)
1182 {
1183     if (down_read_trylock(&qp->state_lock)) {
1184         if (likely(qp->attrs.state == SIW_QP_STATE_RTS &&
1185                !qp->tx_ctx.tx_suspend)) {
1186             int rv = siw_qp_sq_process(qp);
1187 
1188             up_read(&qp->state_lock);
1189 
1190             if (unlikely(rv < 0)) {
1191                 siw_dbg_qp(qp, "SQ task failed: err %d\n", rv);
1192 
1193                 if (!qp->tx_ctx.tx_suspend)
1194                     siw_qp_cm_drop(qp, 0);
1195             }
1196         } else {
1197             up_read(&qp->state_lock);
1198         }
1199     } else {
1200         siw_dbg_qp(qp, "Resume SQ while QP locked\n");
1201     }
1202     siw_qp_put(qp);
1203 }
1204 
1205 struct tx_task_t {
1206     struct llist_head active;
1207     wait_queue_head_t waiting;
1208 };
1209 
1210 static DEFINE_PER_CPU(struct tx_task_t, siw_tx_task_g);
1211 
1212 void siw_stop_tx_thread(int nr_cpu)
1213 {
1214     kthread_stop(siw_tx_thread[nr_cpu]);
1215     wake_up(&per_cpu(siw_tx_task_g, nr_cpu).waiting);
1216 }
1217 
1218 int siw_run_sq(void *data)
1219 {
1220     const int nr_cpu = (unsigned int)(long)data;
1221     struct llist_node *active;
1222     struct siw_qp *qp;
1223     struct tx_task_t *tx_task = &per_cpu(siw_tx_task_g, nr_cpu);
1224 
1225     init_llist_head(&tx_task->active);
1226     init_waitqueue_head(&tx_task->waiting);
1227 
1228     while (1) {
1229         struct llist_node *fifo_list = NULL;
1230 
1231         wait_event_interruptible(tx_task->waiting,
1232                      !llist_empty(&tx_task->active) ||
1233                          kthread_should_stop());
1234 
1235         if (kthread_should_stop())
1236             break;
1237 
1238         active = llist_del_all(&tx_task->active);
1239         /*
1240          * llist_del_all returns a list with newest entry first.
1241          * Re-order list for fairness among QP's.
1242          */
1243         while (active) {
1244             struct llist_node *tmp = active;
1245 
1246             active = llist_next(active);
1247             tmp->next = fifo_list;
1248             fifo_list = tmp;
1249         }
1250         while (fifo_list) {
1251             qp = container_of(fifo_list, struct siw_qp, tx_list);
1252             fifo_list = llist_next(fifo_list);
1253             qp->tx_list.next = NULL;
1254 
1255             siw_sq_resume(qp);
1256         }
1257     }
1258     active = llist_del_all(&tx_task->active);
1259     if (active) {
1260         llist_for_each_entry(qp, active, tx_list) {
1261             qp->tx_list.next = NULL;
1262             siw_sq_resume(qp);
1263         }
1264     }
1265     return 0;
1266 }
1267 
1268 int siw_sq_start(struct siw_qp *qp)
1269 {
1270     if (tx_wqe(qp)->wr_status == SIW_WR_IDLE)
1271         return 0;
1272 
1273     if (unlikely(!cpu_online(qp->tx_cpu))) {
1274         siw_put_tx_cpu(qp->tx_cpu);
1275         qp->tx_cpu = siw_get_tx_cpu(qp->sdev);
1276         if (qp->tx_cpu < 0) {
1277             pr_warn("siw: no tx cpu available\n");
1278 
1279             return -EIO;
1280         }
1281     }
1282     siw_qp_get(qp);
1283 
1284     llist_add(&qp->tx_list, &per_cpu(siw_tx_task_g, qp->tx_cpu).active);
1285 
1286     wake_up(&per_cpu(siw_tx_task_g, qp->tx_cpu).waiting);
1287 
1288     return 0;
1289 }
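
/*
 * Producer/consumer summary: siw_sq_start() runs in the context that
 * made SQ work pending, links the QP onto the per-CPU tx_task_t list
 * and wakes the bound siw_run_sq() kthread, which drains the list in
 * FIFO order and resumes each QP via siw_sq_resume().
 */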