0001 /*
0002  * This file is subject to the terms and conditions of the GNU General Public
0003  * License.  See the file "COPYING" in the main directory of this archive
0004  * for more details.
0005  *
0006  * (C) Copyright 2020 Hewlett Packard Enterprise Development LP
0007  * Copyright (c) 2004-2009 Silicon Graphics, Inc.  All Rights Reserved.
0008  */
0009 
0010 /*
0011  * Cross Partition Communication (XPC) support - standard version.
0012  *
0013  *  XPC provides a message passing capability that crosses partition
0014  *  boundaries. This module is made up of two parts:
0015  *
0016  *      partition   This part detects the presence/absence of other
0017  *          partitions. It provides a heartbeat and monitors
0018  *          the heartbeats of other partitions.
0019  *
0020  *      channel This part manages the channels and sends/receives
0021  *          messages across them to/from other partitions.
0022  *
0023  *  There are a couple of additional functions residing in XP, which
0024  *  provide an interface to XPC for its users.
0025  *
0026  *
0027  *  Caveats:
0028  *
0029  *    . Currently on sn2, we have no way to determine which nasid an IRQ
0030  *      came from. Thus, xpc_send_IRQ_sn2() does a remote amo write
0031  *      followed by an IPI. The amo indicates where data is to be pulled
0032  *      from, so after the IPI arrives, the remote partition checks the amo
0033  *      word. The IPI can actually arrive before the amo however, so other
0034  *      code must periodically check for this case. Also, remote amo
0035  *      operations do not reliably time out. Thus we do a remote PIO read
0036  *      solely to know whether the remote partition is down and whether we
0037  *      should stop sending IPIs to it. This remote PIO read operation is
0038  *      set up in a special nofault region so SAL knows to ignore (and
0039  *      cleanup) any errors due to the remote amo write, PIO read, and/or
0040  *      PIO write operations.
0041  *
0042  *      If/when new hardware solves this IPI problem, we should abandon
0043  *      the current approach.
0044  *
0045  */
0046 
0047 #include <linux/module.h>
0048 #include <linux/slab.h>
0049 #include <linux/sysctl.h>
0050 #include <linux/device.h>
0051 #include <linux/delay.h>
0052 #include <linux/reboot.h>
0053 #include <linux/kdebug.h>
0054 #include <linux/kthread.h>
0055 #include "xpc.h"
0056 
0057 #ifdef CONFIG_X86_64
0058 #include <asm/traps.h>
0059 #endif
0060 
/* define two XPC debug device structures to be used with dev_dbg() et al */

static struct device_driver xpc_dbg_name = {
    .name = "xpc"
};

/* dummy device naming the "part" (partition) side of XPC for dev_dbg() */
static struct device xpc_part_dbg_subname = {
    .init_name = "",    /* set to "part" at xpc_init() time */
    .driver = &xpc_dbg_name
};

/* dummy device naming the "chan" (channel) side of XPC for dev_dbg() */
static struct device xpc_chan_dbg_subname = {
    .init_name = "",    /* set to "chan" at xpc_init() time */
    .driver = &xpc_dbg_name
};

/* handles passed as the dev argument to dev_dbg()/dev_err() throughout XPC */
struct device *xpc_part = &xpc_part_dbg_subname;
struct device *xpc_chan = &xpc_chan_dbg_subname;
0079 
/* presumably consulted by xpc_system_die() to skip die events -- handler not
 * visible in this chunk; confirm against the rest of the file */
static int xpc_kdebug_ignore;

/* systune related variables for /proc/sys directories */

/* period of the local heartbeat timer; multiplied by HZ in xpc_hb_beater() */
static int xpc_hb_interval = XPC_HB_DEFAULT_INTERVAL;
static int xpc_hb_min_interval = 1;
static int xpc_hb_max_interval = 10;

/* how often remote heartbeats are scanned; multiplied by HZ in
 * xpc_hb_checker(), so the unit is seconds */
static int xpc_hb_check_interval = XPC_HB_CHECK_DEFAULT_INTERVAL;
static int xpc_hb_check_min_interval = 10;
static int xpc_hb_check_max_interval = 120;

/* limit on how long a partition disengage may take (see the disengage
 * timer in xpc_setup_partitions()); min/max bound the sysctl below */
int xpc_disengage_timelimit = XPC_DISENGAGE_DEFAULT_TIMELIMIT;
static int xpc_disengage_min_timelimit; /* = 0 */
static int xpc_disengage_max_timelimit = 120;
0095 
/* /proc/sys/xpc/hb/{hb_interval,hb_check_interval} -- clamped int tunables */
static struct ctl_table xpc_sys_xpc_hb_dir[] = {
    {
     .procname = "hb_interval",
     .data = &xpc_hb_interval,
     .maxlen = sizeof(int),
     .mode = 0644,
     .proc_handler = proc_dointvec_minmax,
     .extra1 = &xpc_hb_min_interval,
     .extra2 = &xpc_hb_max_interval},
    {
     .procname = "hb_check_interval",
     .data = &xpc_hb_check_interval,
     .maxlen = sizeof(int),
     .mode = 0644,
     .proc_handler = proc_dointvec_minmax,
     .extra1 = &xpc_hb_check_min_interval,
     .extra2 = &xpc_hb_check_max_interval},
    {}
};
/* /proc/sys/xpc: "hb" subdirectory plus the disengage_timelimit tunable */
static struct ctl_table xpc_sys_xpc_dir[] = {
    {
     .procname = "hb",
     .mode = 0555,
     .child = xpc_sys_xpc_hb_dir},
    {
     .procname = "disengage_timelimit",
     .data = &xpc_disengage_timelimit,
     .maxlen = sizeof(int),
     .mode = 0644,
     .proc_handler = proc_dointvec_minmax,
     .extra1 = &xpc_disengage_min_timelimit,
     .extra2 = &xpc_disengage_max_timelimit},
    {}
};
/* top-level /proc/sys/xpc directory */
static struct ctl_table xpc_sys_dir[] = {
    {
     .procname = "xpc",
     .mode = 0555,
     .child = xpc_sys_xpc_dir},
    {}
};
/* handle returned when the table is registered; used for unregistration */
static struct ctl_table_header *xpc_sysctl;
0138 
/* non-zero if any remote partition disengage was timed out */
int xpc_disengage_timedout;

/* #of activate IRQs received and not yet processed */
int xpc_activate_IRQ_rcvd;
DEFINE_SPINLOCK(xpc_activate_IRQ_rcvd_lock);

/* IRQ handler notifies this wait queue on receipt of an IRQ */
DECLARE_WAIT_QUEUE_HEAD(xpc_activate_IRQ_wq);

/* jiffies value at which the next remote-heartbeat scan is due */
static unsigned long xpc_hb_check_timeout;
/* timer that drives the local heartbeat; see xpc_hb_beater() */
static struct timer_list xpc_hb_timer;

/* notification that the xpc_hb_checker thread has exited */
static DECLARE_COMPLETION(xpc_hb_checker_exited);

/* notification that the xpc_discovery thread has exited */
static DECLARE_COMPLETION(xpc_discovery_exited);

static void xpc_kthread_waitmsgs(struct xpc_partition *, struct xpc_channel *);

/* reboot notifier; xpc_system_reboot() is defined outside this chunk */
static int xpc_system_reboot(struct notifier_block *, unsigned long, void *);
static struct notifier_block xpc_reboot_notifier = {
    .notifier_call = xpc_system_reboot,
};

/* die notifier; xpc_system_die() is defined outside this chunk */
static int xpc_system_die(struct notifier_block *, unsigned long, void *);
static struct notifier_block xpc_die_notifier = {
    .notifier_call = xpc_system_die,
};

/* arch-specific operation vector -- presumably populated during init by
 * sn2/uv setup code not visible in this chunk */
struct xpc_arch_operations xpc_arch_ops;
0171 
/*
 * Timer function to enforce the timelimit on the partition disengage.
 *
 * @t: the expired disengage_timer embedded in a struct xpc_partition;
 *     from_timer() recovers the enclosing partition.
 */
static void
xpc_timeout_partition_disengage(struct timer_list *t)
{
    struct xpc_partition *part = from_timer(part, t, disengage_timer);

    /* the timer should only fire once the deadline has actually passed */
    DBUG_ON(time_is_after_jiffies(part->disengage_timeout));

    xpc_partition_disengaged_from_timer(part);

    /* after the call above the partition must be fully disengaged */
    DBUG_ON(part->disengage_timeout != 0);
    DBUG_ON(xpc_arch_ops.partition_engaged(XPC_PARTID(part)));
}
0187 
/*
 * Timer to produce the heartbeat.  The timer structures function is
 * already set when this is initially called.  A tunable is used to
 * specify when the next timeout should occur.
 *
 * Self-rearming: each invocation schedules the next beat xpc_hb_interval
 * seconds out.  Also wakes the heartbeat-checker thread if its periodic
 * remote-heartbeat scan is due.
 */
static void
xpc_hb_beater(struct timer_list *unused)
{
    xpc_arch_ops.increment_heartbeat();

    /* nudge xpc_hb_checker() if its check deadline has passed */
    if (time_is_before_eq_jiffies(xpc_hb_check_timeout))
        wake_up_interruptible(&xpc_activate_IRQ_wq);

    xpc_hb_timer.expires = jiffies + (xpc_hb_interval * HZ);
    add_timer(&xpc_hb_timer);
}
0204 
/*
 * Initialize the heartbeat machinery and kick off the self-rearming
 * heartbeat timer by calling the beater directly once.
 */
static void
xpc_start_hb_beater(void)
{
    xpc_arch_ops.heartbeat_init();
    timer_setup(&xpc_hb_timer, xpc_hb_beater, 0);
    /* first beat now; xpc_hb_beater() re-arms the timer for later beats */
    xpc_hb_beater(NULL);
}
0212 
/*
 * Stop the heartbeat: cancel the (self-rearming) timer, waiting for any
 * in-flight callback to finish, then let the arch layer clean up.
 */
static void
xpc_stop_hb_beater(void)
{
    del_timer_sync(&xpc_hb_timer);
    xpc_arch_ops.heartbeat_exit();
}
0219 
0220 /*
0221  * At periodic intervals, scan through all active partitions and ensure
0222  * their heartbeat is still active.  If not, the partition is deactivated.
0223  */
0224 static void
0225 xpc_check_remote_hb(void)
0226 {
0227     struct xpc_partition *part;
0228     short partid;
0229     enum xp_retval ret;
0230 
0231     for (partid = 0; partid < xp_max_npartitions; partid++) {
0232 
0233         if (xpc_exiting)
0234             break;
0235 
0236         if (partid == xp_partition_id)
0237             continue;
0238 
0239         part = &xpc_partitions[partid];
0240 
0241         if (part->act_state == XPC_P_AS_INACTIVE ||
0242             part->act_state == XPC_P_AS_DEACTIVATING) {
0243             continue;
0244         }
0245 
0246         ret = xpc_arch_ops.get_remote_heartbeat(part);
0247         if (ret != xpSuccess)
0248             XPC_DEACTIVATE_PARTITION(part, ret);
0249     }
0250 }
0251 
/*
 * This thread is responsible for nearly all of the partition
 * activation/deactivation.
 *
 * It beats the local heartbeat, periodically scans remote heartbeats, and
 * processes activate IRQs, sleeping on xpc_activate_IRQ_wq in between.
 * Runs until xpc_exiting is set, then signals xpc_hb_checker_exited.
 */
static int
xpc_hb_checker(void *ignore)
{
    int force_IRQ = 0;

    /* this thread was marked active by xpc_hb_init() */

    /* pin this thread to the CPU designated for heartbeat work */
    set_cpus_allowed_ptr(current, cpumask_of(XPC_HB_CHECK_CPU));

    /* set our heartbeating to other partitions into motion */
    xpc_hb_check_timeout = jiffies + (xpc_hb_check_interval * HZ);
    xpc_start_hb_beater();

    while (!xpc_exiting) {

        dev_dbg(xpc_part, "woke up with %d ticks rem; %d IRQs have "
            "been received\n",
            (int)(xpc_hb_check_timeout - jiffies),
            xpc_activate_IRQ_rcvd);

        /* checking of remote heartbeats is skewed by IRQ handling */
        if (time_is_before_eq_jiffies(xpc_hb_check_timeout)) {
            xpc_hb_check_timeout = jiffies +
                (xpc_hb_check_interval * HZ);

            dev_dbg(xpc_part, "checking remote heartbeats\n");
            xpc_check_remote_hb();
        }

        /* check for outstanding IRQs */
        if (xpc_activate_IRQ_rcvd > 0 || force_IRQ != 0) {
            force_IRQ = 0;
            dev_dbg(xpc_part, "processing activate IRQs "
                "received\n");
            xpc_arch_ops.process_activate_IRQ_rcvd();
        }

        /* wait for IRQ or timeout */
        (void)wait_event_interruptible(xpc_activate_IRQ_wq,
                           (time_is_before_eq_jiffies(
                        xpc_hb_check_timeout) ||
                        xpc_activate_IRQ_rcvd > 0 ||
                        xpc_exiting));
    }

    xpc_stop_hb_beater();

    dev_dbg(xpc_part, "heartbeat checker is exiting\n");

    /* mark this thread as having exited */
    complete(&xpc_hb_checker_exited);
    return 0;
}
0309 
/*
 * This thread will attempt to discover other partitions to activate
 * based on info provided by SAL. This new thread is short lived and
 * will exit once discovery is complete.
 *
 * Signals xpc_discovery_exited so xpc_init()/exit paths can wait on it.
 */
static int
xpc_initiate_discovery(void *ignore)
{
    xpc_discovery();

    dev_dbg(xpc_part, "discovery thread is exiting\n");

    /* mark this thread as having exited */
    complete(&xpc_discovery_exited);
    return 0;
}
0326 
/*
 * The first kthread assigned to a newly activated partition is the one
 * created by XPC HB with which it calls xpc_activating(). XPC hangs on to
 * that kthread until the partition is brought down, at which time that kthread
 * returns back to XPC HB. (The return of that kthread will signify to XPC HB
 * that XPC has dismantled all communication infrastructure for the associated
 * partition.) This kthread becomes the channel manager for that partition.
 *
 * Each active partition has a channel manager, who, besides connecting and
 * disconnecting channels, will ensure that each of the partition's connected
 * channels has the required number of assigned kthreads to get the work done.
 *
 * Loops until the partition is deactivating AND all channels are idle AND
 * the remote side has disengaged.
 */
static void
xpc_channel_mgr(struct xpc_partition *part)
{
    while (part->act_state != XPC_P_AS_DEACTIVATING ||
           atomic_read(&part->nchannels_active) > 0 ||
           !xpc_partition_disengaged(part)) {

        xpc_process_sent_chctl_flags(part);

        /*
         * Wait until we've been requested to activate kthreads or
         * all of the channel's message queues have been torn down or
         * a signal is pending.
         *
         * The channel_mgr_requests is set to 1 after being awakened,
         * This is done to prevent the channel mgr from making one pass
         * through the loop for each request, since he will
         * be servicing all the requests in one pass. The reason it's
         * set to 1 instead of 0 is so that other kthreads will know
         * that the channel mgr is running and won't bother trying to
         * wake him up.
         */
        atomic_dec(&part->channel_mgr_requests);
        (void)wait_event_interruptible(part->channel_mgr_wq,
                (atomic_read(&part->channel_mgr_requests) > 0 ||
                 part->chctl.all_flags != 0 ||
                 (part->act_state == XPC_P_AS_DEACTIVATING &&
                 atomic_read(&part->nchannels_active) == 0 &&
                 xpc_partition_disengaged(part))));
        atomic_set(&part->channel_mgr_requests, 1);
    }
}
0371 
0372 /*
0373  * Guarantee that the kzalloc'd memory is cacheline aligned.
0374  */
0375 void *
0376 xpc_kzalloc_cacheline_aligned(size_t size, gfp_t flags, void **base)
0377 {
0378     /* see if kzalloc will give us cachline aligned memory by default */
0379     *base = kzalloc(size, flags);
0380     if (*base == NULL)
0381         return NULL;
0382 
0383     if ((u64)*base == L1_CACHE_ALIGN((u64)*base))
0384         return *base;
0385 
0386     kfree(*base);
0387 
0388     /* nope, we'll have to do it ourselves */
0389     *base = kzalloc(size + L1_CACHE_BYTES, flags);
0390     if (*base == NULL)
0391         return NULL;
0392 
0393     return (void *)L1_CACHE_ALIGN((u64)*base);
0394 }
0395 
/*
 * Setup the channel structures necessary to support XPartition Communication
 * between the specified remote partition and the local one.
 *
 * Allocates part->channels and the remote open/close args, initializes every
 * channel, then hands off to the arch layer.  On any failure, everything
 * allocated here is freed (goto-based unwind) and an error code is returned.
 */
static enum xp_retval
xpc_setup_ch_structures(struct xpc_partition *part)
{
    enum xp_retval ret;
    int ch_number;
    struct xpc_channel *ch;
    short partid = XPC_PARTID(part);

    /*
     * Allocate all of the channel structures as a contiguous chunk of
     * memory.
     */
    DBUG_ON(part->channels != NULL);
    part->channels = kcalloc(XPC_MAX_NCHANNELS,
                 sizeof(struct xpc_channel),
                 GFP_KERNEL);
    if (part->channels == NULL) {
        dev_err(xpc_chan, "can't get memory for channels\n");
        return xpNoMemory;
    }

    /* allocate the remote open and close args */

    part->remote_openclose_args =
        xpc_kzalloc_cacheline_aligned(XPC_OPENCLOSE_ARGS_SIZE,
                      GFP_KERNEL, &part->
                      remote_openclose_args_base);
    if (part->remote_openclose_args == NULL) {
        dev_err(xpc_chan, "can't get memory for remote connect args\n");
        ret = xpNoMemory;
        goto out_1;
    }

    part->chctl.all_flags = 0;
    spin_lock_init(&part->chctl_lock);

    /* starts at 1 so xpc_channel_mgr() sees itself as "running" */
    atomic_set(&part->channel_mgr_requests, 1);
    init_waitqueue_head(&part->channel_mgr_wq);

    part->nchannels = XPC_MAX_NCHANNELS;

    atomic_set(&part->nchannels_active, 0);
    atomic_set(&part->nchannels_engaged, 0);

    /* initialize each channel to a disconnected, idle state */
    for (ch_number = 0; ch_number < part->nchannels; ch_number++) {
        ch = &part->channels[ch_number];

        ch->partid = partid;
        ch->number = ch_number;
        ch->flags = XPC_C_DISCONNECTED;

        atomic_set(&ch->kthreads_assigned, 0);
        atomic_set(&ch->kthreads_idle, 0);
        atomic_set(&ch->kthreads_active, 0);

        atomic_set(&ch->references, 0);
        atomic_set(&ch->n_to_notify, 0);

        spin_lock_init(&ch->lock);
        init_completion(&ch->wdisconnect_wait);

        atomic_set(&ch->n_on_msg_allocate_wq, 0);
        init_waitqueue_head(&ch->msg_allocate_wq);
        init_waitqueue_head(&ch->idle_wq);
    }

    ret = xpc_arch_ops.setup_ch_structures(part);
    if (ret != xpSuccess)
        goto out_2;

    /*
     * With the setting of the partition setup_state to XPC_P_SS_SETUP,
     * we're declaring that this partition is ready to go.
     */
    part->setup_state = XPC_P_SS_SETUP;

    return xpSuccess;

    /* setup of ch structures failed */
out_2:
    kfree(part->remote_openclose_args_base);
    part->remote_openclose_args = NULL;
out_1:
    kfree(part->channels);
    part->channels = NULL;
    return ret;
}
0487 
/*
 * Teardown the channel structures necessary to support XPartition Communication
 * between the specified remote partition and the local one.
 *
 * Must only be called once all channels are quiescent (the DBUG_ONs below);
 * blocks until all outstanding references to the partition are dropped.
 */
static void
xpc_teardown_ch_structures(struct xpc_partition *part)
{
    DBUG_ON(atomic_read(&part->nchannels_engaged) != 0);
    DBUG_ON(atomic_read(&part->nchannels_active) != 0);

    /*
     * Make this partition inaccessible to local processes by marking it
     * as no longer setup. Then wait before proceeding with the teardown
     * until all existing references cease.
     */
    DBUG_ON(part->setup_state != XPC_P_SS_SETUP);
    part->setup_state = XPC_P_SS_WTEARDOWN;

    wait_event(part->teardown_wq, (atomic_read(&part->references) == 0));

    /* now we can begin tearing down the infrastructure */

    xpc_arch_ops.teardown_ch_structures(part);

    /* free the memory allocated by xpc_setup_ch_structures() */
    kfree(part->remote_openclose_args_base);
    part->remote_openclose_args = NULL;
    kfree(part->channels);
    part->channels = NULL;

    part->setup_state = XPC_P_SS_TORNDOWN;
}
0519 
/*
 * When XPC HB determines that a partition has come up, it will create a new
 * kthread and that kthread will call this function to attempt to set up the
 * basic infrastructure used for Cross Partition Communication with the newly
 * upped partition.
 *
 * The kthread that was created by XPC HB and which setup the XPC
 * infrastructure will remain assigned to the partition becoming the channel
 * manager for that partition until the partition is deactivating, at which
 * time the kthread will teardown the XPC infrastructure and then exit.
 *
 * @__partid: the partition id, smuggled through the kthread's void* arg.
 */
static int
xpc_activating(void *__partid)
{
    short partid = (u64)__partid;
    struct xpc_partition *part = &xpc_partitions[partid];
    unsigned long irq_flags;

    DBUG_ON(partid < 0 || partid >= xp_max_npartitions);

    spin_lock_irqsave(&part->act_lock, irq_flags);

    /* a deactivation may have raced us here; bail out quietly if so */
    if (part->act_state == XPC_P_AS_DEACTIVATING) {
        part->act_state = XPC_P_AS_INACTIVE;
        spin_unlock_irqrestore(&part->act_lock, irq_flags);
        part->remote_rp_pa = 0;
        return 0;
    }

    /* indicate the thread is activating */
    DBUG_ON(part->act_state != XPC_P_AS_ACTIVATION_REQ);
    part->act_state = XPC_P_AS_ACTIVATING;

    XPC_SET_REASON(part, 0, 0);
    spin_unlock_irqrestore(&part->act_lock, irq_flags);

    dev_dbg(xpc_part, "activating partition %d\n", partid);

    xpc_arch_ops.allow_hb(partid);

    if (xpc_setup_ch_structures(part) == xpSuccess) {
        (void)xpc_part_ref(part);   /* this will always succeed */

        if (xpc_arch_ops.make_first_contact(part) == xpSuccess) {
            xpc_mark_partition_active(part);
            xpc_channel_mgr(part);
            /* won't return until partition is deactivating */
        }

        xpc_part_deref(part);
        xpc_teardown_ch_structures(part);
    }

    xpc_arch_ops.disallow_hb(partid);
    xpc_mark_partition_inactive(part);

    if (part->reason == xpReactivating) {
        /* interrupting ourselves results in activating partition */
        xpc_arch_ops.request_partition_reactivation(part);
    }

    return 0;
}
0583 
/*
 * Request activation of a partition: transition it to ACTIVATION_REQ under
 * the act_lock, then spawn the xpc_activating() kthread to do the work.
 * If the kthread cannot be created, the partition is returned to INACTIVE.
 */
void
xpc_activate_partition(struct xpc_partition *part)
{
    short partid = XPC_PARTID(part);
    unsigned long irq_flags;
    struct task_struct *kthread;

    spin_lock_irqsave(&part->act_lock, irq_flags);

    DBUG_ON(part->act_state != XPC_P_AS_INACTIVE);

    part->act_state = XPC_P_AS_ACTIVATION_REQ;
    XPC_SET_REASON(part, xpCloneKThread, __LINE__);

    spin_unlock_irqrestore(&part->act_lock, irq_flags);

    /* the partid is passed by value, packed into the void* argument */
    kthread = kthread_run(xpc_activating, (void *)((u64)partid), "xpc%02d",
                  partid);
    if (IS_ERR(kthread)) {
        /* roll back to inactive so a later activation can be retried */
        spin_lock_irqsave(&part->act_lock, irq_flags);
        part->act_state = XPC_P_AS_INACTIVE;
        XPC_SET_REASON(part, xpCloneKThreadFailed, __LINE__);
        spin_unlock_irqrestore(&part->act_lock, irq_flags);
    }
}
0609 
0610 void
0611 xpc_activate_kthreads(struct xpc_channel *ch, int needed)
0612 {
0613     int idle = atomic_read(&ch->kthreads_idle);
0614     int assigned = atomic_read(&ch->kthreads_assigned);
0615     int wakeup;
0616 
0617     DBUG_ON(needed <= 0);
0618 
0619     if (idle > 0) {
0620         wakeup = (needed > idle) ? idle : needed;
0621         needed -= wakeup;
0622 
0623         dev_dbg(xpc_chan, "wakeup %d idle kthreads, partid=%d, "
0624             "channel=%d\n", wakeup, ch->partid, ch->number);
0625 
0626         /* only wakeup the requested number of kthreads */
0627         wake_up_nr(&ch->idle_wq, wakeup);
0628     }
0629 
0630     if (needed <= 0)
0631         return;
0632 
0633     if (needed + assigned > ch->kthreads_assigned_limit) {
0634         needed = ch->kthreads_assigned_limit - assigned;
0635         if (needed <= 0)
0636             return;
0637     }
0638 
0639     dev_dbg(xpc_chan, "create %d new kthreads, partid=%d, channel=%d\n",
0640         needed, ch->partid, ch->number);
0641 
0642     xpc_create_kthreads(ch, needed, 0);
0643 }
0644 
/*
 * This function is where XPC's kthreads wait for messages to deliver.
 *
 * Delivers all pending payloads, then parks the kthread on the channel's
 * idle queue until more arrive or the channel starts disconnecting.  If
 * too many kthreads are already idle, the caller exits instead of parking.
 */
static void
xpc_kthread_waitmsgs(struct xpc_partition *part, struct xpc_channel *ch)
{
    int (*n_of_deliverable_payloads) (struct xpc_channel *) =
        xpc_arch_ops.n_of_deliverable_payloads;

    do {
        /* deliver messages to their intended recipients */

        while (n_of_deliverable_payloads(ch) > 0 &&
               !(ch->flags & XPC_C_DISCONNECTING)) {
            xpc_deliver_payload(ch);
        }

        /* inc-then-check: back out and exit if the idle pool is full */
        if (atomic_inc_return(&ch->kthreads_idle) >
            ch->kthreads_idle_limit) {
            /* too many idle kthreads on this channel */
            atomic_dec(&ch->kthreads_idle);
            break;
        }

        dev_dbg(xpc_chan, "idle kthread calling "
            "wait_event_interruptible_exclusive()\n");

        /* exclusive wait so wake_up_nr() wakes exactly N kthreads */
        (void)wait_event_interruptible_exclusive(ch->idle_wq,
                (n_of_deliverable_payloads(ch) > 0 ||
                 (ch->flags & XPC_C_DISCONNECTING)));

        atomic_dec(&ch->kthreads_idle);

    } while (!(ch->flags & XPC_C_DISCONNECTING));
}
0680 
/*
 * Entry point for every channel-servicing kthread.
 *
 * Makes the one-time "connected" callout to the registerer if it hasn't
 * been made yet, services messages via xpc_kthread_waitmsgs(), then makes
 * the "disconnecting" callout (once) and drops the references/engagement
 * counts taken by xpc_create_kthreads() on this kthread's behalf.
 *
 * @args: packed partid and channel number (see XPC_UNPACK_ARG1/2).
 */
static int
xpc_kthread_start(void *args)
{
    short partid = XPC_UNPACK_ARG1(args);
    u16 ch_number = XPC_UNPACK_ARG2(args);
    struct xpc_partition *part = &xpc_partitions[partid];
    struct xpc_channel *ch;
    int n_needed;
    unsigned long irq_flags;
    int (*n_of_deliverable_payloads) (struct xpc_channel *) =
        xpc_arch_ops.n_of_deliverable_payloads;

    dev_dbg(xpc_chan, "kthread starting, partid=%d, channel=%d\n",
        partid, ch_number);

    ch = &part->channels[ch_number];

    if (!(ch->flags & XPC_C_DISCONNECTING)) {

        /* let registerer know that connection has been established */

        spin_lock_irqsave(&ch->lock, irq_flags);
        if (!(ch->flags & XPC_C_CONNECTEDCALLOUT)) {
            /* claim the callout under the lock, drop it to call out */
            ch->flags |= XPC_C_CONNECTEDCALLOUT;
            spin_unlock_irqrestore(&ch->lock, irq_flags);

            xpc_connected_callout(ch);

            spin_lock_irqsave(&ch->lock, irq_flags);
            ch->flags |= XPC_C_CONNECTEDCALLOUT_MADE;
            spin_unlock_irqrestore(&ch->lock, irq_flags);

            /*
             * It is possible that while the callout was being
             * made that the remote partition sent some messages.
             * If that is the case, we may need to activate
             * additional kthreads to help deliver them. We only
             * need one less than total #of messages to deliver.
             */
            n_needed = n_of_deliverable_payloads(ch) - 1;
            if (n_needed > 0 && !(ch->flags & XPC_C_DISCONNECTING))
                xpc_activate_kthreads(ch, n_needed);

        } else {
            spin_unlock_irqrestore(&ch->lock, irq_flags);
        }

        xpc_kthread_waitmsgs(part, ch);
    }

    /* let registerer know that connection is disconnecting */

    spin_lock_irqsave(&ch->lock, irq_flags);
    if ((ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) &&
        !(ch->flags & XPC_C_DISCONNECTINGCALLOUT)) {
        ch->flags |= XPC_C_DISCONNECTINGCALLOUT;
        spin_unlock_irqrestore(&ch->lock, irq_flags);

        xpc_disconnect_callout(ch, xpDisconnecting);

        spin_lock_irqsave(&ch->lock, irq_flags);
        ch->flags |= XPC_C_DISCONNECTINGCALLOUT_MADE;
    }
    spin_unlock_irqrestore(&ch->lock, irq_flags);

    /* last kthread off the last engaged channel disengages the partition */
    if (atomic_dec_return(&ch->kthreads_assigned) == 0 &&
        atomic_dec_return(&part->nchannels_engaged) == 0) {
        xpc_arch_ops.indicate_partition_disengaged(part);
    }

    xpc_msgqueue_deref(ch);

    dev_dbg(xpc_chan, "kthread exiting, partid=%d, channel=%d\n",
        partid, ch_number);

    xpc_part_deref(part);
    return 0;
}
0759 
/*
 * For each partition that XPC has established communications with, there is
 * a minimum of one kernel thread assigned to perform any operation that
 * may potentially sleep or block (basically the callouts to the asynchronous
 * functions registered via xpc_connect()).
 *
 * Additional kthreads are created and destroyed by XPC as the workload
 * demands.
 *
 * A kthread is assigned to one of the active channels that exists for a given
 * partition.
 *
 * @ch:     channel the new kthreads will service
 * @needed: number of kthreads to create
 * @ignore_disconnecting: when non-zero, create kthreads even though the
 *     channel is disconnecting (used for the disconnecting callout itself).
 */
void
xpc_create_kthreads(struct xpc_channel *ch, int needed,
            int ignore_disconnecting)
{
    unsigned long irq_flags;
    u64 args = XPC_PACK_ARGS(ch->partid, ch->number);
    struct xpc_partition *part = &xpc_partitions[ch->partid];
    struct task_struct *kthread;
    void (*indicate_partition_disengaged) (struct xpc_partition *) =
        xpc_arch_ops.indicate_partition_disengaged;

    while (needed-- > 0) {

        /*
         * The following is done on behalf of the newly created
         * kthread. That kthread is responsible for doing the
         * counterpart to the following before it exits.
         */
        if (ignore_disconnecting) {
            if (!atomic_inc_not_zero(&ch->kthreads_assigned)) {
                /* kthreads assigned had gone to zero */
                BUG_ON(!(ch->flags &
                     XPC_C_DISCONNECTINGCALLOUT_MADE));
                break;
            }

        } else if (ch->flags & XPC_C_DISCONNECTING) {
            break;

        } else if (atomic_inc_return(&ch->kthreads_assigned) == 1 &&
               atomic_inc_return(&part->nchannels_engaged) == 1) {
            /* first kthread on first engaged channel of partition */
            xpc_arch_ops.indicate_partition_engaged(part);
        }
        (void)xpc_part_ref(part);
        xpc_msgqueue_ref(ch);

        kthread = kthread_run(xpc_kthread_start, (void *)args,
                      "xpc%02dc%d", ch->partid, ch->number);
        if (IS_ERR(kthread)) {
            /* the fork failed */

            /*
             * NOTE: if (ignore_disconnecting &&
             * !(ch->flags & XPC_C_DISCONNECTINGCALLOUT)) is true,
             * then we'll deadlock if all other kthreads assigned
             * to this channel are blocked in the channel's
             * registerer, because the only thing that will unblock
             * them is the xpDisconnecting callout that this
             * failed kthread_run() would have made.
             */

            /* undo the counts/refs taken above for this kthread */
            if (atomic_dec_return(&ch->kthreads_assigned) == 0 &&
                atomic_dec_return(&part->nchannels_engaged) == 0) {
                indicate_partition_disengaged(part);
            }
            xpc_msgqueue_deref(ch);
            xpc_part_deref(part);

            if (atomic_read(&ch->kthreads_assigned) <
                ch->kthreads_idle_limit) {
                /*
                 * Flag this as an error only if we have an
                 * insufficient #of kthreads for the channel
                 * to function.
                 */
                spin_lock_irqsave(&ch->lock, irq_flags);
                XPC_DISCONNECT_CHANNEL(ch, xpLackOfResources,
                               &irq_flags);
                spin_unlock_irqrestore(&ch->lock, irq_flags);
            }
            break;
        }
    }
}
0846 
/*
 * Wait, for every partition, until disconnect of channel @ch_number is
 * complete (i.e. all callouts to the caller's registered function have
 * ceased), then clear the WDISCONNECT flag and replay any channel-control
 * flags that were delayed while the disconnect was in progress.
 */
void
xpc_disconnect_wait(int ch_number)
{
    unsigned long irq_flags;
    short partid;
    struct xpc_partition *part;
    struct xpc_channel *ch;
    int wakeup_channel_mgr;

    /* now wait for all callouts to the caller's function to cease */
    for (partid = 0; partid < xp_max_npartitions; partid++) {
        part = &xpc_partitions[partid];

        /* skip partitions we can't take a reference on */
        if (!xpc_part_ref(part))
            continue;

        ch = &part->channels[ch_number];

        if (!(ch->flags & XPC_C_WDISCONNECT)) {
            xpc_part_deref(part);
            continue;
        }

        wait_for_completion(&ch->wdisconnect_wait);

        spin_lock_irqsave(&ch->lock, irq_flags);
        DBUG_ON(!(ch->flags & XPC_C_DISCONNECTED));
        wakeup_channel_mgr = 0;

        /* re-post chctl flags that arrived during the disconnect */
        if (ch->delayed_chctl_flags) {
            if (part->act_state != XPC_P_AS_DEACTIVATING) {
                spin_lock(&part->chctl_lock);
                part->chctl.flags[ch->number] |=
                    ch->delayed_chctl_flags;
                spin_unlock(&part->chctl_lock);
                wakeup_channel_mgr = 1;
            }
            ch->delayed_chctl_flags = 0;
        }

        ch->flags &= ~XPC_C_WDISCONNECT;
        spin_unlock_irqrestore(&ch->lock, irq_flags);

        if (wakeup_channel_mgr)
            xpc_wakeup_channel_mgr(part);

        xpc_part_deref(part);
    }
}
0896 
0897 static int
0898 xpc_setup_partitions(void)
0899 {
0900     short partid;
0901     struct xpc_partition *part;
0902 
0903     xpc_partitions = kcalloc(xp_max_npartitions,
0904                  sizeof(struct xpc_partition),
0905                  GFP_KERNEL);
0906     if (xpc_partitions == NULL) {
0907         dev_err(xpc_part, "can't get memory for partition structure\n");
0908         return -ENOMEM;
0909     }
0910 
0911     /*
0912      * The first few fields of each entry of xpc_partitions[] need to
0913      * be initialized now so that calls to xpc_connect() and
0914      * xpc_disconnect() can be made prior to the activation of any remote
0915      * partition. NOTE THAT NONE OF THE OTHER FIELDS BELONGING TO THESE
0916      * ENTRIES ARE MEANINGFUL UNTIL AFTER AN ENTRY'S CORRESPONDING
0917      * PARTITION HAS BEEN ACTIVATED.
0918      */
0919     for (partid = 0; partid < xp_max_npartitions; partid++) {
0920         part = &xpc_partitions[partid];
0921 
0922         DBUG_ON((u64)part != L1_CACHE_ALIGN((u64)part));
0923 
0924         part->activate_IRQ_rcvd = 0;
0925         spin_lock_init(&part->act_lock);
0926         part->act_state = XPC_P_AS_INACTIVE;
0927         XPC_SET_REASON(part, 0, 0);
0928 
0929         timer_setup(&part->disengage_timer,
0930                 xpc_timeout_partition_disengage, 0);
0931 
0932         part->setup_state = XPC_P_SS_UNSET;
0933         init_waitqueue_head(&part->teardown_wq);
0934         atomic_set(&part->references, 0);
0935     }
0936 
0937     return xpc_arch_ops.setup_partitions();
0938 }
0939 
/*
 * Undo xpc_setup_partitions(): let the arch-specific layer tear down its
 * per-partition state first, then free the xpc_partitions[] array.
 */
static void
xpc_teardown_partitions(void)
{
    xpc_arch_ops.teardown_partitions();
    kfree(xpc_partitions);
}
0946 
/*
 * Shut XPC down: stop the heartbeat-checker and discovery threads,
 * deactivate every partition and poll (with periodic operator messages)
 * until all have disengaged, then unwind the reserved page, notifiers,
 * exported interface, sysctl entries and partition structures.
 *
 * @reason: why XPC is going away (e.g. xpUnloading on module removal);
 *          forwarded to XPC_DEACTIVATE_PARTITION() for each partition.
 */
static void
xpc_do_exit(enum xp_retval reason)
{
    short partid;
    int active_part_count, printed_waiting_msg = 0;
    struct xpc_partition *part;
    unsigned long printmsg_time, disengage_timeout = 0;

    /* a 'rmmod XPC' and a 'reboot' cannot both end up here together */
    DBUG_ON(xpc_exiting == 1);

    /*
     * Let the heartbeat checker thread and the discovery thread
     * (if one is running) know that they should exit. Also wake up
     * the heartbeat checker thread in case it's sleeping.
     */
    xpc_exiting = 1;
    wake_up_interruptible(&xpc_activate_IRQ_wq);

    /* wait for the discovery thread to exit */
    wait_for_completion(&xpc_discovery_exited);

    /* wait for the heartbeat checker thread to exit */
    wait_for_completion(&xpc_hb_checker_exited);

    /* sleep for a 1/3 of a second or so */
    (void)msleep_interruptible(300);

    /* wait for all partitions to become inactive */

    printmsg_time = jiffies + (XPC_DEACTIVATE_PRINTMSG_INTERVAL * HZ);
    xpc_disengage_timedout = 0;

    do {
        active_part_count = 0;

        for (partid = 0; partid < xp_max_npartitions; partid++) {
            part = &xpc_partitions[partid];

            /* fully disengaged and inactive: nothing left to do */
            if (xpc_partition_disengaged(part) &&
                part->act_state == XPC_P_AS_INACTIVE) {
                continue;
            }

            active_part_count++;

            XPC_DEACTIVATE_PARTITION(part, reason);

            /* track the latest disengage deadline seen so far */
            if (part->disengage_timeout > disengage_timeout)
                disengage_timeout = part->disengage_timeout;
        }

        if (xpc_arch_ops.any_partition_engaged()) {
            /* remote partitions still reference our memory */
            if (time_is_before_jiffies(printmsg_time)) {
                dev_info(xpc_part, "waiting for remote "
                     "partitions to deactivate, timeout in "
                     "%ld seconds\n", (disengage_timeout -
                     jiffies) / HZ);
                printmsg_time = jiffies +
                    (XPC_DEACTIVATE_PRINTMSG_INTERVAL * HZ);
                printed_waiting_msg = 1;
            }

        } else if (active_part_count > 0) {
            /*
             * Remotes are gone but our own bookkeeping is still
             * winding down; only note this once after having
             * printed the "waiting for remote" message above.
             */
            if (printed_waiting_msg) {
                dev_info(xpc_part, "waiting for local partition"
                     " to deactivate\n");
                printed_waiting_msg = 0;
            }

        } else {
            if (!xpc_disengage_timedout) {
                dev_info(xpc_part, "all partitions have "
                     "deactivated\n");
            }
            break;
        }

        /* sleep for a 1/3 of a second or so */
        (void)msleep_interruptible(300);

    } while (1);

    DBUG_ON(xpc_arch_ops.any_partition_engaged());

    xpc_teardown_rsvd_page();

    /* the notifiers are only relevant while the module stays loaded */
    if (reason == xpUnloading) {
        (void)unregister_die_notifier(&xpc_die_notifier);
        (void)unregister_reboot_notifier(&xpc_reboot_notifier);
    }

    /* clear the interface to XPC's functions */
    xpc_clear_interface();

    if (xpc_sysctl)
        unregister_sysctl_table(xpc_sysctl);

    xpc_teardown_partitions();

    if (is_uv_system())
        xpc_exit_uv();
}
1050 
1051 /*
1052  * This function is called when the system is being rebooted.
1053  */
1054 static int
1055 xpc_system_reboot(struct notifier_block *nb, unsigned long event, void *unused)
1056 {
1057     enum xp_retval reason;
1058 
1059     switch (event) {
1060     case SYS_RESTART:
1061         reason = xpSystemReboot;
1062         break;
1063     case SYS_HALT:
1064         reason = xpSystemHalt;
1065         break;
1066     case SYS_POWER_OFF:
1067         reason = xpSystemPoweroff;
1068         break;
1069     default:
1070         reason = xpSystemGoingDown;
1071     }
1072 
1073     xpc_do_exit(reason);
1074     return NOTIFY_DONE;
1075 }
1076 
1077 /* Used to only allow one cpu to complete disconnect */
1078 static unsigned int xpc_die_disconnecting;
1079 
1080 /*
1081  * Notify other partitions to deactivate from us by first disengaging from all
1082  * references to our memory.
1083  */
/*
 * Called from the die notifier when this partition is going down hard:
 * tell every engaged/active partition to deactivate from us and then
 * busy-wait (the system is dying, so no sleeping services are assumed)
 * until they have all disengaged or xpc_disengage_timelimit expires.
 */
static void
xpc_die_deactivate(void)
{
    struct xpc_partition *part;
    short partid;
    int any_engaged;
    long keep_waiting;
    long wait_to_print;

    /* only the first CPU to get here performs the disconnect */
    if (cmpxchg(&xpc_die_disconnecting, 0, 1))
        return;

    /* keep xpc_hb_checker thread from doing anything (just in case) */
    xpc_exiting = 1;

    xpc_arch_ops.disallow_all_hbs();   /*indicate we're deactivated */

    for (partid = 0; partid < xp_max_npartitions; partid++) {
        part = &xpc_partitions[partid];

        /* only bother partitions that are engaged or not yet inactive */
        if (xpc_arch_ops.partition_engaged(partid) ||
            part->act_state != XPC_P_AS_INACTIVE) {
            xpc_arch_ops.request_partition_deactivation(part);
            xpc_arch_ops.indicate_partition_disengaged(part);
        }
    }

    /*
     * Though we requested that all other partitions deactivate from us,
     * we only wait until they've all disengaged or we've reached the
     * defined timelimit.
     *
     * Given that one iteration through the following while-loop takes
     * approximately 200 microseconds, calculate the #of loops to take
     * before bailing and the #of loops before printing a waiting message.
     */
    keep_waiting = xpc_disengage_timelimit * 1000 * 5;
    wait_to_print = XPC_DEACTIVATE_PRINTMSG_INTERVAL * 1000 * 5;

    while (1) {
        any_engaged = xpc_arch_ops.any_partition_engaged();
        if (!any_engaged) {
            dev_info(xpc_part, "all partitions have deactivated\n");
            break;
        }

        /* timelimit exhausted: name the partitions that never let go */
        if (!keep_waiting--) {
            for (partid = 0; partid < xp_max_npartitions;
                 partid++) {
                if (xpc_arch_ops.partition_engaged(partid)) {
                    dev_info(xpc_part, "deactivate from "
                         "remote partition %d timed "
                         "out\n", partid);
                }
            }
            break;
        }

        /* periodically remind the operator that we are still waiting */
        if (!wait_to_print--) {
            dev_info(xpc_part, "waiting for remote partitions to "
                 "deactivate, timeout in %ld seconds\n",
                 keep_waiting / (1000 * 5));
            wait_to_print = XPC_DEACTIVATE_PRINTMSG_INTERVAL *
                1000 * 5;
        }

        udelay(200);
    }
}
1153 
1154 /*
1155  * This function is called when the system is being restarted or halted due
1156  * to some sort of system failure. If this is the case we need to notify the
1157  * other partitions to disengage from all references to our memory.
1158  * This function can also be called when our heartbeater could be offlined
1159  * for a time. In this case we need to notify other partitions to not worry
1160  * about the lack of a heartbeat.
1161  */
/*
 * Die-notifier callback (see block comment above): on fatal events tell
 * other partitions to disengage from our memory; on debugger/MCA entry
 * and exit, take the heartbeat offline/online so remote partitions do
 * not misinterpret the pause as a dead partition.
 */
static int
xpc_system_die(struct notifier_block *nb, unsigned long event, void *_die_args)
{
#ifdef CONFIG_IA64      /* !!! temporary kludge */
    switch (event) {
    case DIE_MACHINE_RESTART:
    case DIE_MACHINE_HALT:
        xpc_die_deactivate();
        break;

    case DIE_KDEBUG_ENTER:
        /* Should lack of heartbeat be ignored by other partitions? */
        if (!xpc_kdebug_ignore)
            break;

        fallthrough;
    case DIE_MCA_MONARCH_ENTER:
    case DIE_INIT_MONARCH_ENTER:
        /* pause the heartbeat while the machine is stopped */
        xpc_arch_ops.offline_heartbeat();
        break;

    case DIE_KDEBUG_LEAVE:
        /* Is lack of heartbeat being ignored by other partitions? */
        if (!xpc_kdebug_ignore)
            break;

        fallthrough;
    case DIE_MCA_MONARCH_LEAVE:
    case DIE_INIT_MONARCH_LEAVE:
        xpc_arch_ops.online_heartbeat();
        break;
    }
#else
    struct die_args *die_args = _die_args;

    switch (event) {
    case DIE_TRAP:
        /* double fault is always fatal */
        if (die_args->trapnr == X86_TRAP_DF)
            xpc_die_deactivate();

        /* kernel-mode FP/SIMD faults are treated as fatal too */
        if (((die_args->trapnr == X86_TRAP_MF) ||
             (die_args->trapnr == X86_TRAP_XF)) &&
            !user_mode(die_args->regs))
            xpc_die_deactivate();

        break;
    case DIE_INT3:
    case DIE_DEBUG:
        /* debugger traps are not fatal; keep running */
        break;
    case DIE_OOPS:
    case DIE_GPF:
    default:
        xpc_die_deactivate();
    }
#endif

    return NOTIFY_DONE;
}
1220 
/*
 * Module initialization: bring up the arch layer (UV only), allocate the
 * partition structures, publish the reserved page, register reboot/die
 * notifiers, start the heartbeat-checker and discovery kthreads, and
 * finally export XPC's functions through xpc_set_interface().
 *
 * Returns 0 on success or a negative errno; on failure everything set up
 * so far is unwound via the out_* labels (in reverse order of setup).
 */
static int __init
xpc_init(void)
{
    int ret;
    struct task_struct *kthread;

    dev_set_name(xpc_part, "part");
    dev_set_name(xpc_chan, "chan");

    /* only UV systems are supported by this driver */
    if (is_uv_system()) {
        ret = xpc_init_uv();

    } else {
        ret = -ENODEV;
    }

    if (ret != 0)
        return ret;

    ret = xpc_setup_partitions();
    if (ret != 0) {
        dev_err(xpc_part, "can't get memory for partition structure\n");
        goto out_1;
    }

    xpc_sysctl = register_sysctl_table(xpc_sys_dir);

    /*
     * Fill the partition reserved page with the information needed by
     * other partitions to discover we are alive and establish initial
     * communications.
     */
    ret = xpc_setup_rsvd_page();
    if (ret != 0) {
        dev_err(xpc_part, "can't setup our reserved page\n");
        goto out_2;
    }

    /* add ourselves to the reboot_notifier_list */
    ret = register_reboot_notifier(&xpc_reboot_notifier);
    if (ret != 0)
        dev_warn(xpc_part, "can't register reboot notifier\n");

    /* add ourselves to the die_notifier list */
    ret = register_die_notifier(&xpc_die_notifier);
    if (ret != 0)
        dev_warn(xpc_part, "can't register die notifier\n");

    /*
     * The real work-horse behind xpc.  This processes incoming
     * interrupts and monitors remote heartbeats.
     */
    kthread = kthread_run(xpc_hb_checker, NULL, XPC_HB_CHECK_THREAD_NAME);
    if (IS_ERR(kthread)) {
        dev_err(xpc_part, "failed while forking hb check thread\n");
        ret = -EBUSY;
        goto out_3;
    }

    /*
     * Startup a thread that will attempt to discover other partitions to
     * activate based on info provided by SAL. This new thread is short
     * lived and will exit once discovery is complete.
     */
    kthread = kthread_run(xpc_initiate_discovery, NULL,
                  XPC_DISCOVERY_THREAD_NAME);
    if (IS_ERR(kthread)) {
        dev_err(xpc_part, "failed while forking discovery thread\n");

        /* mark this new thread as a non-starter */
        complete(&xpc_discovery_exited);

        /*
         * The hb checker is already running, so use the full exit
         * path rather than the out_* unwind labels.
         */
        xpc_do_exit(xpUnloading);
        return -EBUSY;
    }

    /* set the interface to point at XPC's functions */
    xpc_set_interface(xpc_initiate_connect, xpc_initiate_disconnect,
              xpc_initiate_send, xpc_initiate_send_notify,
              xpc_initiate_received, xpc_initiate_partid_to_nasids);

    return 0;

    /* initialization was not successful */
out_3:
    xpc_teardown_rsvd_page();

    (void)unregister_die_notifier(&xpc_die_notifier);
    (void)unregister_reboot_notifier(&xpc_reboot_notifier);
out_2:
    if (xpc_sysctl)
        unregister_sysctl_table(xpc_sysctl);

    xpc_teardown_partitions();
out_1:
    if (is_uv_system())
        xpc_exit_uv();
    return ret;
}
1320 
1321 module_init(xpc_init);
1322 
/* module unload entry point: run the full XPC teardown */
static void __exit
xpc_exit(void)
{
    xpc_do_exit(xpUnloading);
}
1328 
1329 module_exit(xpc_exit);
1330 
MODULE_AUTHOR("Silicon Graphics, Inc.");
MODULE_DESCRIPTION("Cross Partition Communication (XPC) support");
MODULE_LICENSE("GPL");

/* load-time tunables (perm 0: not visible/changeable via sysfs) */
module_param(xpc_hb_interval, int, 0);
MODULE_PARM_DESC(xpc_hb_interval, "Number of seconds between "
         "heartbeat increments.");

module_param(xpc_hb_check_interval, int, 0);
MODULE_PARM_DESC(xpc_hb_check_interval, "Number of seconds between "
         "heartbeat checks.");

module_param(xpc_disengage_timelimit, int, 0);
MODULE_PARM_DESC(xpc_disengage_timelimit, "Number of seconds to wait "
         "for disengage to complete.");

module_param(xpc_kdebug_ignore, int, 0);
MODULE_PARM_DESC(xpc_kdebug_ignore, "Should lack of heartbeat be ignored by "
         "other partitions when dropping into kdebug.");