/*
 * Copyright (c) 2007, 2020 Oracle and/or its affiliates.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/rbtree.h>
#include <linux/dma-mapping.h> /* for DMA_*_DEVICE */

#include "rds.h"

/*
 * XXX
 *  - build with sparse
 *  - should we detect duplicate keys on a socket?  hmm.
 *  - an rdma is an mlock, apply rlimit?
 */

/*
 * get the number of pages by looking at the page indices that the start and
 * end addresses fall in.
 *
 * Returns 0 if the vec is invalid.  It is invalid if the number of bytes
 * causes the address to wrap or overflows an unsigned int.  This comes
 * from being stored in the 'length' member of 'struct scatterlist'.
 */
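/*
 * Worked example (illustrative values only, assuming 4K pages): a vec with
 * addr = 0x1ffc and bytes = 8 touches bytes 0x1ffc..0x2003, i.e. page
 * indices 1 and 2, and the expression below yields
 * ((0x1ffc + 8 + 0xfff) >> 12) - (0x1ffc >> 12) = 3 - 1 = 2.
 */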
static unsigned int rds_pages_in_vec(struct rds_iovec *vec)
{
    if ((vec->addr + vec->bytes <= vec->addr) ||
        (vec->bytes > (u64)UINT_MAX))
        return 0;

    return ((vec->addr + vec->bytes + PAGE_SIZE - 1) >> PAGE_SHIFT) -
        (vec->addr >> PAGE_SHIFT);
}

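/*
 * Walk the socket's rbtree of MRs, keyed by R_Key.  Returns the matching
 * MR if the key is present.  If it is absent and 'insert' is non-NULL,
 * link 'insert' into the tree, take a reference on it, and return NULL.
 * The caller must hold rs_rdma_lock.
 */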
static struct rds_mr *rds_mr_tree_walk(struct rb_root *root, u64 key,
                       struct rds_mr *insert)
{
    struct rb_node **p = &root->rb_node;
    struct rb_node *parent = NULL;
    struct rds_mr *mr;

    while (*p) {
        parent = *p;
        mr = rb_entry(parent, struct rds_mr, r_rb_node);

        if (key < mr->r_key)
            p = &(*p)->rb_left;
        else if (key > mr->r_key)
            p = &(*p)->rb_right;
        else
            return mr;
    }

    if (insert) {
        rb_link_node(&insert->r_rb_node, parent, p);
        rb_insert_color(&insert->r_rb_node, root);
        kref_get(&insert->r_kref);
    }
    return NULL;
}

/*
 * Destroy the transport-specific part of a MR.
 */
static void rds_destroy_mr(struct rds_mr *mr)
{
    struct rds_sock *rs = mr->r_sock;
    void *trans_private = NULL;
    unsigned long flags;

    rdsdebug("RDS: destroy mr key is %x refcnt %u\n",
         mr->r_key, kref_read(&mr->r_kref));

    spin_lock_irqsave(&rs->rs_rdma_lock, flags);
    if (!RB_EMPTY_NODE(&mr->r_rb_node))
        rb_erase(&mr->r_rb_node, &rs->rs_rdma_keys);
    trans_private = mr->r_trans_private;
    mr->r_trans_private = NULL;
    spin_unlock_irqrestore(&rs->rs_rdma_lock, flags);

    if (trans_private)
        mr->r_trans->free_mr(trans_private, mr->r_invalidate);
}

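/*
 * kref release callback: invoked when the last reference to an MR is
 * dropped via kref_put(&mr->r_kref, __rds_put_mr_final).
 */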
void __rds_put_mr_final(struct kref *kref)
{
    struct rds_mr *mr = container_of(kref, struct rds_mr, r_kref);

    rds_destroy_mr(mr);
    kfree(mr);
}

/*
 * By the time this is called we can't have any more ioctls called on
 * the socket so we don't need to worry about racing with others.
 */
void rds_rdma_drop_keys(struct rds_sock *rs)
{
    struct rds_mr *mr;
    struct rb_node *node;
    unsigned long flags;

    /* Release any MRs associated with this socket */
    spin_lock_irqsave(&rs->rs_rdma_lock, flags);
    while ((node = rb_first(&rs->rs_rdma_keys))) {
        mr = rb_entry(node, struct rds_mr, r_rb_node);
        if (mr->r_trans == rs->rs_transport)
            mr->r_invalidate = 0;
        rb_erase(&mr->r_rb_node, &rs->rs_rdma_keys);
        RB_CLEAR_NODE(&mr->r_rb_node);
        spin_unlock_irqrestore(&rs->rs_rdma_lock, flags);
        kref_put(&mr->r_kref, __rds_put_mr_final);
        spin_lock_irqsave(&rs->rs_rdma_lock, flags);
    }
    spin_unlock_irqrestore(&rs->rs_rdma_lock, flags);

    if (rs->rs_transport && rs->rs_transport->flush_mrs)
        rs->rs_transport->flush_mrs();
}

/*
 * Pin the nr_pages user pages starting at user_addr for long-term DMA
 * (FOLL_LONGTERM).  Returns the number of pages pinned on success or a
 * negative errno; a partial pin is released and reported as -EFAULT.
 */
static int rds_pin_pages(unsigned long user_addr, unsigned int nr_pages,
            struct page **pages, int write)
{
    unsigned int gup_flags = FOLL_LONGTERM;
    int ret;

    if (write)
        gup_flags |= FOLL_WRITE;

    ret = pin_user_pages_fast(user_addr, nr_pages, gup_flags, pages);
    if (ret >= 0 && ret < nr_pages) {
        unpin_user_pages(pages, ret);
        ret = -EFAULT;
    }

    return ret;
}

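/*
 * Common implementation behind RDS_GET_MR, RDS_GET_MR_FOR_DEST and
 * RDS_CMSG_RDMA_MAP: pin (or ODP-map) the user buffer described by
 * args->vec, obtain a transport-specific MR, insert it into the socket's
 * rbtree keyed by R_Key, and hand back the <R_Key, offset> cookie.
 */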
static int __rds_rdma_map(struct rds_sock *rs, struct rds_get_mr_args *args,
              u64 *cookie_ret, struct rds_mr **mr_ret,
              struct rds_conn_path *cp)
{
    struct rds_mr *mr = NULL, *found;
    struct scatterlist *sg = NULL;
    unsigned int nr_pages;
    struct page **pages = NULL;
    void *trans_private;
    unsigned long flags;
    rds_rdma_cookie_t cookie;
    unsigned int nents = 0;
    int need_odp = 0;
    long i;
    int ret;

    if (ipv6_addr_any(&rs->rs_bound_addr) || !rs->rs_transport) {
        ret = -ENOTCONN; /* XXX not a great errno */
        goto out;
    }

    if (!rs->rs_transport->get_mr) {
        ret = -EOPNOTSUPP;
        goto out;
    }

    /* If the combination of the addr and size requested for this memory
     * region causes an integer overflow, return error.
     */
    if (((args->vec.addr + args->vec.bytes) < args->vec.addr) ||
        PAGE_ALIGN(args->vec.addr + args->vec.bytes) <
            (args->vec.addr + args->vec.bytes)) {
        ret = -EINVAL;
        goto out;
    }

    if (!can_do_mlock()) {
        ret = -EPERM;
        goto out;
    }

    nr_pages = rds_pages_in_vec(&args->vec);
    if (nr_pages == 0) {
        ret = -EINVAL;
        goto out;
    }

    /* Restrict the size of mr irrespective of underlying transport
     * To account for unaligned mr regions, subtract one from nr_pages
     */
    if ((nr_pages - 1) > (RDS_MAX_MSG_SIZE >> PAGE_SHIFT)) {
        ret = -EMSGSIZE;
        goto out;
    }

    rdsdebug("RDS: get_mr addr %llx len %llu nr_pages %u\n",
        args->vec.addr, args->vec.bytes, nr_pages);

    /* XXX clamp nr_pages to limit the size of this alloc? */
    pages = kcalloc(nr_pages, sizeof(struct page *), GFP_KERNEL);
    if (!pages) {
        ret = -ENOMEM;
        goto out;
    }

    mr = kzalloc(sizeof(struct rds_mr), GFP_KERNEL);
    if (!mr) {
        ret = -ENOMEM;
        goto out;
    }

    kref_init(&mr->r_kref);
    RB_CLEAR_NODE(&mr->r_rb_node);
    mr->r_trans = rs->rs_transport;
    mr->r_sock = rs;

    if (args->flags & RDS_RDMA_USE_ONCE)
        mr->r_use_once = 1;
    if (args->flags & RDS_RDMA_INVALIDATE)
        mr->r_invalidate = 1;
    if (args->flags & RDS_RDMA_READWRITE)
        mr->r_write = 1;

    /*
     * Pin the pages that make up the user buffer and transfer the page
     * pointers to the mr's sg array.  We check to see if we've mapped
     * the whole region after transferring the partial page references
     * to the sg array so that we can have one page ref cleanup path.
     *
     * For now we have no flag that tells us whether the mapping is
     * r/o or r/w. We need to assume r/w, or we'll do a lot of RDMA to
     * the zero page.
     */
    ret = rds_pin_pages(args->vec.addr, nr_pages, pages, 1);
    if (ret == -EOPNOTSUPP) {
        need_odp = 1;
    } else if (ret <= 0) {
        goto out;
    } else {
        nents = ret;
        sg = kmalloc_array(nents, sizeof(*sg), GFP_KERNEL);
        if (!sg) {
            ret = -ENOMEM;
            goto out;
        }
        WARN_ON(!nents);
        sg_init_table(sg, nents);

        /* Stick all pages into the scatterlist */
        for (i = 0 ; i < nents; i++)
            sg_set_page(&sg[i], pages[i], PAGE_SIZE, 0);

        rdsdebug("RDS: trans_private nents is %u\n", nents);
    }
    /* Obtain a transport specific MR. If this succeeds, the
     * s/g list is now owned by the MR.
     * Note that dma_map() implies that pending writes are
     * flushed to RAM, so no dma_sync is needed here. */
    trans_private = rs->rs_transport->get_mr(
        sg, nents, rs, &mr->r_key, cp ? cp->cp_conn : NULL,
        args->vec.addr, args->vec.bytes,
        need_odp ? ODP_ZEROBASED : ODP_NOT_NEEDED);

    if (IS_ERR(trans_private)) {
        /* In ODP case, we don't GUP pages, so don't need
         * to release anything.
         */
        if (!need_odp) {
            unpin_user_pages(pages, nr_pages);
            kfree(sg);
        }
        ret = PTR_ERR(trans_private);
        goto out;
    }

    mr->r_trans_private = trans_private;

    rdsdebug("RDS: get_mr put_user key is %x cookie_addr %p\n",
           mr->r_key, (void *)(unsigned long) args->cookie_addr);

    /* The user may pass us an unaligned address, but we can only
     * map page aligned regions. So we keep the offset, and build
     * a 64bit cookie containing <R_Key, offset> and pass that
     * around. */
    if (need_odp)
        cookie = rds_rdma_make_cookie(mr->r_key, 0);
    else
        cookie = rds_rdma_make_cookie(mr->r_key,
                          args->vec.addr & ~PAGE_MASK);
    if (cookie_ret)
        *cookie_ret = cookie;

    if (args->cookie_addr &&
        put_user(cookie, (u64 __user *)(unsigned long)args->cookie_addr)) {
        if (!need_odp) {
            unpin_user_pages(pages, nr_pages);
            kfree(sg);
        }
        ret = -EFAULT;
        goto out;
    }

    /* Inserting the new MR into the rbtree bumps its
     * reference count. */
    spin_lock_irqsave(&rs->rs_rdma_lock, flags);
    found = rds_mr_tree_walk(&rs->rs_rdma_keys, mr->r_key, mr);
    spin_unlock_irqrestore(&rs->rs_rdma_lock, flags);

    BUG_ON(found && found != mr);

    rdsdebug("RDS: get_mr key is %x\n", mr->r_key);
    if (mr_ret) {
        kref_get(&mr->r_kref);
        *mr_ret = mr;
    }

    ret = 0;
out:
    kfree(pages);
    if (mr)
        kref_put(&mr->r_kref, __rds_put_mr_final);
    return ret;
}

int rds_get_mr(struct rds_sock *rs, sockptr_t optval, int optlen)
{
    struct rds_get_mr_args args;

    if (optlen != sizeof(struct rds_get_mr_args))
        return -EINVAL;

    if (copy_from_sockptr(&args, optval, sizeof(struct rds_get_mr_args)))
        return -EFAULT;

    return __rds_rdma_map(rs, &args, NULL, NULL, NULL);
}
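
/*
 * A minimal userspace sketch (not part of this file) of registering a
 * buffer through the RDS_GET_MR socket option handled above; 'fd', 'buf',
 * 'len' and 'cookie' are placeholders:
 *
 *     uint64_t cookie = 0;
 *     struct rds_get_mr_args margs = {
 *         .vec         = { .addr = (uint64_t)buf, .bytes = len },
 *         .cookie_addr = (uint64_t)&cookie,
 *         .flags       = RDS_RDMA_USE_ONCE,
 *     };
 *     setsockopt(fd, SOL_RDS, RDS_GET_MR, &margs, sizeof(margs));
 *
 * The cookie written back encodes <R_Key, page offset> and is what the
 * peer later places in rds_rdma_args.cookie.
 */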

int rds_get_mr_for_dest(struct rds_sock *rs, sockptr_t optval, int optlen)
{
    struct rds_get_mr_for_dest_args args;
    struct rds_get_mr_args new_args;

    if (optlen != sizeof(struct rds_get_mr_for_dest_args))
        return -EINVAL;

    if (copy_from_sockptr(&args, optval,
               sizeof(struct rds_get_mr_for_dest_args)))
        return -EFAULT;

    /*
     * Initially, just behave like get_mr().
     * TODO: Implement get_mr as wrapper around this
     *   and deprecate it.
     */
    new_args.vec = args.vec;
    new_args.cookie_addr = args.cookie_addr;
    new_args.flags = args.flags;

    return __rds_rdma_map(rs, &new_args, NULL, NULL, NULL);
}

/*
 * Free the MR indicated by the given R_Key
 */
int rds_free_mr(struct rds_sock *rs, sockptr_t optval, int optlen)
{
    struct rds_free_mr_args args;
    struct rds_mr *mr;
    unsigned long flags;

    if (optlen != sizeof(struct rds_free_mr_args))
        return -EINVAL;

    if (copy_from_sockptr(&args, optval, sizeof(struct rds_free_mr_args)))
        return -EFAULT;

    /* Special case - a null cookie means flush all unused MRs */
    if (args.cookie == 0) {
        if (!rs->rs_transport || !rs->rs_transport->flush_mrs)
            return -EINVAL;
        rs->rs_transport->flush_mrs();
        return 0;
    }

    /* Look up the MR given its R_key and remove it from the rbtree
     * so nobody else finds it.
     * This should also prevent races with rds_rdma_unuse.
     */
    spin_lock_irqsave(&rs->rs_rdma_lock, flags);
    mr = rds_mr_tree_walk(&rs->rs_rdma_keys, rds_rdma_cookie_key(args.cookie), NULL);
    if (mr) {
        rb_erase(&mr->r_rb_node, &rs->rs_rdma_keys);
        RB_CLEAR_NODE(&mr->r_rb_node);
        if (args.flags & RDS_RDMA_INVALIDATE)
            mr->r_invalidate = 1;
    }
    spin_unlock_irqrestore(&rs->rs_rdma_lock, flags);

    if (!mr)
        return -EINVAL;

    kref_put(&mr->r_kref, __rds_put_mr_final);
    return 0;
}
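
/*
 * Userspace counterpart, sketched with placeholders as above: pass the
 * cookie obtained from RDS_GET_MR back in via RDS_FREE_MR, or a zero
 * cookie to ask the transport to flush all unused MRs:
 *
 *     struct rds_free_mr_args fargs = {
 *         .cookie = cookie,
 *         .flags  = RDS_RDMA_INVALIDATE,
 *     };
 *     setsockopt(fd, SOL_RDS, RDS_FREE_MR, &fargs, sizeof(fargs));
 */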

/*
 * This is called when we receive an extension header that
 * tells us this MR was used. It allows us to implement
 * use_once semantics
 */
void rds_rdma_unuse(struct rds_sock *rs, u32 r_key, int force)
{
    struct rds_mr *mr;
    unsigned long flags;
    int zot_me = 0;

    spin_lock_irqsave(&rs->rs_rdma_lock, flags);
    mr = rds_mr_tree_walk(&rs->rs_rdma_keys, r_key, NULL);
    if (!mr) {
        pr_debug("rds: trying to unuse MR with unknown r_key %u!\n",
             r_key);
        spin_unlock_irqrestore(&rs->rs_rdma_lock, flags);
        return;
    }

    /* Get a reference so that the MR won't go away before calling
     * sync_mr() below.
     */
    kref_get(&mr->r_kref);

    /* If it is going to be freed, remove it from the tree now so
     * that no other thread can find it and free it.
     */
    if (mr->r_use_once || force) {
        rb_erase(&mr->r_rb_node, &rs->rs_rdma_keys);
        RB_CLEAR_NODE(&mr->r_rb_node);
        zot_me = 1;
    }
    spin_unlock_irqrestore(&rs->rs_rdma_lock, flags);

    /* May have to issue a dma_sync on this memory region.
     * Note we could avoid this if the operation was a RDMA READ,
     * but at this point we can't tell. */
    if (mr->r_trans->sync_mr)
        mr->r_trans->sync_mr(mr->r_trans_private, DMA_FROM_DEVICE);

    /* Release the reference held above. */
    kref_put(&mr->r_kref, __rds_put_mr_final);

    /* If the MR was marked as invalidate, this will
     * trigger an async flush. */
    if (zot_me)
        kref_put(&mr->r_kref, __rds_put_mr_final);
}

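/*
 * Tear down a message's RDMA op: drop the ODP MR reference, or unpin the
 * pages backing op_sg (dirtying them when the operation may have written
 * to them, i.e. an RDMA READ into local memory), then free the notifier.
 */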
void rds_rdma_free_op(struct rm_rdma_op *ro)
{
    unsigned int i;

    if (ro->op_odp_mr) {
        kref_put(&ro->op_odp_mr->r_kref, __rds_put_mr_final);
    } else {
        for (i = 0; i < ro->op_nents; i++) {
            struct page *page = sg_page(&ro->op_sg[i]);

            /* Mark page dirty if it was possibly modified, which
             * is the case for a RDMA_READ which copies from remote
             * to local memory
             */
            unpin_user_pages_dirty_lock(&page, 1, !ro->op_write);
        }
    }

    kfree(ro->op_notifier);
    ro->op_notifier = NULL;
    ro->op_active = 0;
    ro->op_odp_mr = NULL;
}

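/*
 * Tear down a message's atomic op: unpin (and dirty) the single page
 * holding the 8-byte local result, then free the notifier.
 */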
void rds_atomic_free_op(struct rm_atomic_op *ao)
{
    struct page *page = sg_page(ao->op_sg);

    /* Mark page dirty if it was possibly modified, which
     * is the case for a RDMA_READ which copies from remote
     * to local memory */
    unpin_user_pages_dirty_lock(&page, 1, true);

    kfree(ao->op_notifier);
    ao->op_notifier = NULL;
    ao->op_active = 0;
}


/*
 * Count the number of pages needed to describe an incoming iovec array.
 */
static int rds_rdma_pages(struct rds_iovec iov[], int nr_iovecs)
{
    int tot_pages = 0;
    unsigned int nr_pages;
    unsigned int i;

    /* figure out the number of pages in the vector */
    for (i = 0; i < nr_iovecs; i++) {
        nr_pages = rds_pages_in_vec(&iov[i]);
        if (nr_pages == 0)
            return -EINVAL;

        tot_pages += nr_pages;

        /*
         * nr_pages for one entry is limited to (UINT_MAX>>PAGE_SHIFT)+1,
         * so tot_pages cannot overflow without first going negative.
         */
        if (tot_pages < 0)
            return -EINVAL;
    }

    return tot_pages;
}

int rds_rdma_extra_size(struct rds_rdma_args *args,
            struct rds_iov_vector *iov)
{
    struct rds_iovec *vec;
    struct rds_iovec __user *local_vec;
    int tot_pages = 0;
    unsigned int nr_pages;
    unsigned int i;

    local_vec = (struct rds_iovec __user *)(unsigned long) args->local_vec_addr;

    if (args->nr_local == 0)
        return -EINVAL;

    if (args->nr_local > UIO_MAXIOV)
        return -EMSGSIZE;

    iov->iov = kcalloc(args->nr_local,
               sizeof(struct rds_iovec),
               GFP_KERNEL);
    if (!iov->iov)
        return -ENOMEM;

    vec = &iov->iov[0];

    if (copy_from_user(vec, local_vec, args->nr_local *
               sizeof(struct rds_iovec)))
        return -EFAULT;
    iov->len = args->nr_local;

    /* figure out the number of pages in the vector */
    for (i = 0; i < args->nr_local; i++, vec++) {

        nr_pages = rds_pages_in_vec(vec);
        if (nr_pages == 0)
            return -EINVAL;

        tot_pages += nr_pages;

        /*
         * nr_pages for one entry is limited to (UINT_MAX>>PAGE_SHIFT)+1,
         * so tot_pages cannot overflow without first going negative.
         */
        if (tot_pages < 0)
            return -EINVAL;
    }

    return tot_pages * sizeof(struct scatterlist);
}

/*
 * The application asks for a RDMA transfer.
 * Extract all arguments and set up the rdma_op
 */
int rds_cmsg_rdma_args(struct rds_sock *rs, struct rds_message *rm,
               struct cmsghdr *cmsg,
               struct rds_iov_vector *vec)
{
    struct rds_rdma_args *args;
    struct rm_rdma_op *op = &rm->rdma;
    int nr_pages;
    unsigned int nr_bytes;
    struct page **pages = NULL;
    struct rds_iovec *iovs;
    unsigned int i, j;
    int ret = 0;
    bool odp_supported = true;

    if (cmsg->cmsg_len < CMSG_LEN(sizeof(struct rds_rdma_args))
        || rm->rdma.op_active)
        return -EINVAL;

    args = CMSG_DATA(cmsg);

    if (ipv6_addr_any(&rs->rs_bound_addr)) {
        ret = -ENOTCONN; /* XXX not a great errno */
        goto out_ret;
    }

    if (args->nr_local > UIO_MAXIOV) {
        ret = -EMSGSIZE;
        goto out_ret;
    }

    if (vec->len != args->nr_local) {
        ret = -EINVAL;
        goto out_ret;
    }
    /* odp-mr is not supported for multiple requests within one message */
    if (args->nr_local != 1)
        odp_supported = false;

    iovs = vec->iov;

    nr_pages = rds_rdma_pages(iovs, args->nr_local);
    if (nr_pages < 0) {
        ret = -EINVAL;
        goto out_ret;
    }

    pages = kcalloc(nr_pages, sizeof(struct page *), GFP_KERNEL);
    if (!pages) {
        ret = -ENOMEM;
        goto out_ret;
    }

    op->op_write = !!(args->flags & RDS_RDMA_READWRITE);
    op->op_fence = !!(args->flags & RDS_RDMA_FENCE);
    op->op_notify = !!(args->flags & RDS_RDMA_NOTIFY_ME);
    op->op_silent = !!(args->flags & RDS_RDMA_SILENT);
    op->op_active = 1;
    op->op_recverr = rs->rs_recverr;
    op->op_odp_mr = NULL;

    WARN_ON(!nr_pages);
    op->op_sg = rds_message_alloc_sgs(rm, nr_pages);
    if (IS_ERR(op->op_sg)) {
        ret = PTR_ERR(op->op_sg);
        goto out_pages;
    }

    if (op->op_notify || op->op_recverr) {
        /* We allocate an uninitialized notifier here, because
         * we don't want to do that in the completion handler. We
         * would have to use GFP_ATOMIC there, and don't want to deal
         * with failed allocations.
         */
        op->op_notifier = kmalloc(sizeof(struct rds_notifier), GFP_KERNEL);
        if (!op->op_notifier) {
            ret = -ENOMEM;
            goto out_pages;
        }
        op->op_notifier->n_user_token = args->user_token;
        op->op_notifier->n_status = RDS_RDMA_SUCCESS;
    }

    /* The cookie contains the R_Key of the remote memory region, and
     * optionally an offset into it. This is how we implement RDMA into
     * unaligned memory.
     * When setting up the RDMA, we need to add that offset to the
     * destination address (which is really an offset into the MR)
     * FIXME: We may want to move this into ib_rdma.c
     */
    op->op_rkey = rds_rdma_cookie_key(args->cookie);
    op->op_remote_addr = args->remote_vec.addr + rds_rdma_cookie_offset(args->cookie);

    nr_bytes = 0;

    rdsdebug("RDS: rdma prepare nr_local %llu rva %llx rkey %x\n",
           (unsigned long long)args->nr_local,
           (unsigned long long)args->remote_vec.addr,
           op->op_rkey);

    for (i = 0; i < args->nr_local; i++) {
        struct rds_iovec *iov = &iovs[i];
        /* no need to check: rds_rdma_pages() verified nr will be nonzero */
        unsigned int nr = rds_pages_in_vec(iov);

        rs->rs_user_addr = iov->addr;
        rs->rs_user_bytes = iov->bytes;

        /* If it's a WRITE operation, we want to pin the pages for reading.
         * If it's a READ operation, we need to pin the pages for writing.
         */
        ret = rds_pin_pages(iov->addr, nr, pages, !op->op_write);
        if ((!odp_supported && ret <= 0) ||
            (odp_supported && ret <= 0 && ret != -EOPNOTSUPP))
            goto out_pages;

        if (ret == -EOPNOTSUPP) {
            struct rds_mr *local_odp_mr;

            if (!rs->rs_transport->get_mr) {
                ret = -EOPNOTSUPP;
                goto out_pages;
            }
            local_odp_mr =
                kzalloc(sizeof(*local_odp_mr), GFP_KERNEL);
            if (!local_odp_mr) {
                ret = -ENOMEM;
                goto out_pages;
            }
            RB_CLEAR_NODE(&local_odp_mr->r_rb_node);
            kref_init(&local_odp_mr->r_kref);
            local_odp_mr->r_trans = rs->rs_transport;
            local_odp_mr->r_sock = rs;
            local_odp_mr->r_trans_private =
                rs->rs_transport->get_mr(
                    NULL, 0, rs, &local_odp_mr->r_key, NULL,
                    iov->addr, iov->bytes, ODP_VIRTUAL);
            if (IS_ERR(local_odp_mr->r_trans_private)) {
                ret = PTR_ERR(local_odp_mr->r_trans_private);
                rdsdebug("get_mr ret %d %p\"", ret,
                     local_odp_mr->r_trans_private);
                kfree(local_odp_mr);
                ret = -EOPNOTSUPP;
                goto out_pages;
            }
            rdsdebug("Need odp; local_odp_mr %p trans_private %p\n",
                 local_odp_mr, local_odp_mr->r_trans_private);
            op->op_odp_mr = local_odp_mr;
            op->op_odp_addr = iov->addr;
        }

        rdsdebug("RDS: nr_bytes %u nr %u iov->bytes %llu iov->addr %llx\n",
             nr_bytes, nr, iov->bytes, iov->addr);

        nr_bytes += iov->bytes;

        for (j = 0; j < nr; j++) {
            unsigned int offset = iov->addr & ~PAGE_MASK;
            struct scatterlist *sg;

            sg = &op->op_sg[op->op_nents + j];
            sg_set_page(sg, pages[j],
                    min_t(unsigned int, iov->bytes, PAGE_SIZE - offset),
                    offset);

            sg_dma_len(sg) = sg->length;
            rdsdebug("RDS: sg->offset %x sg->len %x iov->addr %llx iov->bytes %llu\n",
                   sg->offset, sg->length, iov->addr, iov->bytes);

            iov->addr += sg->length;
            iov->bytes -= sg->length;
        }

        op->op_nents += nr;
    }

    if (nr_bytes > args->remote_vec.bytes) {
        rdsdebug("RDS nr_bytes %u remote_bytes %u do not match\n",
                nr_bytes,
                (unsigned int) args->remote_vec.bytes);
        ret = -EINVAL;
        goto out_pages;
    }
    op->op_bytes = nr_bytes;
    ret = 0;

out_pages:
    kfree(pages);
out_ret:
    if (ret)
        rds_rdma_free_op(op);
    else
        rds_stats_inc(s_send_rdma);

    return ret;
}
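
/*
 * Userspace drives this path by attaching an RDS_CMSG_RDMA_ARGS control
 * message to sendmsg().  A rough sketch (placeholders as above; msghdr
 * setup and error handling omitted):
 *
 *     struct rds_iovec lvec = { .addr = (uint64_t)buf, .bytes = len };
 *     struct rds_rdma_args rargs = {
 *         .cookie         = cookie,   // from the peer's RDS_GET_MR
 *         .remote_vec     = { .addr = 0, .bytes = len },
 *         .local_vec_addr = (uint64_t)&lvec,
 *         .nr_local       = 1,
 *         .flags          = RDS_RDMA_READWRITE | RDS_RDMA_NOTIFY_ME,
 *         .user_token     = token,
 *     };
 *     struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg);
 *     cmsg->cmsg_level = SOL_RDS;
 *     cmsg->cmsg_type  = RDS_CMSG_RDMA_ARGS;
 *     cmsg->cmsg_len   = CMSG_LEN(sizeof(rargs));
 *     memcpy(CMSG_DATA(cmsg), &rargs, sizeof(rargs));
 *     sendmsg(fd, &msg, 0);
 */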

/*
 * The application wants us to pass an RDMA destination (aka MR)
 * to the remote
 */
int rds_cmsg_rdma_dest(struct rds_sock *rs, struct rds_message *rm,
              struct cmsghdr *cmsg)
{
    unsigned long flags;
    struct rds_mr *mr;
    u32 r_key;
    int err = 0;

    if (cmsg->cmsg_len < CMSG_LEN(sizeof(rds_rdma_cookie_t)) ||
        rm->m_rdma_cookie != 0)
        return -EINVAL;

    memcpy(&rm->m_rdma_cookie, CMSG_DATA(cmsg), sizeof(rm->m_rdma_cookie));

    /* We are reusing a previously mapped MR here. Most likely, the
     * application has written to the buffer, so we need to explicitly
     * flush those writes to RAM. Otherwise the HCA may not see them
     * when doing a DMA from that buffer.
     */
    r_key = rds_rdma_cookie_key(rm->m_rdma_cookie);

    spin_lock_irqsave(&rs->rs_rdma_lock, flags);
    mr = rds_mr_tree_walk(&rs->rs_rdma_keys, r_key, NULL);
    if (!mr)
        err = -EINVAL;  /* invalid r_key */
    else
        kref_get(&mr->r_kref);
    spin_unlock_irqrestore(&rs->rs_rdma_lock, flags);

    if (mr) {
        mr->r_trans->sync_mr(mr->r_trans_private,
                     DMA_TO_DEVICE);
        rm->rdma.op_rdma_mr = mr;
    }
    return err;
}

/*
 * The application passes us an address range it wants to enable RDMA
 * to/from. We map the area, and save the <R_Key,offset> pair
 * in rm->m_rdma_cookie. This causes it to be sent along to the peer
 * in an extension header.
 */
int rds_cmsg_rdma_map(struct rds_sock *rs, struct rds_message *rm,
              struct cmsghdr *cmsg)
{
    if (cmsg->cmsg_len < CMSG_LEN(sizeof(struct rds_get_mr_args)) ||
        rm->m_rdma_cookie != 0)
        return -EINVAL;

    return __rds_rdma_map(rs, CMSG_DATA(cmsg), &rm->m_rdma_cookie,
                  &rm->rdma.op_rdma_mr, rm->m_conn_path);
}

/*
 * Fill in rds_message for an atomic request.
 */
int rds_cmsg_atomic(struct rds_sock *rs, struct rds_message *rm,
            struct cmsghdr *cmsg)
{
    struct page *page = NULL;
    struct rds_atomic_args *args;
    int ret = 0;

    if (cmsg->cmsg_len < CMSG_LEN(sizeof(struct rds_atomic_args))
     || rm->atomic.op_active)
        return -EINVAL;

    args = CMSG_DATA(cmsg);

    /* Nonmasked & masked cmsg ops converted to masked hw ops */
    switch (cmsg->cmsg_type) {
    case RDS_CMSG_ATOMIC_FADD:
        rm->atomic.op_type = RDS_ATOMIC_TYPE_FADD;
        rm->atomic.op_m_fadd.add = args->fadd.add;
        rm->atomic.op_m_fadd.nocarry_mask = 0;
        break;
    case RDS_CMSG_MASKED_ATOMIC_FADD:
        rm->atomic.op_type = RDS_ATOMIC_TYPE_FADD;
        rm->atomic.op_m_fadd.add = args->m_fadd.add;
        rm->atomic.op_m_fadd.nocarry_mask = args->m_fadd.nocarry_mask;
        break;
    case RDS_CMSG_ATOMIC_CSWP:
        rm->atomic.op_type = RDS_ATOMIC_TYPE_CSWP;
        rm->atomic.op_m_cswp.compare = args->cswp.compare;
        rm->atomic.op_m_cswp.swap = args->cswp.swap;
        rm->atomic.op_m_cswp.compare_mask = ~0;
        rm->atomic.op_m_cswp.swap_mask = ~0;
        break;
    case RDS_CMSG_MASKED_ATOMIC_CSWP:
        rm->atomic.op_type = RDS_ATOMIC_TYPE_CSWP;
        rm->atomic.op_m_cswp.compare = args->m_cswp.compare;
        rm->atomic.op_m_cswp.swap = args->m_cswp.swap;
        rm->atomic.op_m_cswp.compare_mask = args->m_cswp.compare_mask;
        rm->atomic.op_m_cswp.swap_mask = args->m_cswp.swap_mask;
        break;
    default:
        BUG(); /* should never happen */
    }

    rm->atomic.op_notify = !!(args->flags & RDS_RDMA_NOTIFY_ME);
    rm->atomic.op_silent = !!(args->flags & RDS_RDMA_SILENT);
    rm->atomic.op_active = 1;
    rm->atomic.op_recverr = rs->rs_recverr;
    rm->atomic.op_sg = rds_message_alloc_sgs(rm, 1);
    if (IS_ERR(rm->atomic.op_sg)) {
        ret = PTR_ERR(rm->atomic.op_sg);
        goto err;
    }

    /* verify 8 byte-aligned */
    if (args->local_addr & 0x7) {
        ret = -EFAULT;
        goto err;
    }

    ret = rds_pin_pages(args->local_addr, 1, &page, 1);
    if (ret != 1)
        goto err;
    ret = 0;

    sg_set_page(rm->atomic.op_sg, page, 8, offset_in_page(args->local_addr));

    if (rm->atomic.op_notify || rm->atomic.op_recverr) {
        /* We allocate an uninitialized notifier here, because
         * we don't want to do that in the completion handler. We
         * would have to use GFP_ATOMIC there, and don't want to deal
         * with failed allocations.
         */
        rm->atomic.op_notifier = kmalloc(sizeof(*rm->atomic.op_notifier), GFP_KERNEL);
        if (!rm->atomic.op_notifier) {
            ret = -ENOMEM;
            goto err;
        }

        rm->atomic.op_notifier->n_user_token = args->user_token;
        rm->atomic.op_notifier->n_status = RDS_RDMA_SUCCESS;
    }

    rm->atomic.op_rkey = rds_rdma_cookie_key(args->cookie);
    rm->atomic.op_remote_addr = args->remote_addr + rds_rdma_cookie_offset(args->cookie);

    return ret;
err:
    if (page)
        unpin_user_page(page);
    rm->atomic.op_active = 0;
    kfree(rm->atomic.op_notifier);

    return ret;
}
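
/*
 * Userspace sketch of a fetch-and-add against a registered 8-byte word
 * (placeholders as above).  Note that the plain FADD/CSWP cmsg types are
 * converted above to their masked forms (nocarry_mask = 0, compare/swap
 * masks = ~0) before being handed to the transport:
 *
 *     struct rds_atomic_args aargs = {
 *         .cookie      = cookie,            // names the remote MR
 *         .local_addr  = (uint64_t)&result, // must be 8-byte aligned
 *         .remote_addr = 0,                 // offset within the MR
 *         .fadd        = { .add = 1 },
 *         .flags       = RDS_RDMA_NOTIFY_ME,
 *         .user_token  = token,
 *     };
 *     ... attach as a control message with cmsg_level = SOL_RDS and
 *     cmsg_type = RDS_CMSG_ATOMIC_FADD, then call sendmsg() ...
 */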