}
EXPORT_SYMBOL(class_disconnect_stale_exports);
-int oig_init(struct obd_io_group **oig_out)
-{
- struct obd_io_group *oig;
- ENTRY;
-
- OBD_ALLOC(oig, sizeof(*oig));
- if (oig == NULL)
- RETURN(-ENOMEM);
-
- spin_lock_init(&oig->oig_lock);
- oig->oig_rc = 0;
- oig->oig_pending = 0;
- atomic_set(&oig->oig_refcount, 1);
- cfs_waitq_init(&oig->oig_waitq);
- CFS_INIT_LIST_HEAD(&oig->oig_occ_list);
-
- *oig_out = oig;
- RETURN(0);
-};
-EXPORT_SYMBOL(oig_init);
-
-static inline void oig_grab(struct obd_io_group *oig)
-{
- atomic_inc(&oig->oig_refcount);
-}
-
-void oig_release(struct obd_io_group *oig)
-{
- if (atomic_dec_and_test(&oig->oig_refcount))
- OBD_FREE(oig, sizeof(*oig));
-}
-EXPORT_SYMBOL(oig_release);
-
-int oig_add_one(struct obd_io_group *oig, struct oig_callback_context *occ)
-{
- int rc = 0;
- CDEBUG(D_CACHE, "oig %p ready to roll\n", oig);
- spin_lock(&oig->oig_lock);
- if (oig->oig_rc) {
- rc = oig->oig_rc;
- } else {
- oig->oig_pending++;
- if (occ != NULL)
- list_add_tail(&occ->occ_oig_item, &oig->oig_occ_list);
- }
- spin_unlock(&oig->oig_lock);
- oig_grab(oig);
-
- return rc;
-}
-EXPORT_SYMBOL(oig_add_one);
-
-void oig_complete_one(struct obd_io_group *oig,
- struct oig_callback_context *occ, int rc)
-{
- cfs_waitq_t *wake = NULL;
- int old_rc;
-
- spin_lock(&oig->oig_lock);
-
- if (occ != NULL)
- list_del_init(&occ->occ_oig_item);
-
- old_rc = oig->oig_rc;
- if (oig->oig_rc == 0 && rc != 0)
- oig->oig_rc = rc;
-
- if (--oig->oig_pending <= 0)
- wake = &oig->oig_waitq;
-
- spin_unlock(&oig->oig_lock);
-
- CDEBUG(D_CACHE, "oig %p completed, rc %d -> %d via %d, %d now "
- "pending (racey)\n", oig, old_rc, oig->oig_rc, rc,
- oig->oig_pending);
- if (wake)
- cfs_waitq_signal(wake);
- oig_release(oig);
-}
-EXPORT_SYMBOL(oig_complete_one);
-
-static int oig_done(struct obd_io_group *oig)
-{
- int rc = 0;
- spin_lock(&oig->oig_lock);
- if (oig->oig_pending <= 0)
- rc = 1;
- spin_unlock(&oig->oig_lock);
- return rc;
-}
-
-static void interrupted_oig(void *data)
-{
- struct obd_io_group *oig = data;
- struct oig_callback_context *occ;
-
- spin_lock(&oig->oig_lock);
- /* We need to restart the processing each time we drop the lock, as
- * it is possible other threads called oig_complete_one() to remove
- * an entry elsewhere in the list while we dropped lock. We need to
- * drop the lock because osc_ap_completion() calls oig_complete_one()
- * which re-gets this lock ;-) as well as a lock ordering issue. */
-restart:
- list_for_each_entry(occ, &oig->oig_occ_list, occ_oig_item) {
- if (occ->interrupted)
- continue;
- occ->interrupted = 1;
- spin_unlock(&oig->oig_lock);
- occ->occ_interrupted(occ);
- spin_lock(&oig->oig_lock);
- goto restart;
- }
- spin_unlock(&oig->oig_lock);
-}
-
-int oig_wait(struct obd_io_group *oig)
-{
- struct l_wait_info lwi = LWI_INTR(interrupted_oig, oig);
- int rc;
-
- CDEBUG(D_CACHE, "waiting for oig %p\n", oig);
-
- do {
- rc = l_wait_event(oig->oig_waitq, oig_done(oig), &lwi);
- LASSERTF(rc == 0 || rc == -EINTR, "rc: %d\n", rc);
- /* we can't continue until the oig has emptied and stopped
- * referencing state that the caller will free upon return */
- if (rc == -EINTR)
- lwi = (struct l_wait_info){ 0, };
- } while (rc == -EINTR);
-
- LASSERTF(oig->oig_pending == 0,
- "exiting oig_wait(oig = %p) with %d pending\n", oig,
- oig->oig_pending);
-
- CDEBUG(D_CACHE, "done waiting on oig %p rc %d\n", oig, oig->oig_rc);
- return oig->oig_rc;
-}
-EXPORT_SYMBOL(oig_wait);
-
void class_fail_export(struct obd_export *exp)
{
int rc, already_failed;
/**
* check for work for kill zombie import/export thread.
*/
-int obd_zombie_impexp_check(void *arg)
+static int obd_zombie_impexp_check(void *arg)
{
int rc;
cfs_waitq_signal(&obd_zombie_waitq);
}
+/**
+ * check whether obd_zombie is idle
+ */
+static int obd_zombie_is_idle(void)
+{
+ int rc;
+
+ LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
+ spin_lock(&obd_zombie_impexp_lock);
+ rc = list_empty(&obd_zombie_imports) &&
+ list_empty(&obd_zombie_exports);
+ spin_unlock(&obd_zombie_impexp_lock);
+ return rc;
+}
+
+/**
+ * wait when obd_zombie import/export queues become empty
+ */
+void obd_zombie_barrier(void)
+{
+ struct l_wait_info lwi = { 0 };
+
+ l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
+}
+EXPORT_SYMBOL(obd_zombie_barrier);
+
#ifdef __KERNEL__
/**
l_wait_event(obd_zombie_waitq, !obd_zombie_impexp_check(NULL), &lwi);
obd_zombie_impexp_cull();
+ /* Notify obd_zombie_barrier callers that queues may be empty */
+ cfs_waitq_signal(&obd_zombie_waitq);
}
complete(&obd_zombie_stop);