.. _list_rcu_doc:

Using RCU to Protect Read-Mostly Linked Lists
=============================================

One of the best applications of RCU is to protect read-mostly linked lists
(``struct list_head`` in list.h). One big advantage of this approach
is that all of the required memory barriers are included for you in
the list macros. This document describes several applications of RCU,
with the best fits first.


Example 1: Read-mostly list: Deferred Destruction
-------------------------------------------------

A widely used use case for RCU lists in the kernel is lockless iteration over
all processes in the system. ``task_struct::tasks`` is the list node that
links all the processes, and the list can be traversed in parallel with any
list additions or removals.

The list is traversed using ``for_each_process()``, which is defined in terms
of these two macros::

        #define next_task(p) \
                list_entry_rcu((p)->tasks.next, struct task_struct, tasks)

        #define for_each_process(p) \
                for (p = &init_task ; (p = next_task(p)) != &init_task ; )

The code traversing the list of all processes typically looks like::

        rcu_read_lock();
        for_each_process(p) {
                /* Do something with p */
        }
        rcu_read_unlock();

The simplified code for removing a process from a task list is::

        void release_task(struct task_struct *p)
        {
                write_lock(&tasklist_lock);
                list_del_rcu(&p->tasks);
                write_unlock(&tasklist_lock);
                call_rcu(&p->rcu, delayed_put_task_struct);
        }

When a process exits, ``release_task()`` calls ``list_del_rcu(&p->tasks)``
under ``tasklist_lock`` writer lock protection to remove the task from the
list of all tasks. The ``tasklist_lock`` prevents concurrent list additions
and removals from corrupting the list. Readers using ``for_each_process()``
are not protected by ``tasklist_lock``. To prevent readers from noticing
changes in the list pointers, the ``task_struct`` object is freed only after
one or more grace periods elapse, with the help of ``call_rcu()``. This
deferral of destruction ensures that any readers traversing the list will see
valid ``p->tasks.next`` pointers, so deletion and freeing can happen in
parallel with traversal of the list. This pattern is also called an
**existence lock**, since RCU pins the object in memory until all existing
readers finish.
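
For reference, the RCU callback passed to ``call_rcu()`` above simply maps the
``rcu_head`` back to the enclosing ``task_struct`` and drops the reference that
keeps it in memory. The following is a simplified sketch only; the real
callback in ``kernel/exit.c`` performs additional bookkeeping before dropping
the reference::

        /* Simplified sketch; the real delayed_put_task_struct() in
         * kernel/exit.c does more bookkeeping before the final put. */
        static void delayed_put_task_struct(struct rcu_head *rhp)
        {
                struct task_struct *tsk = container_of(rhp, struct task_struct, rcu);

                /* By now, no RCU reader can still be traversing p->tasks. */
                put_task_struct(tsk);
        }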


Example 2: Read-Side Action Taken Outside of Lock: No In-Place Updates
----------------------------------------------------------------------

The best applications are cases where, if reader-writer locking were
used, the read-side lock would be dropped before taking any action
based on the results of the search. The most celebrated example is
the routing table. Because the routing table is tracking the state of
equipment outside of the computer, it will at times contain stale data.
Therefore, once the route has been computed, there is no need to hold
the routing table static during transmission of the packet. After all,
you can hold the routing table static all you want, but that won't keep
the external Internet from changing, and it is the state of the external
Internet that really matters. In addition, routing entries are typically
added or deleted, rather than being modified in place.

A straightforward example of this use of RCU may be found in the
system-call auditing support. For example, a reader-writer locked
implementation of ``audit_filter_task()`` might be as follows::

        static enum audit_state audit_filter_task(struct task_struct *tsk)
        {
                struct audit_entry *e;
                enum audit_state state;

                read_lock(&auditsc_lock);
                /* Note: audit_filter_mutex held by caller. */
                list_for_each_entry(e, &audit_tsklist, list) {
                        if (audit_filter_rules(tsk, &e->rule, NULL, &state)) {
                                read_unlock(&auditsc_lock);
                                return state;
                        }
                }
                read_unlock(&auditsc_lock);
                return AUDIT_BUILD_CONTEXT;
        }

Here the list is searched under the lock, but the lock is dropped before
the corresponding value is returned. By the time that this value is acted
on, the list may well have been modified. This makes sense, since if
you are turning auditing off, it is OK to audit a few extra system calls.

This means that RCU can be easily applied to the read side, as follows::

        static enum audit_state audit_filter_task(struct task_struct *tsk)
        {
                struct audit_entry *e;
                enum audit_state state;

                rcu_read_lock();
                /* Note: audit_filter_mutex held by caller. */
                list_for_each_entry_rcu(e, &audit_tsklist, list) {
                        if (audit_filter_rules(tsk, &e->rule, NULL, &state)) {
                                rcu_read_unlock();
                                return state;
                        }
                }
                rcu_read_unlock();
                return AUDIT_BUILD_CONTEXT;
        }

The ``read_lock()`` and ``read_unlock()`` calls have become rcu_read_lock()
and rcu_read_unlock(), respectively, and the list_for_each_entry() has
become list_for_each_entry_rcu(). The **_rcu()** list-traversal primitives
insert the read-side memory barriers that are required on DEC Alpha CPUs.
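
Conceptually, list_for_each_entry_rcu() is just a list walk whose pointer
fetches go through rcu_dereference(), which supplies the ordering that DEC
Alpha needs and lets lockdep check that the access really is inside an RCU
read-side critical section. A hand-rolled sketch of the loop above, for
illustration only and not the actual macro expansion, might look like this::

        /* Conceptual sketch only: the real list_for_each_entry_rcu()
         * macro hides this pattern behind rcu_dereference()-style
         * accessors. */
        for (e = list_entry(rcu_dereference(audit_tsklist.next),
                            struct audit_entry, list);
             &e->list != &audit_tsklist;
             e = list_entry(rcu_dereference(e->list.next),
                            struct audit_entry, list)) {
                /* ... use e, still under rcu_read_lock() ... */
        }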

The changes to the update side are also straightforward. A reader-writer lock
might be used as follows for deletion and insertion::

        static inline int audit_del_rule(struct audit_rule *rule,
                                         struct list_head *list)
        {
                struct audit_entry *e;

                write_lock(&auditsc_lock);
                list_for_each_entry(e, list, list) {
                        if (!audit_compare_rule(rule, &e->rule)) {
                                list_del(&e->list);
                                write_unlock(&auditsc_lock);
                                return 0;
                        }
                }
                write_unlock(&auditsc_lock);
                return -EFAULT; /* No matching rule */
        }

        static inline int audit_add_rule(struct audit_entry *entry,
                                         struct list_head *list)
        {
                write_lock(&auditsc_lock);
                if (entry->rule.flags & AUDIT_PREPEND) {
                        entry->rule.flags &= ~AUDIT_PREPEND;
                        list_add(&entry->list, list);
                } else {
                        list_add_tail(&entry->list, list);
                }
                write_unlock(&auditsc_lock);
                return 0;
        }

Following are the RCU equivalents for these two functions::

        static inline int audit_del_rule(struct audit_rule *rule,
                                         struct list_head *list)
        {
                struct audit_entry *e;

                /* No need to use the _rcu iterator here, since this is the only
                 * deletion routine. */
                list_for_each_entry(e, list, list) {
                        if (!audit_compare_rule(rule, &e->rule)) {
                                list_del_rcu(&e->list);
                                call_rcu(&e->rcu, audit_free_rule);
                                return 0;
                        }
                }
                return -EFAULT; /* No matching rule */
        }

        static inline int audit_add_rule(struct audit_entry *entry,
                                         struct list_head *list)
        {
                if (entry->rule.flags & AUDIT_PREPEND) {
                        entry->rule.flags &= ~AUDIT_PREPEND;
                        list_add_rcu(&entry->list, list);
                } else {
                        list_add_tail_rcu(&entry->list, list);
                }
                return 0;
        }

Normally, the ``write_lock()`` and ``write_unlock()`` would be replaced by a
spin_lock() and a spin_unlock(). But in this case, all callers hold
``audit_filter_mutex``, so no additional locking is required. The
``auditsc_lock`` can therefore be eliminated, since use of RCU eliminates the
need for writers to exclude readers.
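
If callers did not already serialize updates with ``audit_filter_mutex``, the
deletion path would simply take a spinlock around its list manipulation. The
following sketch shows that hypothetical variant; the lock name is
illustrative and not part of the actual audit code. Note that the spinlock
only excludes other updaters, while readers continue to run concurrently
under rcu_read_lock()::

        static DEFINE_SPINLOCK(audit_list_lock);        /* hypothetical */

        static inline int audit_del_rule(struct audit_rule *rule,
                                         struct list_head *list)
        {
                struct audit_entry *e;

                spin_lock(&audit_list_lock);
                list_for_each_entry(e, list, list) {
                        if (!audit_compare_rule(rule, &e->rule)) {
                                list_del_rcu(&e->list);
                                spin_unlock(&audit_list_lock);
                                call_rcu(&e->rcu, audit_free_rule);
                                return 0;
                        }
                }
                spin_unlock(&audit_list_lock);
                return -EFAULT; /* No matching rule */
        }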

The list_del(), list_add(), and list_add_tail() primitives have been
replaced by list_del_rcu(), list_add_rcu(), and list_add_tail_rcu().
The **_rcu()** list-manipulation primitives add memory barriers that are needed on
weakly ordered CPUs (most of them!). The list_del_rcu() primitive omits the
pointer poisoning debug-assist code that would otherwise cause concurrent
readers to fail spectacularly.
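
To see why, compare simplified versions of the two primitives (see
include/linux/list.h and include/linux/rculist.h for the real ones, which
vary slightly with debug configuration). ``list_del()`` poisons both pointers,
while ``list_del_rcu()`` leaves ``->next`` intact so that a reader already at
the removed element can still advance to the rest of the list::

        /* Simplified sketches, not the exact kernel source. */
        static inline void list_del(struct list_head *entry)
        {
                __list_del_entry(entry);
                entry->next = LIST_POISON1;     /* would trip up concurrent readers */
                entry->prev = LIST_POISON2;
        }

        static inline void list_del_rcu(struct list_head *entry)
        {
                __list_del_entry(entry);
                entry->prev = LIST_POISON2;     /* ->next left valid for readers */
        }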

So, when readers can tolerate stale data and entries are either added or
deleted, without in-place modification, it is very easy to use RCU!


Example 3: Handling In-Place Updates
------------------------------------

The system-call auditing code does not update auditing rules in place.
However, if it did, the reader-writer-locked code to do so might look as
follows (assuming that only ``field_count`` is updated; otherwise, the added
fields would need to be filled in as well)::

        static inline int audit_upd_rule(struct audit_rule *rule,
                                         struct list_head *list,
                                         __u32 newaction,
                                         __u32 newfield_count)
        {
                struct audit_entry *e;

                write_lock(&auditsc_lock);
                /* Note: audit_filter_mutex held by caller. */
                list_for_each_entry(e, list, list) {
                        if (!audit_compare_rule(rule, &e->rule)) {
                                e->rule.action = newaction;
                                e->rule.field_count = newfield_count;
                                write_unlock(&auditsc_lock);
                                return 0;
                        }
                }
                write_unlock(&auditsc_lock);
                return -EFAULT; /* No matching rule */
        }

The RCU version creates a copy, updates the copy, then replaces the old
entry with the newly updated entry. This sequence of actions, allowing
concurrent reads while making a copy to perform an update, is what gives
RCU (*read-copy update*) its name. The RCU code is as follows::

        static inline int audit_upd_rule(struct audit_rule *rule,
                                         struct list_head *list,
                                         __u32 newaction,
                                         __u32 newfield_count)
        {
                struct audit_entry *e;
                struct audit_entry *ne;

                list_for_each_entry(e, list, list) {
                        if (!audit_compare_rule(rule, &e->rule)) {
                                ne = kmalloc(sizeof(*ne), GFP_ATOMIC);
                                if (ne == NULL)
                                        return -ENOMEM;
                                audit_copy_rule(&ne->rule, &e->rule);
                                ne->rule.action = newaction;
                                ne->rule.field_count = newfield_count;
                                list_replace_rcu(&e->list, &ne->list);
                                call_rcu(&e->rcu, audit_free_rule);
                                return 0;
                        }
                }
                return -EFAULT; /* No matching rule */
        }

Again, this assumes that the caller holds ``audit_filter_mutex``. Normally, the
writer lock would become a spinlock in this sort of code.

Another use of this pattern can be found in the Open vSwitch driver's
*connection tracking table* code in ``ct_limit_set()``. The table holds
connection tracking entries and has a limit on the maximum number of entries.
There is one such table per zone and hence one *limit* per zone. The zones
are mapped to their limits through a hashtable using an RCU-managed hlist
for the hash chains. When a new limit is set, a new limit object is allocated
and ``ct_limit_set()`` is called to replace the old limit object with the new
one using list_replace_rcu(). The old limit object is then freed after a
grace period using kfree_rcu().
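
The general shape of that update, reduced to its essentials, is sketched
below. The structure, field, and function names are illustrative only and are
not the actual Open vSwitch code; the point is the pairing of
hlist_replace_rcu() (or list_replace_rcu() for a ``list_head``) with
kfree_rcu()::

        /* Illustrative sketch of the replace-and-defer-free pattern. */
        struct zone_limit {
                struct hlist_node node;         /* linked into an RCU-managed hlist */
                struct rcu_head rcu;
                u16 zone;
                u32 limit;
        };

        /* Caller holds the update-side lock protecting the hash table. */
        static int zone_limit_update(struct zone_limit *old, u32 new_limit)
        {
                struct zone_limit *new = kmalloc(sizeof(*new), GFP_KERNEL);

                if (!new)
                        return -ENOMEM;
                new->zone = old->zone;
                new->limit = new_limit;
                hlist_replace_rcu(&old->node, &new->node);
                kfree_rcu(old, rcu);    /* freed only after a grace period */
                return 0;
        }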


Example 4: Eliminating Stale Data
---------------------------------

The auditing example above tolerates stale data, as do most algorithms
that are tracking external state. Because there is a delay between the time
the external state changes and the time Linux becomes aware of the change,
additional RCU-induced staleness is generally not a problem.

However, there are many examples where stale data cannot be tolerated.
One example in the Linux kernel is the System V IPC (see the shm_lock()
function in ipc/shm.c). This code checks a *deleted* flag under a
per-entry spinlock, and, if the *deleted* flag is set, pretends that the
entry does not exist. For this to be helpful, the search function must
return holding the per-entry spinlock, as shm_lock() does in fact do.

.. _quick_quiz:

Quick Quiz:
        For the deleted-flag technique to be helpful, why is it necessary
        to hold the per-entry lock while returning from the search function?

:ref:`Answer to Quick Quiz <quick_quiz_answer>`

If the system-call audit module were to ever need to reject stale data, one way
to accomplish this would be to add a ``deleted`` flag and a ``lock`` spinlock to the
audit_entry structure, and modify ``audit_filter_task()`` as follows::

        static enum audit_state audit_filter_task(struct task_struct *tsk)
        {
                struct audit_entry *e;
                enum audit_state state;

                rcu_read_lock();
                list_for_each_entry_rcu(e, &audit_tsklist, list) {
                        if (audit_filter_rules(tsk, &e->rule, NULL, &state)) {
                                spin_lock(&e->lock);
                                if (e->deleted) {
                                        spin_unlock(&e->lock);
                                        rcu_read_unlock();
                                        return AUDIT_BUILD_CONTEXT;
                                }
                                rcu_read_unlock();
                                return state;
                        }
                }
                rcu_read_unlock();
                return AUDIT_BUILD_CONTEXT;
        }

Note that this example assumes that entries are only added and deleted.
An additional mechanism would be required to deal correctly with the
update-in-place performed by ``audit_upd_rule()``. For one thing,
``audit_upd_rule()`` would need additional memory barriers to ensure that the
list_add_rcu() was really executed before the list_del_rcu().

The ``audit_del_rule()`` function would need to set the ``deleted`` flag under the
spinlock as follows::

        static inline int audit_del_rule(struct audit_rule *rule,
                                         struct list_head *list)
        {
                struct audit_entry *e;

                /* No need to use the _rcu iterator here, since this
                 * is the only deletion routine. */
                list_for_each_entry(e, list, list) {
                        if (!audit_compare_rule(rule, &e->rule)) {
                                spin_lock(&e->lock);
                                list_del_rcu(&e->list);
                                e->deleted = 1;
                                spin_unlock(&e->lock);
                                call_rcu(&e->rcu, audit_free_rule);
                                return 0;
                        }
                }
                return -EFAULT; /* No matching rule */
        }

This too assumes that the caller holds ``audit_filter_mutex``.


Example 5: Skipping Stale Objects
---------------------------------

For some use cases, reader performance can be improved by skipping stale
objects during read-side list traversal, where stale objects are those that
are pending destruction after one or more grace periods. One such example
can be found in the timerfd subsystem. When the ``CLOCK_REALTIME`` clock is
reprogrammed, for example by setting of the system time, all programmed
timerfds that depend on this clock get triggered and processes waiting on
them are woken up in advance of their scheduled expiry. To facilitate this,
all such timers are added to an RCU-managed ``cancel_list`` when they are
set up in ``timerfd_setup_cancel()``::

        static void timerfd_setup_cancel(struct timerfd_ctx *ctx, int flags)
        {
                spin_lock(&ctx->cancel_lock);
                if (ctx->clockid == CLOCK_REALTIME &&
                    (flags & TFD_TIMER_ABSTIME) && (flags & TFD_TIMER_CANCEL_ON_SET)) {
                        if (!ctx->might_cancel) {
                                ctx->might_cancel = true;
                                spin_lock(&cancel_lock);
                                list_add_rcu(&ctx->clist, &cancel_list);
                                spin_unlock(&cancel_lock);
                        }
                }
                spin_unlock(&ctx->cancel_lock);
        }

When a timerfd is freed (its fd is closed), the ``might_cancel`` flag of the
timerfd object is cleared, and the object is removed from the ``cancel_list``
and destroyed::

        int timerfd_release(struct inode *inode, struct file *file)
        {
                struct timerfd_ctx *ctx = file->private_data;

                spin_lock(&ctx->cancel_lock);
                if (ctx->might_cancel) {
                        ctx->might_cancel = false;
                        spin_lock(&cancel_lock);
                        list_del_rcu(&ctx->clist);
                        spin_unlock(&cancel_lock);
                }
                spin_unlock(&ctx->cancel_lock);

                hrtimer_cancel(&ctx->t.tmr);
                kfree_rcu(ctx, rcu);
                return 0;
        }

If the ``CLOCK_REALTIME`` clock is set, for example by a time server, the
hrtimer framework calls ``timerfd_clock_was_set()`` which walks the
``cancel_list`` and wakes up processes waiting on the timerfd. While iterating
the ``cancel_list``, the ``might_cancel`` flag is consulted to skip stale
objects::

        void timerfd_clock_was_set(void)
        {
                struct timerfd_ctx *ctx;
                unsigned long flags;

                rcu_read_lock();
                list_for_each_entry_rcu(ctx, &cancel_list, clist) {
                        if (!ctx->might_cancel)
                                continue;
                        spin_lock_irqsave(&ctx->wqh.lock, flags);
                        if (ctx->moffs != ktime_mono_to_real(0)) {
                                ctx->moffs = KTIME_MAX;
                                ctx->ticks++;
                                wake_up_locked_poll(&ctx->wqh, EPOLLIN);
                        }
                        spin_unlock_irqrestore(&ctx->wqh.lock, flags);
                }
                rcu_read_unlock();
        }

The key point is that, because RCU traversal of the ``cancel_list`` happens
concurrently with list additions and removals, the traversal can sometimes
encounter an object that has already been removed from the list. In this
example, such stale objects are skipped by consulting a flag.


Summary
-------

Read-mostly list-based data structures that can tolerate stale data are
the most amenable to use of RCU. The simplest case is where entries are
either added to or deleted from the data structure (or atomically modified
in place), but non-atomic in-place modifications can be handled by making
a copy, updating the copy, then replacing the original with the copy.
If stale data cannot be tolerated, then a *deleted* flag may be used
in conjunction with a per-entry spinlock in order to allow the search
function to reject newly deleted data.

.. _quick_quiz_answer:

Answer to Quick Quiz:
        For the deleted-flag technique to be helpful, why is it necessary
        to hold the per-entry lock while returning from the search function?

        If the search function drops the per-entry lock before returning,
        then the caller will be processing stale data in any case. If it
        is really OK to be processing stale data, then you don't need a
        *deleted* flag. If processing stale data really is a problem,
        then you need to hold the per-entry lock across all of the code
        that uses the value that was returned.

:ref:`Back to Quick Quiz <quick_quiz>`