+ EXIT;
+}
+
+static cfs_completion_t obd_zombie_start;
+static cfs_completion_t obd_zombie_stop;
+static unsigned long obd_zombie_flags;
+static cfs_waitq_t obd_zombie_waitq;
+static pid_t obd_zombie_pid;
+
+enum {
+ OBD_ZOMBIE_STOP = 1 << 1
+};
+
+/**
+ * check for work for kill zombie import/export thread.
+ */
+static int obd_zombie_impexp_check(void *arg)
+{
+ int rc;
+
+ cfs_spin_lock(&obd_zombie_impexp_lock);
+ rc = cfs_list_empty(&obd_zombie_imports) &&
+ cfs_list_empty(&obd_zombie_exports) &&
+ !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
+
+ cfs_spin_unlock(&obd_zombie_impexp_lock);
+
+ RETURN(rc);
+}
+
+/**
+ * Add export to the obd_zombe thread and notify it.
+ */
+static void obd_zombie_export_add(struct obd_export *exp) {
+ cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
+ LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
+ cfs_list_del_init(&exp->exp_obd_chain);
+ cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
+ cfs_spin_lock(&obd_zombie_impexp_lock);
+ cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
+ cfs_spin_unlock(&obd_zombie_impexp_lock);
+
+ if (obd_zombie_impexp_notify != NULL)
+ obd_zombie_impexp_notify();
+}
+
+/**
+ * Add import to the obd_zombe thread and notify it.
+ */
+static void obd_zombie_import_add(struct obd_import *imp) {
+ LASSERT(imp->imp_sec == NULL);
+ cfs_spin_lock(&obd_zombie_impexp_lock);
+ LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
+ cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
+ cfs_spin_unlock(&obd_zombie_impexp_lock);
+
+ if (obd_zombie_impexp_notify != NULL)
+ obd_zombie_impexp_notify();
+}
+
+/**
+ * notify import/export destroy thread about new zombie.
+ */
+static void obd_zombie_impexp_notify(void)
+{
+ cfs_waitq_signal(&obd_zombie_waitq);
+}
+
+/**
+ * check whether obd_zombie is idle
+ */
+static int obd_zombie_is_idle(void)
+{
+ int rc;
+
+ LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
+ cfs_spin_lock(&obd_zombie_impexp_lock);
+ rc = cfs_list_empty(&obd_zombie_imports) &&
+ cfs_list_empty(&obd_zombie_exports);
+ cfs_spin_unlock(&obd_zombie_impexp_lock);
+ return rc;
+}
+
+/**
+ * wait when obd_zombie import/export queues become empty
+ */
+void obd_zombie_barrier(void)
+{
+ struct l_wait_info lwi = { 0 };
+
+ if (obd_zombie_pid == cfs_curproc_pid())
+ /* don't wait for myself */
+ return;
+ l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
+}
+EXPORT_SYMBOL(obd_zombie_barrier);
+
+#ifdef __KERNEL__
+
+/**
+ * destroy zombie export/import thread.
+ */
+static int obd_zombie_impexp_thread(void *unused)
+{
+ int rc;
+
+ if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
+ cfs_complete(&obd_zombie_start);
+ RETURN(rc);
+ }
+
+ cfs_complete(&obd_zombie_start);
+
+ obd_zombie_pid = cfs_curproc_pid();
+
+ while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
+ struct l_wait_info lwi = { 0 };
+
+ l_wait_event(obd_zombie_waitq,
+ !obd_zombie_impexp_check(NULL), &lwi);
+ obd_zombie_impexp_cull();
+
+ /*
+ * Notify obd_zombie_barrier callers that queues
+ * may be empty.
+ */
+ cfs_waitq_signal(&obd_zombie_waitq);
+ }
+
+ cfs_complete(&obd_zombie_stop);
+
+ RETURN(0);
+}
+
+#else /* ! KERNEL */
+
+static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
+static void *obd_zombie_impexp_work_cb;
+static void *obd_zombie_impexp_idle_cb;
+
+int obd_zombie_impexp_kill(void *arg)
+{
+ int rc = 0;
+
+ if (cfs_atomic_inc_return(&zombie_recur) == 1) {
+ obd_zombie_impexp_cull();
+ rc = 1;
+ }
+ cfs_atomic_dec(&zombie_recur);
+ return rc;
+}
+
+#endif
+
+/**
+ * start destroy zombie import/export thread
+ */
+int obd_zombie_impexp_init(void)
+{
+ int rc;
+
+ CFS_INIT_LIST_HEAD(&obd_zombie_imports);
+ CFS_INIT_LIST_HEAD(&obd_zombie_exports);
+ cfs_spin_lock_init(&obd_zombie_impexp_lock);
+ cfs_init_completion(&obd_zombie_start);
+ cfs_init_completion(&obd_zombie_stop);
+ cfs_waitq_init(&obd_zombie_waitq);
+ obd_zombie_pid = 0;
+
+#ifdef __KERNEL__
+ rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
+ if (rc < 0)
+ RETURN(rc);
+
+ cfs_wait_for_completion(&obd_zombie_start);
+#else
+
+ obd_zombie_impexp_work_cb =
+ liblustre_register_wait_callback("obd_zombi_impexp_kill",
+ &obd_zombie_impexp_kill, NULL);
+
+ obd_zombie_impexp_idle_cb =
+ liblustre_register_idle_callback("obd_zombi_impexp_check",
+ &obd_zombie_impexp_check, NULL);
+ rc = 0;
+#endif
+ RETURN(rc);
+}
+/**
+ * stop destroy zombie import/export thread
+ */
+void obd_zombie_impexp_stop(void)
+{
+ cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
+ obd_zombie_impexp_notify();
+#ifdef __KERNEL__
+ cfs_wait_for_completion(&obd_zombie_stop);
+#else
+ liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
+ liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
+#endif
+}
+
+/***** Kernel-userspace comm helpers *******/
+
+/* Get length of entire message, including header */
+int kuc_len(int payload_len)
+{
+ return sizeof(struct kuc_hdr) + payload_len;
+}
+EXPORT_SYMBOL(kuc_len);
+
+/* Get a pointer to kuc header, given a ptr to the payload
+ * @param p Pointer to payload area
+ * @returns Pointer to kuc header
+ */
+struct kuc_hdr * kuc_ptr(void *p)
+{
+ struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
+ LASSERT(lh->kuc_magic == KUC_MAGIC);
+ return lh;
+}
+EXPORT_SYMBOL(kuc_ptr);
+
+/* Test if payload is part of kuc message
+ * @param p Pointer to payload area
+ * @returns boolean
+ */
+int kuc_ispayload(void *p)
+{
+ struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
+
+ if (kh->kuc_magic == KUC_MAGIC)
+ return 1;
+ else
+ return 0;
+}
+EXPORT_SYMBOL(kuc_ispayload);
+
+/* Alloc space for a message, and fill in header
+ * @return Pointer to payload area
+ */
+void *kuc_alloc(int payload_len, int transport, int type)
+{
+ struct kuc_hdr *lh;
+ int len = kuc_len(payload_len);
+
+ OBD_ALLOC(lh, len);
+ if (lh == NULL)
+ return ERR_PTR(-ENOMEM);
+
+ lh->kuc_magic = KUC_MAGIC;
+ lh->kuc_transport = transport;
+ lh->kuc_msgtype = type;
+ lh->kuc_msglen = len;
+
+ return (void *)(lh + 1);