Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Copyright (c) 2006, 2019 Oracle and/or its affiliates. All rights reserved.
0003  *
0004  * This software is available to you under a choice of one of two
0005  * licenses.  You may choose to be licensed under the terms of the GNU
0006  * General Public License (GPL) Version 2, available from the file
0007  * COPYING in the main directory of this source tree, or the
0008  * OpenIB.org BSD license below:
0009  *
0010  *     Redistribution and use in source and binary forms, with or
0011  *     without modification, are permitted provided that the following
0012  *     conditions are met:
0013  *
0014  *      - Redistributions of source code must retain the above
0015  *        copyright notice, this list of conditions and the following
0016  *        disclaimer.
0017  *
0018  *      - Redistributions in binary form must reproduce the above
0019  *        copyright notice, this list of conditions and the following
0020  *        disclaimer in the documentation and/or other materials
0021  *        provided with the distribution.
0022  *
0023  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
0024  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
0025  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
0026  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
0027  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
0028  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
0029  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
0030  * SOFTWARE.
0031  *
0032  */
0033 #include <linux/kernel.h>
0034 #include <linux/in.h>
0035 #include <linux/device.h>
0036 #include <linux/dmapool.h>
0037 #include <linux/ratelimit.h>
0038 
0039 #include "rds_single_path.h"
0040 #include "rds.h"
0041 #include "ib.h"
0042 #include "ib_mr.h"
0043 
0044 /*
0045  * Convert IB-specific error message to RDS error message and call core
0046  * completion handler.
0047  */
0048 static void rds_ib_send_complete(struct rds_message *rm,
0049                  int wc_status,
0050                  void (*complete)(struct rds_message *rm, int status))
0051 {
0052     int notify_status;
0053 
0054     switch (wc_status) {
0055     case IB_WC_WR_FLUSH_ERR:
0056         return;
0057 
0058     case IB_WC_SUCCESS:
0059         notify_status = RDS_RDMA_SUCCESS;
0060         break;
0061 
0062     case IB_WC_REM_ACCESS_ERR:
0063         notify_status = RDS_RDMA_REMOTE_ERROR;
0064         break;
0065 
0066     default:
0067         notify_status = RDS_RDMA_OTHER_ERROR;
0068         break;
0069     }
0070     complete(rm, notify_status);
0071 }
0072 
0073 static void rds_ib_send_unmap_data(struct rds_ib_connection *ic,
0074                    struct rm_data_op *op,
0075                    int wc_status)
0076 {
0077     if (op->op_nents)
0078         ib_dma_unmap_sg(ic->i_cm_id->device,
0079                 op->op_sg, op->op_nents,
0080                 DMA_TO_DEVICE);
0081 }
0082 
0083 static void rds_ib_send_unmap_rdma(struct rds_ib_connection *ic,
0084                    struct rm_rdma_op *op,
0085                    int wc_status)
0086 {
0087     if (op->op_mapped) {
0088         ib_dma_unmap_sg(ic->i_cm_id->device,
0089                 op->op_sg, op->op_nents,
0090                 op->op_write ? DMA_TO_DEVICE : DMA_FROM_DEVICE);
0091         op->op_mapped = 0;
0092     }
0093 
0094     /* If the user asked for a completion notification on this
0095      * message, we can implement three different semantics:
0096      *  1.  Notify when we received the ACK on the RDS message
0097      *  that was queued with the RDMA. This provides reliable
0098      *  notification of RDMA status at the expense of a one-way
0099      *  packet delay.
0100      *  2.  Notify when the IB stack gives us the completion event for
0101      *  the RDMA operation.
0102      *  3.  Notify when the IB stack gives us the completion event for
0103      *  the accompanying RDS messages.
0104      * Here, we implement approach #3. To implement approach #2,
0105      * we would need to take an event for the rdma WR. To implement #1,
0106      * don't call rds_rdma_send_complete at all, and fall back to the notify
0107      * handling in the ACK processing code.
0108      *
0109      * Note: There's no need to explicitly sync any RDMA buffers using
0110      * ib_dma_sync_sg_for_cpu - the completion for the RDMA
0111      * operation itself unmapped the RDMA buffers, which takes care
0112      * of synching.
0113      */
0114     rds_ib_send_complete(container_of(op, struct rds_message, rdma),
0115                  wc_status, rds_rdma_send_complete);
0116 
0117     if (op->op_write)
0118         rds_stats_add(s_send_rdma_bytes, op->op_bytes);
0119     else
0120         rds_stats_add(s_recv_rdma_bytes, op->op_bytes);
0121 }
0122 
0123 static void rds_ib_send_unmap_atomic(struct rds_ib_connection *ic,
0124                      struct rm_atomic_op *op,
0125                      int wc_status)
0126 {
0127     /* unmap atomic recvbuf */
0128     if (op->op_mapped) {
0129         ib_dma_unmap_sg(ic->i_cm_id->device, op->op_sg, 1,
0130                 DMA_FROM_DEVICE);
0131         op->op_mapped = 0;
0132     }
0133 
0134     rds_ib_send_complete(container_of(op, struct rds_message, atomic),
0135                  wc_status, rds_atomic_send_complete);
0136 
0137     if (op->op_type == RDS_ATOMIC_TYPE_CSWP)
0138         rds_ib_stats_inc(s_ib_atomic_cswp);
0139     else
0140         rds_ib_stats_inc(s_ib_atomic_fadd);
0141 }
0142 
0143 /*
0144  * Unmap the resources associated with a struct send_work.
0145  *
0146  * Returns the rm for no good reason other than it is unobtainable
0147  * other than by switching on wr.opcode, currently, and the caller,
0148  * the event handler, needs it.
0149  */
0150 static struct rds_message *rds_ib_send_unmap_op(struct rds_ib_connection *ic,
0151                         struct rds_ib_send_work *send,
0152                         int wc_status)
0153 {
0154     struct rds_message *rm = NULL;
0155 
0156     /* In the error case, wc.opcode sometimes contains garbage */
0157     switch (send->s_wr.opcode) {
0158     case IB_WR_SEND:
0159         if (send->s_op) {
0160             rm = container_of(send->s_op, struct rds_message, data);
0161             rds_ib_send_unmap_data(ic, send->s_op, wc_status);
0162         }
0163         break;
0164     case IB_WR_RDMA_WRITE:
0165     case IB_WR_RDMA_READ:
0166         if (send->s_op) {
0167             rm = container_of(send->s_op, struct rds_message, rdma);
0168             rds_ib_send_unmap_rdma(ic, send->s_op, wc_status);
0169         }
0170         break;
0171     case IB_WR_ATOMIC_FETCH_AND_ADD:
0172     case IB_WR_ATOMIC_CMP_AND_SWP:
0173         if (send->s_op) {
0174             rm = container_of(send->s_op, struct rds_message, atomic);
0175             rds_ib_send_unmap_atomic(ic, send->s_op, wc_status);
0176         }
0177         break;
0178     default:
0179         printk_ratelimited(KERN_NOTICE
0180                    "RDS/IB: %s: unexpected opcode 0x%x in WR!\n",
0181                    __func__, send->s_wr.opcode);
0182         break;
0183     }
0184 
0185     send->s_wr.opcode = 0xdead;
0186 
0187     return rm;
0188 }
0189 
0190 void rds_ib_send_init_ring(struct rds_ib_connection *ic)
0191 {
0192     struct rds_ib_send_work *send;
0193     u32 i;
0194 
0195     for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
0196         struct ib_sge *sge;
0197 
0198         send->s_op = NULL;
0199 
0200         send->s_wr.wr_id = i;
0201         send->s_wr.sg_list = send->s_sge;
0202         send->s_wr.ex.imm_data = 0;
0203 
0204         sge = &send->s_sge[0];
0205         sge->addr = ic->i_send_hdrs_dma[i];
0206 
0207         sge->length = sizeof(struct rds_header);
0208         sge->lkey = ic->i_pd->local_dma_lkey;
0209 
0210         send->s_sge[1].lkey = ic->i_pd->local_dma_lkey;
0211     }
0212 }
0213 
0214 void rds_ib_send_clear_ring(struct rds_ib_connection *ic)
0215 {
0216     struct rds_ib_send_work *send;
0217     u32 i;
0218 
0219     for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
0220         if (send->s_op && send->s_wr.opcode != 0xdead)
0221             rds_ib_send_unmap_op(ic, send, IB_WC_WR_FLUSH_ERR);
0222     }
0223 }
0224 
0225 /*
0226  * The only fast path caller always has a non-zero nr, so we don't
0227  * bother testing nr before performing the atomic sub.
0228  */
0229 static void rds_ib_sub_signaled(struct rds_ib_connection *ic, int nr)
0230 {
0231     if ((atomic_sub_return(nr, &ic->i_signaled_sends) == 0) &&
0232         waitqueue_active(&rds_ib_ring_empty_wait))
0233         wake_up(&rds_ib_ring_empty_wait);
0234     BUG_ON(atomic_read(&ic->i_signaled_sends) < 0);
0235 }
0236 
0237 /*
0238  * The _oldest/_free ring operations here race cleanly with the alloc/unalloc
0239  * operations performed in the send path.  As the sender allocs and potentially
0240  * unallocs the next free entry in the ring it doesn't alter which is
0241  * the next to be freed, which is what this is concerned with.
0242  */
0243 void rds_ib_send_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc)
0244 {
0245     struct rds_message *rm = NULL;
0246     struct rds_connection *conn = ic->conn;
0247     struct rds_ib_send_work *send;
0248     u32 completed;
0249     u32 oldest;
0250     u32 i = 0;
0251     int nr_sig = 0;
0252 
0253 
0254     rdsdebug("wc wr_id 0x%llx status %u (%s) byte_len %u imm_data %u\n",
0255          (unsigned long long)wc->wr_id, wc->status,
0256          ib_wc_status_msg(wc->status), wc->byte_len,
0257          be32_to_cpu(wc->ex.imm_data));
0258     rds_ib_stats_inc(s_ib_tx_cq_event);
0259 
0260     if (wc->wr_id == RDS_IB_ACK_WR_ID) {
0261         if (time_after(jiffies, ic->i_ack_queued + HZ / 2))
0262             rds_ib_stats_inc(s_ib_tx_stalled);
0263         rds_ib_ack_send_complete(ic);
0264         return;
0265     }
0266 
0267     oldest = rds_ib_ring_oldest(&ic->i_send_ring);
0268 
0269     completed = rds_ib_ring_completed(&ic->i_send_ring, wc->wr_id, oldest);
0270 
0271     for (i = 0; i < completed; i++) {
0272         send = &ic->i_sends[oldest];
0273         if (send->s_wr.send_flags & IB_SEND_SIGNALED)
0274             nr_sig++;
0275 
0276         rm = rds_ib_send_unmap_op(ic, send, wc->status);
0277 
0278         if (time_after(jiffies, send->s_queued + HZ / 2))
0279             rds_ib_stats_inc(s_ib_tx_stalled);
0280 
0281         if (send->s_op) {
0282             if (send->s_op == rm->m_final_op) {
0283                 /* If anyone waited for this message to get
0284                  * flushed out, wake them up now
0285                  */
0286                 rds_message_unmapped(rm);
0287             }
0288             rds_message_put(rm);
0289             send->s_op = NULL;
0290         }
0291 
0292         oldest = (oldest + 1) % ic->i_send_ring.w_nr;
0293     }
0294 
0295     rds_ib_ring_free(&ic->i_send_ring, completed);
0296     rds_ib_sub_signaled(ic, nr_sig);
0297 
0298     if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
0299         test_bit(0, &conn->c_map_queued))
0300         queue_delayed_work(rds_wq, &conn->c_send_w, 0);
0301 
0302     /* We expect errors as the qp is drained during shutdown */
0303     if (wc->status != IB_WC_SUCCESS && rds_conn_up(conn)) {
0304         rds_ib_conn_error(conn, "send completion on <%pI6c,%pI6c,%d> had status %u (%s), vendor err 0x%x, disconnecting and reconnecting\n",
0305                   &conn->c_laddr, &conn->c_faddr,
0306                   conn->c_tos, wc->status,
0307                   ib_wc_status_msg(wc->status), wc->vendor_err);
0308     }
0309 }
0310 
0311 /*
0312  * This is the main function for allocating credits when sending
0313  * messages.
0314  *
0315  * Conceptually, we have two counters:
0316  *  -   send credits: this tells us how many WRs we're allowed
0317  *  to submit without overruning the receiver's queue. For
0318  *  each SEND WR we post, we decrement this by one.
0319  *
0320  *  -   posted credits: this tells us how many WRs we recently
0321  *  posted to the receive queue. This value is transferred
0322  *  to the peer as a "credit update" in a RDS header field.
0323  *  Every time we transmit credits to the peer, we subtract
0324  *  the amount of transferred credits from this counter.
0325  *
0326  * It is essential that we avoid situations where both sides have
0327  * exhausted their send credits, and are unable to send new credits
0328  * to the peer. We achieve this by requiring that we send at least
0329  * one credit update to the peer before exhausting our credits.
0330  * When new credits arrive, we subtract one credit that is withheld
0331  * until we've posted new buffers and are ready to transmit these
0332  * credits (see rds_ib_send_add_credits below).
0333  *
0334  * The RDS send code is essentially single-threaded; rds_send_xmit
0335  * sets RDS_IN_XMIT to ensure exclusive access to the send ring.
0336  * However, the ACK sending code is independent and can race with
0337  * message SENDs.
0338  *
0339  * In the send path, we need to update the counters for send credits
0340  * and the counter of posted buffers atomically - when we use the
0341  * last available credit, we cannot allow another thread to race us
0342  * and grab the posted credits counter.  Hence, we have to use a
0343  * spinlock to protect the credit counter, or use atomics.
0344  *
0345  * Spinlocks shared between the send and the receive path are bad,
0346  * because they create unnecessary delays. An early implementation
0347  * using a spinlock showed a 5% degradation in throughput at some
0348  * loads.
0349  *
0350  * This implementation avoids spinlocks completely, putting both
0351  * counters into a single atomic, and updating that atomic using
0352  * atomic_add (in the receive path, when receiving fresh credits),
0353  * and using atomic_cmpxchg when updating the two counters.
0354  */
0355 int rds_ib_send_grab_credits(struct rds_ib_connection *ic,
0356                  u32 wanted, u32 *adv_credits, int need_posted, int max_posted)
0357 {
0358     unsigned int avail, posted, got = 0, advertise;
0359     long oldval, newval;
0360 
0361     *adv_credits = 0;
0362     if (!ic->i_flowctl)
0363         return wanted;
0364 
0365 try_again:
0366     advertise = 0;
0367     oldval = newval = atomic_read(&ic->i_credits);
0368     posted = IB_GET_POST_CREDITS(oldval);
0369     avail = IB_GET_SEND_CREDITS(oldval);
0370 
0371     rdsdebug("wanted=%u credits=%u posted=%u\n",
0372             wanted, avail, posted);
0373 
0374     /* The last credit must be used to send a credit update. */
0375     if (avail && !posted)
0376         avail--;
0377 
0378     if (avail < wanted) {
0379         struct rds_connection *conn = ic->i_cm_id->context;
0380 
0381         /* Oops, there aren't that many credits left! */
0382         set_bit(RDS_LL_SEND_FULL, &conn->c_flags);
0383         got = avail;
0384     } else {
0385         /* Sometimes you get what you want, lalala. */
0386         got = wanted;
0387     }
0388     newval -= IB_SET_SEND_CREDITS(got);
0389 
0390     /*
0391      * If need_posted is non-zero, then the caller wants
0392      * the posted regardless of whether any send credits are
0393      * available.
0394      */
0395     if (posted && (got || need_posted)) {
0396         advertise = min_t(unsigned int, posted, max_posted);
0397         newval -= IB_SET_POST_CREDITS(advertise);
0398     }
0399 
0400     /* Finally bill everything */
0401     if (atomic_cmpxchg(&ic->i_credits, oldval, newval) != oldval)
0402         goto try_again;
0403 
0404     *adv_credits = advertise;
0405     return got;
0406 }
0407 
0408 void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int credits)
0409 {
0410     struct rds_ib_connection *ic = conn->c_transport_data;
0411 
0412     if (credits == 0)
0413         return;
0414 
0415     rdsdebug("credits=%u current=%u%s\n",
0416             credits,
0417             IB_GET_SEND_CREDITS(atomic_read(&ic->i_credits)),
0418             test_bit(RDS_LL_SEND_FULL, &conn->c_flags) ? ", ll_send_full" : "");
0419 
0420     atomic_add(IB_SET_SEND_CREDITS(credits), &ic->i_credits);
0421     if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags))
0422         queue_delayed_work(rds_wq, &conn->c_send_w, 0);
0423 
0424     WARN_ON(IB_GET_SEND_CREDITS(credits) >= 16384);
0425 
0426     rds_ib_stats_inc(s_ib_rx_credit_updates);
0427 }
0428 
0429 void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted)
0430 {
0431     struct rds_ib_connection *ic = conn->c_transport_data;
0432 
0433     if (posted == 0)
0434         return;
0435 
0436     atomic_add(IB_SET_POST_CREDITS(posted), &ic->i_credits);
0437 
0438     /* Decide whether to send an update to the peer now.
0439      * If we would send a credit update for every single buffer we
0440      * post, we would end up with an ACK storm (ACK arrives,
0441      * consumes buffer, we refill the ring, send ACK to remote
0442      * advertising the newly posted buffer... ad inf)
0443      *
0444      * Performance pretty much depends on how often we send
0445      * credit updates - too frequent updates mean lots of ACKs.
0446      * Too infrequent updates, and the peer will run out of
0447      * credits and has to throttle.
0448      * For the time being, 16 seems to be a good compromise.
0449      */
0450     if (IB_GET_POST_CREDITS(atomic_read(&ic->i_credits)) >= 16)
0451         set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
0452 }
0453 
0454 static inline int rds_ib_set_wr_signal_state(struct rds_ib_connection *ic,
0455                          struct rds_ib_send_work *send,
0456                          bool notify)
0457 {
0458     /*
0459      * We want to delay signaling completions just enough to get
0460      * the batching benefits but not so much that we create dead time
0461      * on the wire.
0462      */
0463     if (ic->i_unsignaled_wrs-- == 0 || notify) {
0464         ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs;
0465         send->s_wr.send_flags |= IB_SEND_SIGNALED;
0466         return 1;
0467     }
0468     return 0;
0469 }
0470 
0471 /*
0472  * This can be called multiple times for a given message.  The first time
0473  * we see a message we map its scatterlist into the IB device so that
0474  * we can provide that mapped address to the IB scatter gather entries
0475  * in the IB work requests.  We translate the scatterlist into a series
0476  * of work requests that fragment the message.  These work requests complete
0477  * in order so we pass ownership of the message to the completion handler
0478  * once we send the final fragment.
0479  *
0480  * The RDS core uses the c_send_lock to only enter this function once
0481  * per connection.  This makes sure that the tx ring alloc/unalloc pairs
0482  * don't get out of sync and confuse the ring.
0483  */
0484 int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
0485         unsigned int hdr_off, unsigned int sg, unsigned int off)
0486 {
0487     struct rds_ib_connection *ic = conn->c_transport_data;
0488     struct ib_device *dev = ic->i_cm_id->device;
0489     struct rds_ib_send_work *send = NULL;
0490     struct rds_ib_send_work *first;
0491     struct rds_ib_send_work *prev;
0492     const struct ib_send_wr *failed_wr;
0493     struct scatterlist *scat;
0494     u32 pos;
0495     u32 i;
0496     u32 work_alloc;
0497     u32 credit_alloc = 0;
0498     u32 posted;
0499     u32 adv_credits = 0;
0500     int send_flags = 0;
0501     int bytes_sent = 0;
0502     int ret;
0503     int flow_controlled = 0;
0504     int nr_sig = 0;
0505 
0506     BUG_ON(off % RDS_FRAG_SIZE);
0507     BUG_ON(hdr_off != 0 && hdr_off != sizeof(struct rds_header));
0508 
0509     /* Do not send cong updates to IB loopback */
0510     if (conn->c_loopback
0511         && rm->m_inc.i_hdr.h_flags & RDS_FLAG_CONG_BITMAP) {
0512         rds_cong_map_updated(conn->c_fcong, ~(u64) 0);
0513         scat = &rm->data.op_sg[sg];
0514         ret = max_t(int, RDS_CONG_MAP_BYTES, scat->length);
0515         return sizeof(struct rds_header) + ret;
0516     }
0517 
0518     /* FIXME we may overallocate here */
0519     if (be32_to_cpu(rm->m_inc.i_hdr.h_len) == 0)
0520         i = 1;
0521     else
0522         i = DIV_ROUND_UP(be32_to_cpu(rm->m_inc.i_hdr.h_len), RDS_FRAG_SIZE);
0523 
0524     work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, i, &pos);
0525     if (work_alloc == 0) {
0526         set_bit(RDS_LL_SEND_FULL, &conn->c_flags);
0527         rds_ib_stats_inc(s_ib_tx_ring_full);
0528         ret = -ENOMEM;
0529         goto out;
0530     }
0531 
0532     if (ic->i_flowctl) {
0533         credit_alloc = rds_ib_send_grab_credits(ic, work_alloc, &posted, 0, RDS_MAX_ADV_CREDIT);
0534         adv_credits += posted;
0535         if (credit_alloc < work_alloc) {
0536             rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - credit_alloc);
0537             work_alloc = credit_alloc;
0538             flow_controlled = 1;
0539         }
0540         if (work_alloc == 0) {
0541             set_bit(RDS_LL_SEND_FULL, &conn->c_flags);
0542             rds_ib_stats_inc(s_ib_tx_throttle);
0543             ret = -ENOMEM;
0544             goto out;
0545         }
0546     }
0547 
0548     /* map the message the first time we see it */
0549     if (!ic->i_data_op) {
0550         if (rm->data.op_nents) {
0551             rm->data.op_count = ib_dma_map_sg(dev,
0552                               rm->data.op_sg,
0553                               rm->data.op_nents,
0554                               DMA_TO_DEVICE);
0555             rdsdebug("ic %p mapping rm %p: %d\n", ic, rm, rm->data.op_count);
0556             if (rm->data.op_count == 0) {
0557                 rds_ib_stats_inc(s_ib_tx_sg_mapping_failure);
0558                 rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
0559                 ret = -ENOMEM; /* XXX ? */
0560                 goto out;
0561             }
0562         } else {
0563             rm->data.op_count = 0;
0564         }
0565 
0566         rds_message_addref(rm);
0567         rm->data.op_dmasg = 0;
0568         rm->data.op_dmaoff = 0;
0569         ic->i_data_op = &rm->data;
0570 
0571         /* Finalize the header */
0572         if (test_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags))
0573             rm->m_inc.i_hdr.h_flags |= RDS_FLAG_ACK_REQUIRED;
0574         if (test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags))
0575             rm->m_inc.i_hdr.h_flags |= RDS_FLAG_RETRANSMITTED;
0576 
0577         /* If it has a RDMA op, tell the peer we did it. This is
0578          * used by the peer to release use-once RDMA MRs. */
0579         if (rm->rdma.op_active) {
0580             struct rds_ext_header_rdma ext_hdr;
0581 
0582             ext_hdr.h_rdma_rkey = cpu_to_be32(rm->rdma.op_rkey);
0583             rds_message_add_extension(&rm->m_inc.i_hdr,
0584                     RDS_EXTHDR_RDMA, &ext_hdr, sizeof(ext_hdr));
0585         }
0586         if (rm->m_rdma_cookie) {
0587             rds_message_add_rdma_dest_extension(&rm->m_inc.i_hdr,
0588                     rds_rdma_cookie_key(rm->m_rdma_cookie),
0589                     rds_rdma_cookie_offset(rm->m_rdma_cookie));
0590         }
0591 
0592         /* Note - rds_ib_piggyb_ack clears the ACK_REQUIRED bit, so
0593          * we should not do this unless we have a chance of at least
0594          * sticking the header into the send ring. Which is why we
0595          * should call rds_ib_ring_alloc first. */
0596         rm->m_inc.i_hdr.h_ack = cpu_to_be64(rds_ib_piggyb_ack(ic));
0597         rds_message_make_checksum(&rm->m_inc.i_hdr);
0598 
0599         /*
0600          * Update adv_credits since we reset the ACK_REQUIRED bit.
0601          */
0602         if (ic->i_flowctl) {
0603             rds_ib_send_grab_credits(ic, 0, &posted, 1, RDS_MAX_ADV_CREDIT - adv_credits);
0604             adv_credits += posted;
0605             BUG_ON(adv_credits > 255);
0606         }
0607     }
0608 
0609     /* Sometimes you want to put a fence between an RDMA
0610      * READ and the following SEND.
0611      * We could either do this all the time
0612      * or when requested by the user. Right now, we let
0613      * the application choose.
0614      */
0615     if (rm->rdma.op_active && rm->rdma.op_fence)
0616         send_flags = IB_SEND_FENCE;
0617 
0618     /* Each frag gets a header. Msgs may be 0 bytes */
0619     send = &ic->i_sends[pos];
0620     first = send;
0621     prev = NULL;
0622     scat = &ic->i_data_op->op_sg[rm->data.op_dmasg];
0623     i = 0;
0624     do {
0625         unsigned int len = 0;
0626 
0627         /* Set up the header */
0628         send->s_wr.send_flags = send_flags;
0629         send->s_wr.opcode = IB_WR_SEND;
0630         send->s_wr.num_sge = 1;
0631         send->s_wr.next = NULL;
0632         send->s_queued = jiffies;
0633         send->s_op = NULL;
0634 
0635         send->s_sge[0].addr = ic->i_send_hdrs_dma[pos];
0636 
0637         send->s_sge[0].length = sizeof(struct rds_header);
0638         send->s_sge[0].lkey = ic->i_pd->local_dma_lkey;
0639 
0640         ib_dma_sync_single_for_cpu(ic->rds_ibdev->dev,
0641                        ic->i_send_hdrs_dma[pos],
0642                        sizeof(struct rds_header),
0643                        DMA_TO_DEVICE);
0644         memcpy(ic->i_send_hdrs[pos], &rm->m_inc.i_hdr,
0645                sizeof(struct rds_header));
0646 
0647 
0648         /* Set up the data, if present */
0649         if (i < work_alloc
0650             && scat != &rm->data.op_sg[rm->data.op_count]) {
0651             len = min(RDS_FRAG_SIZE,
0652                   sg_dma_len(scat) - rm->data.op_dmaoff);
0653             send->s_wr.num_sge = 2;
0654 
0655             send->s_sge[1].addr = sg_dma_address(scat);
0656             send->s_sge[1].addr += rm->data.op_dmaoff;
0657             send->s_sge[1].length = len;
0658             send->s_sge[1].lkey = ic->i_pd->local_dma_lkey;
0659 
0660             bytes_sent += len;
0661             rm->data.op_dmaoff += len;
0662             if (rm->data.op_dmaoff == sg_dma_len(scat)) {
0663                 scat++;
0664                 rm->data.op_dmasg++;
0665                 rm->data.op_dmaoff = 0;
0666             }
0667         }
0668 
0669         rds_ib_set_wr_signal_state(ic, send, false);
0670 
0671         /*
0672          * Always signal the last one if we're stopping due to flow control.
0673          */
0674         if (ic->i_flowctl && flow_controlled && i == (work_alloc - 1)) {
0675             rds_ib_set_wr_signal_state(ic, send, true);
0676             send->s_wr.send_flags |= IB_SEND_SOLICITED;
0677         }
0678 
0679         if (send->s_wr.send_flags & IB_SEND_SIGNALED)
0680             nr_sig++;
0681 
0682         rdsdebug("send %p wr %p num_sge %u next %p\n", send,
0683              &send->s_wr, send->s_wr.num_sge, send->s_wr.next);
0684 
0685         if (ic->i_flowctl && adv_credits) {
0686             struct rds_header *hdr = ic->i_send_hdrs[pos];
0687 
0688             /* add credit and redo the header checksum */
0689             hdr->h_credit = adv_credits;
0690             rds_message_make_checksum(hdr);
0691             adv_credits = 0;
0692             rds_ib_stats_inc(s_ib_tx_credit_updates);
0693         }
0694         ib_dma_sync_single_for_device(ic->rds_ibdev->dev,
0695                           ic->i_send_hdrs_dma[pos],
0696                           sizeof(struct rds_header),
0697                           DMA_TO_DEVICE);
0698 
0699         if (prev)
0700             prev->s_wr.next = &send->s_wr;
0701         prev = send;
0702 
0703         pos = (pos + 1) % ic->i_send_ring.w_nr;
0704         send = &ic->i_sends[pos];
0705         i++;
0706 
0707     } while (i < work_alloc
0708          && scat != &rm->data.op_sg[rm->data.op_count]);
0709 
0710     /* Account the RDS header in the number of bytes we sent, but just once.
0711      * The caller has no concept of fragmentation. */
0712     if (hdr_off == 0)
0713         bytes_sent += sizeof(struct rds_header);
0714 
0715     /* if we finished the message then send completion owns it */
0716     if (scat == &rm->data.op_sg[rm->data.op_count]) {
0717         prev->s_op = ic->i_data_op;
0718         prev->s_wr.send_flags |= IB_SEND_SOLICITED;
0719         if (!(prev->s_wr.send_flags & IB_SEND_SIGNALED))
0720             nr_sig += rds_ib_set_wr_signal_state(ic, prev, true);
0721         ic->i_data_op = NULL;
0722     }
0723 
0724     /* Put back wrs & credits we didn't use */
0725     if (i < work_alloc) {
0726         rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i);
0727         work_alloc = i;
0728     }
0729     if (ic->i_flowctl && i < credit_alloc)
0730         rds_ib_send_add_credits(conn, credit_alloc - i);
0731 
0732     if (nr_sig)
0733         atomic_add(nr_sig, &ic->i_signaled_sends);
0734 
0735     /* XXX need to worry about failed_wr and partial sends. */
0736     failed_wr = &first->s_wr;
0737     ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
0738     rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic,
0739          first, &first->s_wr, ret, failed_wr);
0740     BUG_ON(failed_wr != &first->s_wr);
0741     if (ret) {
0742         printk(KERN_WARNING "RDS/IB: ib_post_send to %pI6c "
0743                "returned %d\n", &conn->c_faddr, ret);
0744         rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
0745         rds_ib_sub_signaled(ic, nr_sig);
0746         if (prev->s_op) {
0747             ic->i_data_op = prev->s_op;
0748             prev->s_op = NULL;
0749         }
0750 
0751         rds_ib_conn_error(ic->conn, "ib_post_send failed\n");
0752         goto out;
0753     }
0754 
0755     ret = bytes_sent;
0756 out:
0757     BUG_ON(adv_credits);
0758     return ret;
0759 }
0760 
0761 /*
0762  * Issue atomic operation.
0763  * A simplified version of the rdma case, we always map 1 SG, and
0764  * only 8 bytes, for the return value from the atomic operation.
0765  */
0766 int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
0767 {
0768     struct rds_ib_connection *ic = conn->c_transport_data;
0769     struct rds_ib_send_work *send = NULL;
0770     const struct ib_send_wr *failed_wr;
0771     u32 pos;
0772     u32 work_alloc;
0773     int ret;
0774     int nr_sig = 0;
0775 
0776     work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, 1, &pos);
0777     if (work_alloc != 1) {
0778         rds_ib_stats_inc(s_ib_tx_ring_full);
0779         ret = -ENOMEM;
0780         goto out;
0781     }
0782 
0783     /* address of send request in ring */
0784     send = &ic->i_sends[pos];
0785     send->s_queued = jiffies;
0786 
0787     if (op->op_type == RDS_ATOMIC_TYPE_CSWP) {
0788         send->s_atomic_wr.wr.opcode = IB_WR_MASKED_ATOMIC_CMP_AND_SWP;
0789         send->s_atomic_wr.compare_add = op->op_m_cswp.compare;
0790         send->s_atomic_wr.swap = op->op_m_cswp.swap;
0791         send->s_atomic_wr.compare_add_mask = op->op_m_cswp.compare_mask;
0792         send->s_atomic_wr.swap_mask = op->op_m_cswp.swap_mask;
0793     } else { /* FADD */
0794         send->s_atomic_wr.wr.opcode = IB_WR_MASKED_ATOMIC_FETCH_AND_ADD;
0795         send->s_atomic_wr.compare_add = op->op_m_fadd.add;
0796         send->s_atomic_wr.swap = 0;
0797         send->s_atomic_wr.compare_add_mask = op->op_m_fadd.nocarry_mask;
0798         send->s_atomic_wr.swap_mask = 0;
0799     }
0800     send->s_wr.send_flags = 0;
0801     nr_sig = rds_ib_set_wr_signal_state(ic, send, op->op_notify);
0802     send->s_atomic_wr.wr.num_sge = 1;
0803     send->s_atomic_wr.wr.next = NULL;
0804     send->s_atomic_wr.remote_addr = op->op_remote_addr;
0805     send->s_atomic_wr.rkey = op->op_rkey;
0806     send->s_op = op;
0807     rds_message_addref(container_of(send->s_op, struct rds_message, atomic));
0808 
0809     /* map 8 byte retval buffer to the device */
0810     ret = ib_dma_map_sg(ic->i_cm_id->device, op->op_sg, 1, DMA_FROM_DEVICE);
0811     rdsdebug("ic %p mapping atomic op %p. mapped %d pg\n", ic, op, ret);
0812     if (ret != 1) {
0813         rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
0814         rds_ib_stats_inc(s_ib_tx_sg_mapping_failure);
0815         ret = -ENOMEM; /* XXX ? */
0816         goto out;
0817     }
0818 
0819     /* Convert our struct scatterlist to struct ib_sge */
0820     send->s_sge[0].addr = sg_dma_address(op->op_sg);
0821     send->s_sge[0].length = sg_dma_len(op->op_sg);
0822     send->s_sge[0].lkey = ic->i_pd->local_dma_lkey;
0823 
0824     rdsdebug("rva %Lx rpa %Lx len %u\n", op->op_remote_addr,
0825          send->s_sge[0].addr, send->s_sge[0].length);
0826 
0827     if (nr_sig)
0828         atomic_add(nr_sig, &ic->i_signaled_sends);
0829 
0830     failed_wr = &send->s_atomic_wr.wr;
0831     ret = ib_post_send(ic->i_cm_id->qp, &send->s_atomic_wr.wr, &failed_wr);
0832     rdsdebug("ic %p send %p (wr %p) ret %d wr %p\n", ic,
0833          send, &send->s_atomic_wr, ret, failed_wr);
0834     BUG_ON(failed_wr != &send->s_atomic_wr.wr);
0835     if (ret) {
0836         printk(KERN_WARNING "RDS/IB: atomic ib_post_send to %pI6c "
0837                "returned %d\n", &conn->c_faddr, ret);
0838         rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
0839         rds_ib_sub_signaled(ic, nr_sig);
0840         goto out;
0841     }
0842 
0843     if (unlikely(failed_wr != &send->s_atomic_wr.wr)) {
0844         printk(KERN_WARNING "RDS/IB: atomic ib_post_send() rc=%d, but failed_wqe updated!\n", ret);
0845         BUG_ON(failed_wr != &send->s_atomic_wr.wr);
0846     }
0847 
0848 out:
0849     return ret;
0850 }
0851 
0852 int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
0853 {
0854     struct rds_ib_connection *ic = conn->c_transport_data;
0855     struct rds_ib_send_work *send = NULL;
0856     struct rds_ib_send_work *first;
0857     struct rds_ib_send_work *prev;
0858     const struct ib_send_wr *failed_wr;
0859     struct scatterlist *scat;
0860     unsigned long len;
0861     u64 remote_addr = op->op_remote_addr;
0862     u32 max_sge = ic->rds_ibdev->max_sge;
0863     u32 pos;
0864     u32 work_alloc;
0865     u32 i;
0866     u32 j;
0867     int sent;
0868     int ret;
0869     int num_sge;
0870     int nr_sig = 0;
0871     u64 odp_addr = op->op_odp_addr;
0872     u32 odp_lkey = 0;
0873 
0874     /* map the op the first time we see it */
0875     if (!op->op_odp_mr) {
0876         if (!op->op_mapped) {
0877             op->op_count =
0878                 ib_dma_map_sg(ic->i_cm_id->device, op->op_sg,
0879                           op->op_nents,
0880                           (op->op_write) ? DMA_TO_DEVICE :
0881                                    DMA_FROM_DEVICE);
0882             rdsdebug("ic %p mapping op %p: %d\n", ic, op,
0883                  op->op_count);
0884             if (op->op_count == 0) {
0885                 rds_ib_stats_inc(s_ib_tx_sg_mapping_failure);
0886                 ret = -ENOMEM; /* XXX ? */
0887                 goto out;
0888             }
0889             op->op_mapped = 1;
0890         }
0891     } else {
0892         op->op_count = op->op_nents;
0893         odp_lkey = rds_ib_get_lkey(op->op_odp_mr->r_trans_private);
0894     }
0895 
0896     /*
0897      * Instead of knowing how to return a partial rdma read/write we insist that there
0898      * be enough work requests to send the entire message.
0899      */
0900     i = DIV_ROUND_UP(op->op_count, max_sge);
0901 
0902     work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, i, &pos);
0903     if (work_alloc != i) {
0904         rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
0905         rds_ib_stats_inc(s_ib_tx_ring_full);
0906         ret = -ENOMEM;
0907         goto out;
0908     }
0909 
0910     send = &ic->i_sends[pos];
0911     first = send;
0912     prev = NULL;
0913     scat = &op->op_sg[0];
0914     sent = 0;
0915     num_sge = op->op_count;
0916 
0917     for (i = 0; i < work_alloc && scat != &op->op_sg[op->op_count]; i++) {
0918         send->s_wr.send_flags = 0;
0919         send->s_queued = jiffies;
0920         send->s_op = NULL;
0921 
0922         if (!op->op_notify)
0923             nr_sig += rds_ib_set_wr_signal_state(ic, send,
0924                                  op->op_notify);
0925 
0926         send->s_wr.opcode = op->op_write ? IB_WR_RDMA_WRITE : IB_WR_RDMA_READ;
0927         send->s_rdma_wr.remote_addr = remote_addr;
0928         send->s_rdma_wr.rkey = op->op_rkey;
0929 
0930         if (num_sge > max_sge) {
0931             send->s_rdma_wr.wr.num_sge = max_sge;
0932             num_sge -= max_sge;
0933         } else {
0934             send->s_rdma_wr.wr.num_sge = num_sge;
0935         }
0936 
0937         send->s_rdma_wr.wr.next = NULL;
0938 
0939         if (prev)
0940             prev->s_rdma_wr.wr.next = &send->s_rdma_wr.wr;
0941 
0942         for (j = 0; j < send->s_rdma_wr.wr.num_sge &&
0943              scat != &op->op_sg[op->op_count]; j++) {
0944             len = sg_dma_len(scat);
0945             if (!op->op_odp_mr) {
0946                 send->s_sge[j].addr = sg_dma_address(scat);
0947                 send->s_sge[j].lkey = ic->i_pd->local_dma_lkey;
0948             } else {
0949                 send->s_sge[j].addr = odp_addr;
0950                 send->s_sge[j].lkey = odp_lkey;
0951             }
0952             send->s_sge[j].length = len;
0953 
0954             sent += len;
0955             rdsdebug("ic %p sent %d remote_addr %llu\n", ic, sent, remote_addr);
0956 
0957             remote_addr += len;
0958             odp_addr += len;
0959             scat++;
0960         }
0961 
0962         rdsdebug("send %p wr %p num_sge %u next %p\n", send,
0963             &send->s_rdma_wr.wr,
0964             send->s_rdma_wr.wr.num_sge,
0965             send->s_rdma_wr.wr.next);
0966 
0967         prev = send;
0968         if (++send == &ic->i_sends[ic->i_send_ring.w_nr])
0969             send = ic->i_sends;
0970     }
0971 
0972     /* give a reference to the last op */
0973     if (scat == &op->op_sg[op->op_count]) {
0974         prev->s_op = op;
0975         rds_message_addref(container_of(op, struct rds_message, rdma));
0976     }
0977 
0978     if (i < work_alloc) {
0979         rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i);
0980         work_alloc = i;
0981     }
0982 
0983     if (nr_sig)
0984         atomic_add(nr_sig, &ic->i_signaled_sends);
0985 
0986     failed_wr = &first->s_rdma_wr.wr;
0987     ret = ib_post_send(ic->i_cm_id->qp, &first->s_rdma_wr.wr, &failed_wr);
0988     rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic,
0989          first, &first->s_rdma_wr.wr, ret, failed_wr);
0990     BUG_ON(failed_wr != &first->s_rdma_wr.wr);
0991     if (ret) {
0992         printk(KERN_WARNING "RDS/IB: rdma ib_post_send to %pI6c "
0993                "returned %d\n", &conn->c_faddr, ret);
0994         rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
0995         rds_ib_sub_signaled(ic, nr_sig);
0996         goto out;
0997     }
0998 
0999     if (unlikely(failed_wr != &first->s_rdma_wr.wr)) {
1000         printk(KERN_WARNING "RDS/IB: ib_post_send() rc=%d, but failed_wqe updated!\n", ret);
1001         BUG_ON(failed_wr != &first->s_rdma_wr.wr);
1002     }
1003 
1004 
1005 out:
1006     return ret;
1007 }
1008 
1009 void rds_ib_xmit_path_complete(struct rds_conn_path *cp)
1010 {
1011     struct rds_connection *conn = cp->cp_conn;
1012     struct rds_ib_connection *ic = conn->c_transport_data;
1013 
1014     /* We may have a pending ACK or window update we were unable
1015      * to send previously (due to flow control). Try again. */
1016     rds_ib_attempt_ack(ic);
1017 }