Back to home page

OSCL-LXR

 
 

    


0001 // SPDX-License-Identifier: GPL-2.0
0002 /* Watch queue and general notification mechanism, built on pipes
0003  *
0004  * Copyright (C) 2020 Red Hat, Inc. All Rights Reserved.
0005  * Written by David Howells (dhowells@redhat.com)
0006  *
0007  * See Documentation/core-api/watch_queue.rst
0008  */
0009 
0010 #define pr_fmt(fmt) "watchq: " fmt
0011 #include <linux/module.h>
0012 #include <linux/init.h>
0013 #include <linux/sched.h>
0014 #include <linux/slab.h>
0015 #include <linux/printk.h>
0016 #include <linux/miscdevice.h>
0017 #include <linux/fs.h>
0018 #include <linux/mm.h>
0019 #include <linux/pagemap.h>
0020 #include <linux/poll.h>
0021 #include <linux/uaccess.h>
0022 #include <linux/vmalloc.h>
0023 #include <linux/file.h>
0024 #include <linux/security.h>
0025 #include <linux/cred.h>
0026 #include <linux/sched/signal.h>
0027 #include <linux/watch_queue.h>
0028 #include <linux/pipe_fs_i.h>
0029 
0030 MODULE_DESCRIPTION("Watch queue");
0031 MODULE_AUTHOR("Red Hat, Inc.");
0032 MODULE_LICENSE("GPL");
0033 
0034 #define WATCH_QUEUE_NOTE_SIZE 128
0035 #define WATCH_QUEUE_NOTES_PER_PAGE (PAGE_SIZE / WATCH_QUEUE_NOTE_SIZE)
0036 
0037 /*
0038  * This must be called under the RCU read-lock, which makes
0039  * sure that the wqueue still exists. It can then take the lock,
0040  * and check that the wqueue hasn't been destroyed, which in
0041  * turn makes sure that the notification pipe still exists.
0042  */
0043 static inline bool lock_wqueue(struct watch_queue *wqueue)
0044 {
0045     spin_lock_bh(&wqueue->lock);
0046     if (unlikely(wqueue->defunct)) {
0047         spin_unlock_bh(&wqueue->lock);
0048         return false;
0049     }
0050     return true;
0051 }
0052 
0053 static inline void unlock_wqueue(struct watch_queue *wqueue)
0054 {
0055     spin_unlock_bh(&wqueue->lock);
0056 }
0057 
0058 static void watch_queue_pipe_buf_release(struct pipe_inode_info *pipe,
0059                      struct pipe_buffer *buf)
0060 {
0061     struct watch_queue *wqueue = (struct watch_queue *)buf->private;
0062     struct page *page;
0063     unsigned int bit;
0064 
0065     /* We need to work out which note within the page this refers to, but
0066      * the note might have been maximum size, so merely ANDing the offset
0067      * off doesn't work.  OTOH, the note must've been more than zero size.
0068      */
0069     bit = buf->offset + buf->len;
0070     if ((bit & (WATCH_QUEUE_NOTE_SIZE - 1)) == 0)
0071         bit -= WATCH_QUEUE_NOTE_SIZE;
0072     bit /= WATCH_QUEUE_NOTE_SIZE;
0073 
0074     page = buf->page;
0075     bit += page->index;
0076 
0077     set_bit(bit, wqueue->notes_bitmap);
0078     generic_pipe_buf_release(pipe, buf);
0079 }
0080 
0081 // No try_steal function => no stealing
0082 #define watch_queue_pipe_buf_try_steal NULL
0083 
0084 /* New data written to a pipe may be appended to a buffer with this type. */
0085 static const struct pipe_buf_operations watch_queue_pipe_buf_ops = {
0086     .release    = watch_queue_pipe_buf_release,
0087     .try_steal  = watch_queue_pipe_buf_try_steal,
0088     .get        = generic_pipe_buf_get,
0089 };
0090 
0091 /*
0092  * Post a notification to a watch queue.
0093  *
0094  * Must be called with the RCU lock for reading, and the
0095  * watch_queue lock held, which guarantees that the pipe
0096  * hasn't been released.
0097  */
0098 static bool post_one_notification(struct watch_queue *wqueue,
0099                   struct watch_notification *n)
0100 {
0101     void *p;
0102     struct pipe_inode_info *pipe = wqueue->pipe;
0103     struct pipe_buffer *buf;
0104     struct page *page;
0105     unsigned int head, tail, mask, note, offset, len;
0106     bool done = false;
0107 
0108     if (!pipe)
0109         return false;
0110 
0111     spin_lock_irq(&pipe->rd_wait.lock);
0112 
0113     mask = pipe->ring_size - 1;
0114     head = pipe->head;
0115     tail = pipe->tail;
0116     if (pipe_full(head, tail, pipe->ring_size))
0117         goto lost;
0118 
0119     note = find_first_bit(wqueue->notes_bitmap, wqueue->nr_notes);
0120     if (note >= wqueue->nr_notes)
0121         goto lost;
0122 
0123     page = wqueue->notes[note / WATCH_QUEUE_NOTES_PER_PAGE];
0124     offset = note % WATCH_QUEUE_NOTES_PER_PAGE * WATCH_QUEUE_NOTE_SIZE;
0125     get_page(page);
0126     len = n->info & WATCH_INFO_LENGTH;
0127     p = kmap_atomic(page);
0128     memcpy(p + offset, n, len);
0129     kunmap_atomic(p);
0130 
0131     buf = &pipe->bufs[head & mask];
0132     buf->page = page;
0133     buf->private = (unsigned long)wqueue;
0134     buf->ops = &watch_queue_pipe_buf_ops;
0135     buf->offset = offset;
0136     buf->len = len;
0137     buf->flags = PIPE_BUF_FLAG_WHOLE;
0138     smp_store_release(&pipe->head, head + 1); /* vs pipe_read() */
0139 
0140     if (!test_and_clear_bit(note, wqueue->notes_bitmap)) {
0141         spin_unlock_irq(&pipe->rd_wait.lock);
0142         BUG();
0143     }
0144     wake_up_interruptible_sync_poll_locked(&pipe->rd_wait, EPOLLIN | EPOLLRDNORM);
0145     done = true;
0146 
0147 out:
0148     spin_unlock_irq(&pipe->rd_wait.lock);
0149     if (done)
0150         kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
0151     return done;
0152 
0153 lost:
0154     buf = &pipe->bufs[(head - 1) & mask];
0155     buf->flags |= PIPE_BUF_FLAG_LOSS;
0156     goto out;
0157 }
0158 
0159 /*
0160  * Apply filter rules to a notification.
0161  */
0162 static bool filter_watch_notification(const struct watch_filter *wf,
0163                       const struct watch_notification *n)
0164 {
0165     const struct watch_type_filter *wt;
0166     unsigned int st_bits = sizeof(wt->subtype_filter[0]) * 8;
0167     unsigned int st_index = n->subtype / st_bits;
0168     unsigned int st_bit = 1U << (n->subtype % st_bits);
0169     int i;
0170 
0171     if (!test_bit(n->type, wf->type_filter))
0172         return false;
0173 
0174     for (i = 0; i < wf->nr_filters; i++) {
0175         wt = &wf->filters[i];
0176         if (n->type == wt->type &&
0177             (wt->subtype_filter[st_index] & st_bit) &&
0178             (n->info & wt->info_mask) == wt->info_filter)
0179             return true;
0180     }
0181 
0182     return false; /* If there is a filter, the default is to reject. */
0183 }
0184 
0185 /**
0186  * __post_watch_notification - Post an event notification
0187  * @wlist: The watch list to post the event to.
0188  * @n: The notification record to post.
0189  * @cred: The creds of the process that triggered the notification.
0190  * @id: The ID to match on the watch.
0191  *
0192  * Post a notification of an event into a set of watch queues and let the users
0193  * know.
0194  *
0195  * The size of the notification should be set in n->info & WATCH_INFO_LENGTH and
0196  * should be in units of sizeof(*n).
0197  */
0198 void __post_watch_notification(struct watch_list *wlist,
0199                    struct watch_notification *n,
0200                    const struct cred *cred,
0201                    u64 id)
0202 {
0203     const struct watch_filter *wf;
0204     struct watch_queue *wqueue;
0205     struct watch *watch;
0206 
0207     if (((n->info & WATCH_INFO_LENGTH) >> WATCH_INFO_LENGTH__SHIFT) == 0) {
0208         WARN_ON(1);
0209         return;
0210     }
0211 
0212     rcu_read_lock();
0213 
0214     hlist_for_each_entry_rcu(watch, &wlist->watchers, list_node) {
0215         if (watch->id != id)
0216             continue;
0217         n->info &= ~WATCH_INFO_ID;
0218         n->info |= watch->info_id;
0219 
0220         wqueue = rcu_dereference(watch->queue);
0221         wf = rcu_dereference(wqueue->filter);
0222         if (wf && !filter_watch_notification(wf, n))
0223             continue;
0224 
0225         if (security_post_notification(watch->cred, cred, n) < 0)
0226             continue;
0227 
0228         if (lock_wqueue(wqueue)) {
0229             post_one_notification(wqueue, n);
0230             unlock_wqueue(wqueue);
0231         }
0232     }
0233 
0234     rcu_read_unlock();
0235 }
0236 EXPORT_SYMBOL(__post_watch_notification);
0237 
0238 /*
0239  * Allocate sufficient pages to preallocation for the requested number of
0240  * notifications.
0241  */
0242 long watch_queue_set_size(struct pipe_inode_info *pipe, unsigned int nr_notes)
0243 {
0244     struct watch_queue *wqueue = pipe->watch_queue;
0245     struct page **pages;
0246     unsigned long *bitmap;
0247     unsigned long user_bufs;
0248     int ret, i, nr_pages;
0249 
0250     if (!wqueue)
0251         return -ENODEV;
0252     if (wqueue->notes)
0253         return -EBUSY;
0254 
0255     if (nr_notes < 1 ||
0256         nr_notes > 512) /* TODO: choose a better hard limit */
0257         return -EINVAL;
0258 
0259     nr_pages = (nr_notes + WATCH_QUEUE_NOTES_PER_PAGE - 1);
0260     nr_pages /= WATCH_QUEUE_NOTES_PER_PAGE;
0261     user_bufs = account_pipe_buffers(pipe->user, pipe->nr_accounted, nr_pages);
0262 
0263     if (nr_pages > pipe->max_usage &&
0264         (too_many_pipe_buffers_hard(user_bufs) ||
0265          too_many_pipe_buffers_soft(user_bufs)) &&
0266         pipe_is_unprivileged_user()) {
0267         ret = -EPERM;
0268         goto error;
0269     }
0270 
0271     nr_notes = nr_pages * WATCH_QUEUE_NOTES_PER_PAGE;
0272     ret = pipe_resize_ring(pipe, roundup_pow_of_two(nr_notes));
0273     if (ret < 0)
0274         goto error;
0275 
0276     pages = kcalloc(sizeof(struct page *), nr_pages, GFP_KERNEL);
0277     if (!pages)
0278         goto error;
0279 
0280     for (i = 0; i < nr_pages; i++) {
0281         pages[i] = alloc_page(GFP_KERNEL);
0282         if (!pages[i])
0283             goto error_p;
0284         pages[i]->index = i * WATCH_QUEUE_NOTES_PER_PAGE;
0285     }
0286 
0287     bitmap = bitmap_alloc(nr_notes, GFP_KERNEL);
0288     if (!bitmap)
0289         goto error_p;
0290 
0291     bitmap_fill(bitmap, nr_notes);
0292     wqueue->notes = pages;
0293     wqueue->notes_bitmap = bitmap;
0294     wqueue->nr_pages = nr_pages;
0295     wqueue->nr_notes = nr_notes;
0296     return 0;
0297 
0298 error_p:
0299     while (--i >= 0)
0300         __free_page(pages[i]);
0301     kfree(pages);
0302 error:
0303     (void) account_pipe_buffers(pipe->user, nr_pages, pipe->nr_accounted);
0304     return ret;
0305 }
0306 
0307 /*
0308  * Set the filter on a watch queue.
0309  */
0310 long watch_queue_set_filter(struct pipe_inode_info *pipe,
0311                 struct watch_notification_filter __user *_filter)
0312 {
0313     struct watch_notification_type_filter *tf;
0314     struct watch_notification_filter filter;
0315     struct watch_type_filter *q;
0316     struct watch_filter *wfilter;
0317     struct watch_queue *wqueue = pipe->watch_queue;
0318     int ret, nr_filter = 0, i;
0319 
0320     if (!wqueue)
0321         return -ENODEV;
0322 
0323     if (!_filter) {
0324         /* Remove the old filter */
0325         wfilter = NULL;
0326         goto set;
0327     }
0328 
0329     /* Grab the user's filter specification */
0330     if (copy_from_user(&filter, _filter, sizeof(filter)) != 0)
0331         return -EFAULT;
0332     if (filter.nr_filters == 0 ||
0333         filter.nr_filters > 16 ||
0334         filter.__reserved != 0)
0335         return -EINVAL;
0336 
0337     tf = memdup_user(_filter->filters, filter.nr_filters * sizeof(*tf));
0338     if (IS_ERR(tf))
0339         return PTR_ERR(tf);
0340 
0341     ret = -EINVAL;
0342     for (i = 0; i < filter.nr_filters; i++) {
0343         if ((tf[i].info_filter & ~tf[i].info_mask) ||
0344             tf[i].info_mask & WATCH_INFO_LENGTH)
0345             goto err_filter;
0346         /* Ignore any unknown types */
0347         if (tf[i].type >= WATCH_TYPE__NR)
0348             continue;
0349         nr_filter++;
0350     }
0351 
0352     /* Now we need to build the internal filter from only the relevant
0353      * user-specified filters.
0354      */
0355     ret = -ENOMEM;
0356     wfilter = kzalloc(struct_size(wfilter, filters, nr_filter), GFP_KERNEL);
0357     if (!wfilter)
0358         goto err_filter;
0359     wfilter->nr_filters = nr_filter;
0360 
0361     q = wfilter->filters;
0362     for (i = 0; i < filter.nr_filters; i++) {
0363         if (tf[i].type >= WATCH_TYPE__NR)
0364             continue;
0365 
0366         q->type         = tf[i].type;
0367         q->info_filter      = tf[i].info_filter;
0368         q->info_mask        = tf[i].info_mask;
0369         q->subtype_filter[0]    = tf[i].subtype_filter[0];
0370         __set_bit(q->type, wfilter->type_filter);
0371         q++;
0372     }
0373 
0374     kfree(tf);
0375 set:
0376     pipe_lock(pipe);
0377     wfilter = rcu_replace_pointer(wqueue->filter, wfilter,
0378                       lockdep_is_held(&pipe->mutex));
0379     pipe_unlock(pipe);
0380     if (wfilter)
0381         kfree_rcu(wfilter, rcu);
0382     return 0;
0383 
0384 err_filter:
0385     kfree(tf);
0386     return ret;
0387 }
0388 
0389 static void __put_watch_queue(struct kref *kref)
0390 {
0391     struct watch_queue *wqueue =
0392         container_of(kref, struct watch_queue, usage);
0393     struct watch_filter *wfilter;
0394     int i;
0395 
0396     for (i = 0; i < wqueue->nr_pages; i++)
0397         __free_page(wqueue->notes[i]);
0398     kfree(wqueue->notes);
0399     bitmap_free(wqueue->notes_bitmap);
0400 
0401     wfilter = rcu_access_pointer(wqueue->filter);
0402     if (wfilter)
0403         kfree_rcu(wfilter, rcu);
0404     kfree_rcu(wqueue, rcu);
0405 }
0406 
0407 /**
0408  * put_watch_queue - Dispose of a ref on a watchqueue.
0409  * @wqueue: The watch queue to unref.
0410  */
0411 void put_watch_queue(struct watch_queue *wqueue)
0412 {
0413     kref_put(&wqueue->usage, __put_watch_queue);
0414 }
0415 EXPORT_SYMBOL(put_watch_queue);
0416 
0417 static void free_watch(struct rcu_head *rcu)
0418 {
0419     struct watch *watch = container_of(rcu, struct watch, rcu);
0420 
0421     put_watch_queue(rcu_access_pointer(watch->queue));
0422     atomic_dec(&watch->cred->user->nr_watches);
0423     put_cred(watch->cred);
0424     kfree(watch);
0425 }
0426 
0427 static void __put_watch(struct kref *kref)
0428 {
0429     struct watch *watch = container_of(kref, struct watch, usage);
0430 
0431     call_rcu(&watch->rcu, free_watch);
0432 }
0433 
0434 /*
0435  * Discard a watch.
0436  */
0437 static void put_watch(struct watch *watch)
0438 {
0439     kref_put(&watch->usage, __put_watch);
0440 }
0441 
0442 /**
0443  * init_watch - Initialise a watch
0444  * @watch: The watch to initialise.
0445  * @wqueue: The queue to assign.
0446  *
0447  * Initialise a watch and set the watch queue.
0448  */
0449 void init_watch(struct watch *watch, struct watch_queue *wqueue)
0450 {
0451     kref_init(&watch->usage);
0452     INIT_HLIST_NODE(&watch->list_node);
0453     INIT_HLIST_NODE(&watch->queue_node);
0454     rcu_assign_pointer(watch->queue, wqueue);
0455 }
0456 
0457 static int add_one_watch(struct watch *watch, struct watch_list *wlist, struct watch_queue *wqueue)
0458 {
0459     const struct cred *cred;
0460     struct watch *w;
0461 
0462     hlist_for_each_entry(w, &wlist->watchers, list_node) {
0463         struct watch_queue *wq = rcu_access_pointer(w->queue);
0464         if (wqueue == wq && watch->id == w->id)
0465             return -EBUSY;
0466     }
0467 
0468     cred = current_cred();
0469     if (atomic_inc_return(&cred->user->nr_watches) > task_rlimit(current, RLIMIT_NOFILE)) {
0470         atomic_dec(&cred->user->nr_watches);
0471         return -EAGAIN;
0472     }
0473 
0474     watch->cred = get_cred(cred);
0475     rcu_assign_pointer(watch->watch_list, wlist);
0476 
0477     kref_get(&wqueue->usage);
0478     kref_get(&watch->usage);
0479     hlist_add_head(&watch->queue_node, &wqueue->watches);
0480     hlist_add_head_rcu(&watch->list_node, &wlist->watchers);
0481     return 0;
0482 }
0483 
0484 /**
0485  * add_watch_to_object - Add a watch on an object to a watch list
0486  * @watch: The watch to add
0487  * @wlist: The watch list to add to
0488  *
0489  * @watch->queue must have been set to point to the queue to post notifications
0490  * to and the watch list of the object to be watched.  @watch->cred must also
0491  * have been set to the appropriate credentials and a ref taken on them.
0492  *
0493  * The caller must pin the queue and the list both and must hold the list
0494  * locked against racing watch additions/removals.
0495  */
0496 int add_watch_to_object(struct watch *watch, struct watch_list *wlist)
0497 {
0498     struct watch_queue *wqueue;
0499     int ret = -ENOENT;
0500 
0501     rcu_read_lock();
0502 
0503     wqueue = rcu_access_pointer(watch->queue);
0504     if (lock_wqueue(wqueue)) {
0505         spin_lock(&wlist->lock);
0506         ret = add_one_watch(watch, wlist, wqueue);
0507         spin_unlock(&wlist->lock);
0508         unlock_wqueue(wqueue);
0509     }
0510 
0511     rcu_read_unlock();
0512     return ret;
0513 }
0514 EXPORT_SYMBOL(add_watch_to_object);
0515 
0516 /**
0517  * remove_watch_from_object - Remove a watch or all watches from an object.
0518  * @wlist: The watch list to remove from
0519  * @wq: The watch queue of interest (ignored if @all is true)
0520  * @id: The ID of the watch to remove (ignored if @all is true)
0521  * @all: True to remove all objects
0522  *
0523  * Remove a specific watch or all watches from an object.  A notification is
0524  * sent to the watcher to tell them that this happened.
0525  */
0526 int remove_watch_from_object(struct watch_list *wlist, struct watch_queue *wq,
0527                  u64 id, bool all)
0528 {
0529     struct watch_notification_removal n;
0530     struct watch_queue *wqueue;
0531     struct watch *watch;
0532     int ret = -EBADSLT;
0533 
0534     rcu_read_lock();
0535 
0536 again:
0537     spin_lock(&wlist->lock);
0538     hlist_for_each_entry(watch, &wlist->watchers, list_node) {
0539         if (all ||
0540             (watch->id == id && rcu_access_pointer(watch->queue) == wq))
0541             goto found;
0542     }
0543     spin_unlock(&wlist->lock);
0544     goto out;
0545 
0546 found:
0547     ret = 0;
0548     hlist_del_init_rcu(&watch->list_node);
0549     rcu_assign_pointer(watch->watch_list, NULL);
0550     spin_unlock(&wlist->lock);
0551 
0552     /* We now own the reference on watch that used to belong to wlist. */
0553 
0554     n.watch.type = WATCH_TYPE_META;
0555     n.watch.subtype = WATCH_META_REMOVAL_NOTIFICATION;
0556     n.watch.info = watch->info_id | watch_sizeof(n.watch);
0557     n.id = id;
0558     if (id != 0)
0559         n.watch.info = watch->info_id | watch_sizeof(n);
0560 
0561     wqueue = rcu_dereference(watch->queue);
0562 
0563     if (lock_wqueue(wqueue)) {
0564         post_one_notification(wqueue, &n.watch);
0565 
0566         if (!hlist_unhashed(&watch->queue_node)) {
0567             hlist_del_init_rcu(&watch->queue_node);
0568             put_watch(watch);
0569         }
0570 
0571         unlock_wqueue(wqueue);
0572     }
0573 
0574     if (wlist->release_watch) {
0575         void (*release_watch)(struct watch *);
0576 
0577         release_watch = wlist->release_watch;
0578         rcu_read_unlock();
0579         (*release_watch)(watch);
0580         rcu_read_lock();
0581     }
0582     put_watch(watch);
0583 
0584     if (all && !hlist_empty(&wlist->watchers))
0585         goto again;
0586 out:
0587     rcu_read_unlock();
0588     return ret;
0589 }
0590 EXPORT_SYMBOL(remove_watch_from_object);
0591 
0592 /*
0593  * Remove all the watches that are contributory to a queue.  This has the
0594  * potential to race with removal of the watches by the destruction of the
0595  * objects being watched or with the distribution of notifications.
0596  */
0597 void watch_queue_clear(struct watch_queue *wqueue)
0598 {
0599     struct watch_list *wlist;
0600     struct watch *watch;
0601     bool release;
0602 
0603     rcu_read_lock();
0604     spin_lock_bh(&wqueue->lock);
0605 
0606     /* Prevent new notifications from being stored. */
0607     wqueue->defunct = true;
0608 
0609     while (!hlist_empty(&wqueue->watches)) {
0610         watch = hlist_entry(wqueue->watches.first, struct watch, queue_node);
0611         hlist_del_init_rcu(&watch->queue_node);
0612         /* We now own a ref on the watch. */
0613         spin_unlock_bh(&wqueue->lock);
0614 
0615         /* We can't do the next bit under the queue lock as we need to
0616          * get the list lock - which would cause a deadlock if someone
0617          * was removing from the opposite direction at the same time or
0618          * posting a notification.
0619          */
0620         wlist = rcu_dereference(watch->watch_list);
0621         if (wlist) {
0622             void (*release_watch)(struct watch *);
0623 
0624             spin_lock(&wlist->lock);
0625 
0626             release = !hlist_unhashed(&watch->list_node);
0627             if (release) {
0628                 hlist_del_init_rcu(&watch->list_node);
0629                 rcu_assign_pointer(watch->watch_list, NULL);
0630 
0631                 /* We now own a second ref on the watch. */
0632             }
0633 
0634             release_watch = wlist->release_watch;
0635             spin_unlock(&wlist->lock);
0636 
0637             if (release) {
0638                 if (release_watch) {
0639                     rcu_read_unlock();
0640                     /* This might need to call dput(), so
0641                      * we have to drop all the locks.
0642                      */
0643                     (*release_watch)(watch);
0644                     rcu_read_lock();
0645                 }
0646                 put_watch(watch);
0647             }
0648         }
0649 
0650         put_watch(watch);
0651         spin_lock_bh(&wqueue->lock);
0652     }
0653 
0654     spin_unlock_bh(&wqueue->lock);
0655     rcu_read_unlock();
0656 }
0657 
0658 /**
0659  * get_watch_queue - Get a watch queue from its file descriptor.
0660  * @fd: The fd to query.
0661  */
0662 struct watch_queue *get_watch_queue(int fd)
0663 {
0664     struct pipe_inode_info *pipe;
0665     struct watch_queue *wqueue = ERR_PTR(-EINVAL);
0666     struct fd f;
0667 
0668     f = fdget(fd);
0669     if (f.file) {
0670         pipe = get_pipe_info(f.file, false);
0671         if (pipe && pipe->watch_queue) {
0672             wqueue = pipe->watch_queue;
0673             kref_get(&wqueue->usage);
0674         }
0675         fdput(f);
0676     }
0677 
0678     return wqueue;
0679 }
0680 EXPORT_SYMBOL(get_watch_queue);
0681 
0682 /*
0683  * Initialise a watch queue
0684  */
0685 int watch_queue_init(struct pipe_inode_info *pipe)
0686 {
0687     struct watch_queue *wqueue;
0688 
0689     wqueue = kzalloc(sizeof(*wqueue), GFP_KERNEL);
0690     if (!wqueue)
0691         return -ENOMEM;
0692 
0693     wqueue->pipe = pipe;
0694     kref_init(&wqueue->usage);
0695     spin_lock_init(&wqueue->lock);
0696     INIT_HLIST_HEAD(&wqueue->watches);
0697 
0698     pipe->watch_queue = wqueue;
0699     return 0;
0700 }