Back to home page

OSCL-LXR

 
 

    


0001 // SPDX-License-Identifier: GPL-2.0-or-later
0002 /* RxRPC recvmsg() implementation
0003  *
0004  * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
0005  * Written by David Howells (dhowells@redhat.com)
0006  */
0007 
0008 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
0009 
0010 #include <linux/net.h>
0011 #include <linux/skbuff.h>
0012 #include <linux/export.h>
0013 #include <linux/sched/signal.h>
0014 
0015 #include <net/sock.h>
0016 #include <net/af_rxrpc.h>
0017 #include "ar-internal.h"
0018 
0019 /*
0020  * Post a call for attention by the socket or kernel service.  Further
0021  * notifications are suppressed by putting recvmsg_link on a dummy queue.
0022  */
0023 void rxrpc_notify_socket(struct rxrpc_call *call)
0024 {
0025     struct rxrpc_sock *rx;
0026     struct sock *sk;
0027 
0028     _enter("%d", call->debug_id);
0029 
0030     if (!list_empty(&call->recvmsg_link))
0031         return;
0032 
0033     rcu_read_lock();
0034 
0035     rx = rcu_dereference(call->socket);
0036     sk = &rx->sk;
0037     if (rx && sk->sk_state < RXRPC_CLOSE) {
0038         if (call->notify_rx) {
0039             spin_lock_bh(&call->notify_lock);
0040             call->notify_rx(sk, call, call->user_call_ID);
0041             spin_unlock_bh(&call->notify_lock);
0042         } else {
0043             write_lock_bh(&rx->recvmsg_lock);
0044             if (list_empty(&call->recvmsg_link)) {
0045                 rxrpc_get_call(call, rxrpc_call_got);
0046                 list_add_tail(&call->recvmsg_link, &rx->recvmsg_q);
0047             }
0048             write_unlock_bh(&rx->recvmsg_lock);
0049 
0050             if (!sock_flag(sk, SOCK_DEAD)) {
0051                 _debug("call %ps", sk->sk_data_ready);
0052                 sk->sk_data_ready(sk);
0053             }
0054         }
0055     }
0056 
0057     rcu_read_unlock();
0058     _leave("");
0059 }
0060 
0061 /*
0062  * Transition a call to the complete state.
0063  */
0064 bool __rxrpc_set_call_completion(struct rxrpc_call *call,
0065                  enum rxrpc_call_completion compl,
0066                  u32 abort_code,
0067                  int error)
0068 {
0069     if (call->state < RXRPC_CALL_COMPLETE) {
0070         call->abort_code = abort_code;
0071         call->error = error;
0072         call->completion = compl;
0073         call->state = RXRPC_CALL_COMPLETE;
0074         trace_rxrpc_call_complete(call);
0075         wake_up(&call->waitq);
0076         rxrpc_notify_socket(call);
0077         return true;
0078     }
0079     return false;
0080 }
0081 
0082 bool rxrpc_set_call_completion(struct rxrpc_call *call,
0083                    enum rxrpc_call_completion compl,
0084                    u32 abort_code,
0085                    int error)
0086 {
0087     bool ret = false;
0088 
0089     if (call->state < RXRPC_CALL_COMPLETE) {
0090         write_lock_bh(&call->state_lock);
0091         ret = __rxrpc_set_call_completion(call, compl, abort_code, error);
0092         write_unlock_bh(&call->state_lock);
0093     }
0094     return ret;
0095 }
0096 
0097 /*
0098  * Record that a call successfully completed.
0099  */
0100 bool __rxrpc_call_completed(struct rxrpc_call *call)
0101 {
0102     return __rxrpc_set_call_completion(call, RXRPC_CALL_SUCCEEDED, 0, 0);
0103 }
0104 
0105 bool rxrpc_call_completed(struct rxrpc_call *call)
0106 {
0107     bool ret = false;
0108 
0109     if (call->state < RXRPC_CALL_COMPLETE) {
0110         write_lock_bh(&call->state_lock);
0111         ret = __rxrpc_call_completed(call);
0112         write_unlock_bh(&call->state_lock);
0113     }
0114     return ret;
0115 }
0116 
0117 /*
0118  * Record that a call is locally aborted.
0119  */
0120 bool __rxrpc_abort_call(const char *why, struct rxrpc_call *call,
0121             rxrpc_seq_t seq, u32 abort_code, int error)
0122 {
0123     trace_rxrpc_abort(call->debug_id, why, call->cid, call->call_id, seq,
0124               abort_code, error);
0125     return __rxrpc_set_call_completion(call, RXRPC_CALL_LOCALLY_ABORTED,
0126                        abort_code, error);
0127 }
0128 
0129 bool rxrpc_abort_call(const char *why, struct rxrpc_call *call,
0130               rxrpc_seq_t seq, u32 abort_code, int error)
0131 {
0132     bool ret;
0133 
0134     write_lock_bh(&call->state_lock);
0135     ret = __rxrpc_abort_call(why, call, seq, abort_code, error);
0136     write_unlock_bh(&call->state_lock);
0137     return ret;
0138 }
0139 
0140 /*
0141  * Pass a call terminating message to userspace.
0142  */
0143 static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg)
0144 {
0145     u32 tmp = 0;
0146     int ret;
0147 
0148     switch (call->completion) {
0149     case RXRPC_CALL_SUCCEEDED:
0150         ret = 0;
0151         if (rxrpc_is_service_call(call))
0152             ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &tmp);
0153         break;
0154     case RXRPC_CALL_REMOTELY_ABORTED:
0155         tmp = call->abort_code;
0156         ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
0157         break;
0158     case RXRPC_CALL_LOCALLY_ABORTED:
0159         tmp = call->abort_code;
0160         ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
0161         break;
0162     case RXRPC_CALL_NETWORK_ERROR:
0163         tmp = -call->error;
0164         ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &tmp);
0165         break;
0166     case RXRPC_CALL_LOCAL_ERROR:
0167         tmp = -call->error;
0168         ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, &tmp);
0169         break;
0170     default:
0171         pr_err("Invalid terminal call state %u\n", call->state);
0172         BUG();
0173         break;
0174     }
0175 
0176     trace_rxrpc_recvmsg(call, rxrpc_recvmsg_terminal, call->rx_hard_ack,
0177                 call->rx_pkt_offset, call->rx_pkt_len, ret);
0178     return ret;
0179 }
0180 
0181 /*
0182  * End the packet reception phase.
0183  */
0184 static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
0185 {
0186     _enter("%d,%s", call->debug_id, rxrpc_call_states[call->state]);
0187 
0188     trace_rxrpc_receive(call, rxrpc_receive_end, 0, call->rx_top);
0189     ASSERTCMP(call->rx_hard_ack, ==, call->rx_top);
0190 
0191     if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY) {
0192         rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, serial, false, true,
0193                   rxrpc_propose_ack_terminal_ack);
0194         //rxrpc_send_ack_packet(call, false, NULL);
0195     }
0196 
0197     write_lock_bh(&call->state_lock);
0198 
0199     switch (call->state) {
0200     case RXRPC_CALL_CLIENT_RECV_REPLY:
0201         __rxrpc_call_completed(call);
0202         write_unlock_bh(&call->state_lock);
0203         break;
0204 
0205     case RXRPC_CALL_SERVER_RECV_REQUEST:
0206         call->tx_phase = true;
0207         call->state = RXRPC_CALL_SERVER_ACK_REQUEST;
0208         call->expect_req_by = jiffies + MAX_JIFFY_OFFSET;
0209         write_unlock_bh(&call->state_lock);
0210         rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, serial, false, true,
0211                   rxrpc_propose_ack_processing_op);
0212         break;
0213     default:
0214         write_unlock_bh(&call->state_lock);
0215         break;
0216     }
0217 }
0218 
0219 /*
0220  * Discard a packet we've used up and advance the Rx window by one.
0221  */
0222 static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
0223 {
0224     struct rxrpc_skb_priv *sp;
0225     struct sk_buff *skb;
0226     rxrpc_serial_t serial;
0227     rxrpc_seq_t hard_ack, top;
0228     bool last = false;
0229     u8 subpacket;
0230     int ix;
0231 
0232     _enter("%d", call->debug_id);
0233 
0234     hard_ack = call->rx_hard_ack;
0235     top = smp_load_acquire(&call->rx_top);
0236     ASSERT(before(hard_ack, top));
0237 
0238     hard_ack++;
0239     ix = hard_ack & RXRPC_RXTX_BUFF_MASK;
0240     skb = call->rxtx_buffer[ix];
0241     rxrpc_see_skb(skb, rxrpc_skb_rotated);
0242     sp = rxrpc_skb(skb);
0243 
0244     subpacket = call->rxtx_annotations[ix] & RXRPC_RX_ANNO_SUBPACKET;
0245     serial = sp->hdr.serial + subpacket;
0246 
0247     if (subpacket == sp->nr_subpackets - 1 &&
0248         sp->rx_flags & RXRPC_SKB_INCL_LAST)
0249         last = true;
0250 
0251     call->rxtx_buffer[ix] = NULL;
0252     call->rxtx_annotations[ix] = 0;
0253     /* Barrier against rxrpc_input_data(). */
0254     smp_store_release(&call->rx_hard_ack, hard_ack);
0255 
0256     rxrpc_free_skb(skb, rxrpc_skb_freed);
0257 
0258     trace_rxrpc_receive(call, rxrpc_receive_rotate, serial, hard_ack);
0259     if (last) {
0260         rxrpc_end_rx_phase(call, serial);
0261     } else {
0262         /* Check to see if there's an ACK that needs sending. */
0263         if (atomic_inc_return(&call->ackr_nr_consumed) > 2)
0264             rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, serial,
0265                       true, false,
0266                       rxrpc_propose_ack_rotate_rx);
0267         if (call->ackr_reason && call->ackr_reason != RXRPC_ACK_DELAY)
0268             rxrpc_send_ack_packet(call, false, NULL);
0269     }
0270 }
0271 
0272 /*
0273  * Decrypt and verify a (sub)packet.  The packet's length may be changed due to
0274  * padding, but if this is the case, the packet length will be resident in the
0275  * socket buffer.  Note that we can't modify the master skb info as the skb may
0276  * be the home to multiple subpackets.
0277  */
0278 static int rxrpc_verify_packet(struct rxrpc_call *call, struct sk_buff *skb,
0279                    u8 annotation,
0280                    unsigned int offset, unsigned int len)
0281 {
0282     struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
0283     rxrpc_seq_t seq = sp->hdr.seq;
0284     u16 cksum = sp->hdr.cksum;
0285     u8 subpacket = annotation & RXRPC_RX_ANNO_SUBPACKET;
0286 
0287     _enter("");
0288 
0289     /* For all but the head jumbo subpacket, the security checksum is in a
0290      * jumbo header immediately prior to the data.
0291      */
0292     if (subpacket > 0) {
0293         __be16 tmp;
0294         if (skb_copy_bits(skb, offset - 2, &tmp, 2) < 0)
0295             BUG();
0296         cksum = ntohs(tmp);
0297         seq += subpacket;
0298     }
0299 
0300     return call->security->verify_packet(call, skb, offset, len,
0301                          seq, cksum);
0302 }
0303 
0304 /*
0305  * Locate the data within a packet.  This is complicated by:
0306  *
0307  * (1) An skb may contain a jumbo packet - so we have to find the appropriate
0308  *     subpacket.
0309  *
0310  * (2) The (sub)packets may be encrypted and, if so, the encrypted portion
0311  *     contains an extra header which includes the true length of the data,
0312  *     excluding any encrypted padding.
0313  */
0314 static int rxrpc_locate_data(struct rxrpc_call *call, struct sk_buff *skb,
0315                  u8 *_annotation,
0316                  unsigned int *_offset, unsigned int *_len,
0317                  bool *_last)
0318 {
0319     struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
0320     unsigned int offset = sizeof(struct rxrpc_wire_header);
0321     unsigned int len;
0322     bool last = false;
0323     int ret;
0324     u8 annotation = *_annotation;
0325     u8 subpacket = annotation & RXRPC_RX_ANNO_SUBPACKET;
0326 
0327     /* Locate the subpacket */
0328     offset += subpacket * RXRPC_JUMBO_SUBPKTLEN;
0329     len = skb->len - offset;
0330     if (subpacket < sp->nr_subpackets - 1)
0331         len = RXRPC_JUMBO_DATALEN;
0332     else if (sp->rx_flags & RXRPC_SKB_INCL_LAST)
0333         last = true;
0334 
0335     if (!(annotation & RXRPC_RX_ANNO_VERIFIED)) {
0336         ret = rxrpc_verify_packet(call, skb, annotation, offset, len);
0337         if (ret < 0)
0338             return ret;
0339         *_annotation |= RXRPC_RX_ANNO_VERIFIED;
0340     }
0341 
0342     *_offset = offset;
0343     *_len = len;
0344     *_last = last;
0345     call->security->locate_data(call, skb, _offset, _len);
0346     return 0;
0347 }
0348 
0349 /*
0350  * Deliver messages to a call.  This keeps processing packets until the buffer
0351  * is filled and we find either more DATA (returns 0) or the end of the DATA
0352  * (returns 1).  If more packets are required, it returns -EAGAIN.
0353  */
0354 static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
0355                   struct msghdr *msg, struct iov_iter *iter,
0356                   size_t len, int flags, size_t *_offset)
0357 {
0358     struct rxrpc_skb_priv *sp;
0359     struct sk_buff *skb;
0360     rxrpc_serial_t serial;
0361     rxrpc_seq_t hard_ack, top, seq;
0362     size_t remain;
0363     bool rx_pkt_last;
0364     unsigned int rx_pkt_offset, rx_pkt_len;
0365     int ix, copy, ret = -EAGAIN, ret2;
0366 
0367     if (test_and_clear_bit(RXRPC_CALL_RX_UNDERRUN, &call->flags) &&
0368         call->ackr_reason)
0369         rxrpc_send_ack_packet(call, false, NULL);
0370 
0371     rx_pkt_offset = call->rx_pkt_offset;
0372     rx_pkt_len = call->rx_pkt_len;
0373     rx_pkt_last = call->rx_pkt_last;
0374 
0375     if (call->state >= RXRPC_CALL_SERVER_ACK_REQUEST) {
0376         seq = call->rx_hard_ack;
0377         ret = 1;
0378         goto done;
0379     }
0380 
0381     /* Barriers against rxrpc_input_data(). */
0382     hard_ack = call->rx_hard_ack;
0383     seq = hard_ack + 1;
0384 
0385     while (top = smp_load_acquire(&call->rx_top),
0386            before_eq(seq, top)
0387            ) {
0388         ix = seq & RXRPC_RXTX_BUFF_MASK;
0389         skb = call->rxtx_buffer[ix];
0390         if (!skb) {
0391             trace_rxrpc_recvmsg(call, rxrpc_recvmsg_hole, seq,
0392                         rx_pkt_offset, rx_pkt_len, 0);
0393             break;
0394         }
0395         smp_rmb();
0396         rxrpc_see_skb(skb, rxrpc_skb_seen);
0397         sp = rxrpc_skb(skb);
0398 
0399         if (!(flags & MSG_PEEK)) {
0400             serial = sp->hdr.serial;
0401             serial += call->rxtx_annotations[ix] & RXRPC_RX_ANNO_SUBPACKET;
0402             trace_rxrpc_receive(call, rxrpc_receive_front,
0403                         serial, seq);
0404         }
0405 
0406         if (msg)
0407             sock_recv_timestamp(msg, sock->sk, skb);
0408 
0409         if (rx_pkt_offset == 0) {
0410             ret2 = rxrpc_locate_data(call, skb,
0411                          &call->rxtx_annotations[ix],
0412                          &rx_pkt_offset, &rx_pkt_len,
0413                          &rx_pkt_last);
0414             trace_rxrpc_recvmsg(call, rxrpc_recvmsg_next, seq,
0415                         rx_pkt_offset, rx_pkt_len, ret2);
0416             if (ret2 < 0) {
0417                 ret = ret2;
0418                 goto out;
0419             }
0420         } else {
0421             trace_rxrpc_recvmsg(call, rxrpc_recvmsg_cont, seq,
0422                         rx_pkt_offset, rx_pkt_len, 0);
0423         }
0424 
0425         /* We have to handle short, empty and used-up DATA packets. */
0426         remain = len - *_offset;
0427         copy = rx_pkt_len;
0428         if (copy > remain)
0429             copy = remain;
0430         if (copy > 0) {
0431             ret2 = skb_copy_datagram_iter(skb, rx_pkt_offset, iter,
0432                               copy);
0433             if (ret2 < 0) {
0434                 ret = ret2;
0435                 goto out;
0436             }
0437 
0438             /* handle piecemeal consumption of data packets */
0439             rx_pkt_offset += copy;
0440             rx_pkt_len -= copy;
0441             *_offset += copy;
0442         }
0443 
0444         if (rx_pkt_len > 0) {
0445             trace_rxrpc_recvmsg(call, rxrpc_recvmsg_full, seq,
0446                         rx_pkt_offset, rx_pkt_len, 0);
0447             ASSERTCMP(*_offset, ==, len);
0448             ret = 0;
0449             break;
0450         }
0451 
0452         /* The whole packet has been transferred. */
0453         if (!(flags & MSG_PEEK))
0454             rxrpc_rotate_rx_window(call);
0455         rx_pkt_offset = 0;
0456         rx_pkt_len = 0;
0457 
0458         if (rx_pkt_last) {
0459             ASSERTCMP(seq, ==, READ_ONCE(call->rx_top));
0460             ret = 1;
0461             goto out;
0462         }
0463 
0464         seq++;
0465     }
0466 
0467 out:
0468     if (!(flags & MSG_PEEK)) {
0469         call->rx_pkt_offset = rx_pkt_offset;
0470         call->rx_pkt_len = rx_pkt_len;
0471         call->rx_pkt_last = rx_pkt_last;
0472     }
0473 done:
0474     trace_rxrpc_recvmsg(call, rxrpc_recvmsg_data_return, seq,
0475                 rx_pkt_offset, rx_pkt_len, ret);
0476     if (ret == -EAGAIN)
0477         set_bit(RXRPC_CALL_RX_UNDERRUN, &call->flags);
0478     return ret;
0479 }
0480 
0481 /*
0482  * Receive a message from an RxRPC socket
0483  * - we need to be careful about two or more threads calling recvmsg
0484  *   simultaneously
0485  */
0486 int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
0487           int flags)
0488 {
0489     struct rxrpc_call *call;
0490     struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
0491     struct list_head *l;
0492     size_t copied = 0;
0493     long timeo;
0494     int ret;
0495 
0496     DEFINE_WAIT(wait);
0497 
0498     trace_rxrpc_recvmsg(NULL, rxrpc_recvmsg_enter, 0, 0, 0, 0);
0499 
0500     if (flags & (MSG_OOB | MSG_TRUNC))
0501         return -EOPNOTSUPP;
0502 
0503     timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT);
0504 
0505 try_again:
0506     lock_sock(&rx->sk);
0507 
0508     /* Return immediately if a client socket has no outstanding calls */
0509     if (RB_EMPTY_ROOT(&rx->calls) &&
0510         list_empty(&rx->recvmsg_q) &&
0511         rx->sk.sk_state != RXRPC_SERVER_LISTENING) {
0512         release_sock(&rx->sk);
0513         return -EAGAIN;
0514     }
0515 
0516     if (list_empty(&rx->recvmsg_q)) {
0517         ret = -EWOULDBLOCK;
0518         if (timeo == 0) {
0519             call = NULL;
0520             goto error_no_call;
0521         }
0522 
0523         release_sock(&rx->sk);
0524 
0525         /* Wait for something to happen */
0526         prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait,
0527                       TASK_INTERRUPTIBLE);
0528         ret = sock_error(&rx->sk);
0529         if (ret)
0530             goto wait_error;
0531 
0532         if (list_empty(&rx->recvmsg_q)) {
0533             if (signal_pending(current))
0534                 goto wait_interrupted;
0535             trace_rxrpc_recvmsg(NULL, rxrpc_recvmsg_wait,
0536                         0, 0, 0, 0);
0537             timeo = schedule_timeout(timeo);
0538         }
0539         finish_wait(sk_sleep(&rx->sk), &wait);
0540         goto try_again;
0541     }
0542 
0543     /* Find the next call and dequeue it if we're not just peeking.  If we
0544      * do dequeue it, that comes with a ref that we will need to release.
0545      */
0546     write_lock_bh(&rx->recvmsg_lock);
0547     l = rx->recvmsg_q.next;
0548     call = list_entry(l, struct rxrpc_call, recvmsg_link);
0549     if (!(flags & MSG_PEEK))
0550         list_del_init(&call->recvmsg_link);
0551     else
0552         rxrpc_get_call(call, rxrpc_call_got);
0553     write_unlock_bh(&rx->recvmsg_lock);
0554 
0555     trace_rxrpc_recvmsg(call, rxrpc_recvmsg_dequeue, 0, 0, 0, 0);
0556 
0557     /* We're going to drop the socket lock, so we need to lock the call
0558      * against interference by sendmsg.
0559      */
0560     if (!mutex_trylock(&call->user_mutex)) {
0561         ret = -EWOULDBLOCK;
0562         if (flags & MSG_DONTWAIT)
0563             goto error_requeue_call;
0564         ret = -ERESTARTSYS;
0565         if (mutex_lock_interruptible(&call->user_mutex) < 0)
0566             goto error_requeue_call;
0567     }
0568 
0569     release_sock(&rx->sk);
0570 
0571     if (test_bit(RXRPC_CALL_RELEASED, &call->flags))
0572         BUG();
0573 
0574     if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
0575         if (flags & MSG_CMSG_COMPAT) {
0576             unsigned int id32 = call->user_call_ID;
0577 
0578             ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
0579                        sizeof(unsigned int), &id32);
0580         } else {
0581             unsigned long idl = call->user_call_ID;
0582 
0583             ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
0584                        sizeof(unsigned long), &idl);
0585         }
0586         if (ret < 0)
0587             goto error_unlock_call;
0588     }
0589 
0590     if (msg->msg_name && call->peer) {
0591         struct sockaddr_rxrpc *srx = msg->msg_name;
0592         size_t len = sizeof(call->peer->srx);
0593 
0594         memcpy(msg->msg_name, &call->peer->srx, len);
0595         srx->srx_service = call->service_id;
0596         msg->msg_namelen = len;
0597     }
0598 
0599     switch (READ_ONCE(call->state)) {
0600     case RXRPC_CALL_CLIENT_RECV_REPLY:
0601     case RXRPC_CALL_SERVER_RECV_REQUEST:
0602     case RXRPC_CALL_SERVER_ACK_REQUEST:
0603         ret = rxrpc_recvmsg_data(sock, call, msg, &msg->msg_iter, len,
0604                      flags, &copied);
0605         if (ret == -EAGAIN)
0606             ret = 0;
0607 
0608         if (after(call->rx_top, call->rx_hard_ack) &&
0609             call->rxtx_buffer[(call->rx_hard_ack + 1) & RXRPC_RXTX_BUFF_MASK])
0610             rxrpc_notify_socket(call);
0611         break;
0612     default:
0613         ret = 0;
0614         break;
0615     }
0616 
0617     if (ret < 0)
0618         goto error_unlock_call;
0619 
0620     if (call->state == RXRPC_CALL_COMPLETE) {
0621         ret = rxrpc_recvmsg_term(call, msg);
0622         if (ret < 0)
0623             goto error_unlock_call;
0624         if (!(flags & MSG_PEEK))
0625             rxrpc_release_call(rx, call);
0626         msg->msg_flags |= MSG_EOR;
0627         ret = 1;
0628     }
0629 
0630     if (ret == 0)
0631         msg->msg_flags |= MSG_MORE;
0632     else
0633         msg->msg_flags &= ~MSG_MORE;
0634     ret = copied;
0635 
0636 error_unlock_call:
0637     mutex_unlock(&call->user_mutex);
0638     rxrpc_put_call(call, rxrpc_call_put);
0639     trace_rxrpc_recvmsg(call, rxrpc_recvmsg_return, 0, 0, 0, ret);
0640     return ret;
0641 
0642 error_requeue_call:
0643     if (!(flags & MSG_PEEK)) {
0644         write_lock_bh(&rx->recvmsg_lock);
0645         list_add(&call->recvmsg_link, &rx->recvmsg_q);
0646         write_unlock_bh(&rx->recvmsg_lock);
0647         trace_rxrpc_recvmsg(call, rxrpc_recvmsg_requeue, 0, 0, 0, 0);
0648     } else {
0649         rxrpc_put_call(call, rxrpc_call_put);
0650     }
0651 error_no_call:
0652     release_sock(&rx->sk);
0653 error_trace:
0654     trace_rxrpc_recvmsg(call, rxrpc_recvmsg_return, 0, 0, 0, ret);
0655     return ret;
0656 
0657 wait_interrupted:
0658     ret = sock_intr_errno(timeo);
0659 wait_error:
0660     finish_wait(sk_sleep(&rx->sk), &wait);
0661     call = NULL;
0662     goto error_trace;
0663 }
0664 
0665 /**
0666  * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info
0667  * @sock: The socket that the call exists on
0668  * @call: The call to send data through
0669  * @iter: The buffer to receive into
0670  * @_len: The amount of data we want to receive (decreased on return)
0671  * @want_more: True if more data is expected to be read
0672  * @_abort: Where the abort code is stored if -ECONNABORTED is returned
0673  * @_service: Where to store the actual service ID (may be upgraded)
0674  *
0675  * Allow a kernel service to receive data and pick up information about the
0676  * state of a call.  Returns 0 if got what was asked for and there's more
0677  * available, 1 if we got what was asked for and we're at the end of the data
0678  * and -EAGAIN if we need more data.
0679  *
0680  * Note that we may return -EAGAIN to drain empty packets at the end of the
0681  * data, even if we've already copied over the requested data.
0682  *
0683  * *_abort should also be initialised to 0.
0684  */
0685 int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
0686                struct iov_iter *iter, size_t *_len,
0687                bool want_more, u32 *_abort, u16 *_service)
0688 {
0689     size_t offset = 0;
0690     int ret;
0691 
0692     _enter("{%d,%s},%zu,%d",
0693            call->debug_id, rxrpc_call_states[call->state],
0694            *_len, want_more);
0695 
0696     ASSERTCMP(call->state, !=, RXRPC_CALL_SERVER_SECURING);
0697 
0698     mutex_lock(&call->user_mutex);
0699 
0700     switch (READ_ONCE(call->state)) {
0701     case RXRPC_CALL_CLIENT_RECV_REPLY:
0702     case RXRPC_CALL_SERVER_RECV_REQUEST:
0703     case RXRPC_CALL_SERVER_ACK_REQUEST:
0704         ret = rxrpc_recvmsg_data(sock, call, NULL, iter,
0705                      *_len, 0, &offset);
0706         *_len -= offset;
0707         if (ret < 0)
0708             goto out;
0709 
0710         /* We can only reach here with a partially full buffer if we
0711          * have reached the end of the data.  We must otherwise have a
0712          * full buffer or have been given -EAGAIN.
0713          */
0714         if (ret == 1) {
0715             if (iov_iter_count(iter) > 0)
0716                 goto short_data;
0717             if (!want_more)
0718                 goto read_phase_complete;
0719             ret = 0;
0720             goto out;
0721         }
0722 
0723         if (!want_more)
0724             goto excess_data;
0725         goto out;
0726 
0727     case RXRPC_CALL_COMPLETE:
0728         goto call_complete;
0729 
0730     default:
0731         ret = -EINPROGRESS;
0732         goto out;
0733     }
0734 
0735 read_phase_complete:
0736     ret = 1;
0737 out:
0738     switch (call->ackr_reason) {
0739     case RXRPC_ACK_IDLE:
0740         break;
0741     case RXRPC_ACK_DELAY:
0742         if (ret != -EAGAIN)
0743             break;
0744         fallthrough;
0745     default:
0746         rxrpc_send_ack_packet(call, false, NULL);
0747     }
0748 
0749     if (_service)
0750         *_service = call->service_id;
0751     mutex_unlock(&call->user_mutex);
0752     _leave(" = %d [%zu,%d]", ret, iov_iter_count(iter), *_abort);
0753     return ret;
0754 
0755 short_data:
0756     trace_rxrpc_rx_eproto(call, 0, tracepoint_string("short_data"));
0757     ret = -EBADMSG;
0758     goto out;
0759 excess_data:
0760     trace_rxrpc_rx_eproto(call, 0, tracepoint_string("excess_data"));
0761     ret = -EMSGSIZE;
0762     goto out;
0763 call_complete:
0764     *_abort = call->abort_code;
0765     ret = call->error;
0766     if (call->completion == RXRPC_CALL_SUCCEEDED) {
0767         ret = 1;
0768         if (iov_iter_count(iter) > 0)
0769             ret = -ECONNRESET;
0770     }
0771     goto out;
0772 }
0773 EXPORT_SYMBOL(rxrpc_kernel_recv_data);