Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / obdclass / genops.c
index 7341aaa..9c20af2 100644 (file)
@@ -46,7 +46,6 @@
 #include <obd_ost.h>
 #include <obd_class.h>
 #include <lprocfs_status.h>
-#include <class_hash.h>
 
 extern struct list_head obd_types;
 spinlock_t obd_types_lock;
@@ -62,6 +61,8 @@ spinlock_t        obd_zombie_impexp_lock;
 static void obd_zombie_impexp_notify(void);
 static void obd_zombie_export_add(struct obd_export *exp);
 static void obd_zombie_import_add(struct obd_import *imp);
+static void print_export_data(struct obd_export *exp,
+                              const char *status, int locks);
 
 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
 
@@ -73,7 +74,7 @@ static struct obd_device *obd_device_alloc(void)
 {
         struct obd_device *obd;
 
-        OBD_SLAB_ALLOC_PTR(obd, obd_device_cachep);
+        OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
         if (obd != NULL) {
                 obd->obd_magic = OBD_DEVICE_MAGIC;
         }
@@ -119,7 +120,7 @@ struct obd_type *class_get_type(const char *name)
 #ifdef CONFIG_KMOD
         if (!type) {
                 const char *modname = name;
-                if (!request_module(modname)) {
+                if (!request_module("%s", modname)) {
                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
                         type = class_search_type(name);
                 } else {
@@ -261,8 +262,8 @@ int class_unregister_type(const char *name)
  *
  * Find an empty slot in ::obd_devs[], create a new obd device in it.
  *
- * \param typename [in] obd device type string.
- * \param name     [in] obd device name.
+ * \param[in] type_name obd device type string.
+ * \param[in] name      obd device name.
  *
  * \retval NULL if create fails, otherwise return the obd device
  *         pointer created.
@@ -541,7 +542,7 @@ struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
 }
 
 /**
- * to notify sptlrpc log for @fsname has changed, let every relevant OBD
+ * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
  * adjust sptlrpc settings accordingly.
  */
 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
@@ -723,6 +724,7 @@ static void class_export_destroy(struct obd_export *exp)
                 ptlrpc_put_connection_superhack(exp->exp_connection);
 
         LASSERT(list_empty(&exp->exp_outstanding_replies));
+        LASSERT(list_empty(&exp->exp_uncommitted_replies));
         LASSERT(list_empty(&exp->exp_req_replay_queue));
         LASSERT(list_empty(&exp->exp_queued_rpc));
         obd_destroy_export(exp);
@@ -755,6 +757,7 @@ void class_export_put(struct obd_export *exp)
         LASSERT(atomic_read(&exp->exp_refcount) < 0x5a5a5a);
 
         if (atomic_dec_and_test(&exp->exp_refcount)) {
+                LASSERT(!list_empty(&exp->exp_obd_chain));
                 CDEBUG(D_IOCTL, "final put %p/%s\n",
                        exp, exp->exp_client_uuid.uuid);
                 obd_zombie_export_add(exp);
@@ -770,6 +773,7 @@ struct obd_export *class_new_export(struct obd_device *obd,
 {
         struct obd_export *export;
         int rc = 0;
+        ENTRY;
 
         OBD_ALLOC_PTR(export);
         if (!export)
@@ -779,8 +783,17 @@ struct obd_export *class_new_export(struct obd_device *obd,
         export->exp_lock_hash = NULL;
         atomic_set(&export->exp_refcount, 2);
         atomic_set(&export->exp_rpc_count, 0);
+        atomic_set(&export->exp_cb_count, 0);
+        atomic_set(&export->exp_locks_count, 0);
+#if LUSTRE_TRACKS_LOCK_EXP_REFS
+        CFS_INIT_LIST_HEAD(&export->exp_locks_list);
+        spin_lock_init(&export->exp_locks_list_guard);
+#endif
+        atomic_set(&export->exp_replay_count, 0);
         export->exp_obd = obd;
         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
+        spin_lock_init(&export->exp_uncommitted_replies_lock);
+        CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
         CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
@@ -796,28 +809,35 @@ struct obd_export *class_new_export(struct obd_device *obd,
         obd_init_export(export);
 
         spin_lock(&obd->obd_dev_lock);
+         /* shouldn't happen, but might race */
+        if (obd->obd_stopping)
+                GOTO(exit_err, rc = -ENODEV);
+
         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
-                rc = lustre_hash_add_unique(obd->obd_uuid_hash, cluuid,
-                                            &export->exp_uuid_hash);
+                rc = cfs_hash_add_unique(obd->obd_uuid_hash, cluuid,
+                                         &export->exp_uuid_hash);
                 if (rc != 0) {
                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
                                       obd->obd_name, cluuid->uuid, rc);
-                        spin_unlock(&obd->obd_dev_lock);
-                        class_handle_unhash(&export->exp_handle);
-                        OBD_FREE_PTR(export);
-                        return ERR_PTR(-EALREADY);
+                        GOTO(exit_err, rc = -EALREADY);
                 }
         }
 
-        LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
         class_incref(obd, "export", export);
         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
         list_add_tail(&export->exp_obd_chain_timed,
                       &export->exp_obd->obd_exports_timed);
         export->exp_obd->obd_num_exports++;
         spin_unlock(&obd->obd_dev_lock);
+        RETURN(export);
 
-        return export;
+exit_err:
+        spin_unlock(&obd->obd_dev_lock);
+        class_handle_unhash(&export->exp_handle);
+        LASSERT(hlist_unhashed(&export->exp_uuid_hash));
+        obd_destroy_export(export);
+        OBD_FREE_PTR(export);
+        return ERR_PTR(rc);
 }
 EXPORT_SYMBOL(class_new_export);
 
@@ -828,15 +848,14 @@ void class_unlink_export(struct obd_export *exp)
         spin_lock(&exp->exp_obd->obd_dev_lock);
         /* delete an uuid-export hashitem from hashtables */
         if (!hlist_unhashed(&exp->exp_uuid_hash))
-                lustre_hash_del(exp->exp_obd->obd_uuid_hash,
-                                &exp->exp_client_uuid,
-                                &exp->exp_uuid_hash);
+                cfs_hash_del(exp->exp_obd->obd_uuid_hash,
+                             &exp->exp_client_uuid,
+                             &exp->exp_uuid_hash);
 
-        list_del_init(&exp->exp_obd_chain);
+        list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
         list_del_init(&exp->exp_obd_chain_timed);
         exp->exp_obd->obd_num_exports--;
         spin_unlock(&exp->exp_obd->obd_dev_lock);
-
         class_export_put(exp);
 }
 EXPORT_SYMBOL(class_unlink_export);
@@ -880,7 +899,7 @@ struct obd_import *class_import_get(struct obd_import *import)
         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
         atomic_inc(&import->imp_refcount);
         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
-               atomic_read(&import->imp_refcount), 
+               atomic_read(&import->imp_refcount),
                import->imp_obd->obd_name);
         return import;
 }
@@ -895,7 +914,7 @@ void class_import_put(struct obd_import *imp)
         LASSERT(list_empty(&imp->imp_zombie_chain));
 
         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
-               atomic_read(&imp->imp_refcount) - 1, 
+               atomic_read(&imp->imp_refcount) - 1,
                imp->imp_obd->obd_name);
 
         if (atomic_dec_and_test(&imp->imp_refcount)) {
@@ -970,6 +989,49 @@ void class_destroy_import(struct obd_import *import)
 }
 EXPORT_SYMBOL(class_destroy_import);
 
+#if LUSTRE_TRACKS_LOCK_EXP_REFS
+
+void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
+{
+        spin_lock(&exp->exp_locks_list_guard);
+
+        LASSERT(lock->l_exp_refs_nr >= 0);
+
+        if (lock->l_exp_refs_target != NULL &&
+            lock->l_exp_refs_target != exp) {
+                LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
+                              exp, lock, lock->l_exp_refs_target);
+        }
+        if ((lock->l_exp_refs_nr ++) == 0) {
+                list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
+                lock->l_exp_refs_target = exp;
+        }
+        CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
+               lock, exp, lock->l_exp_refs_nr);
+        spin_unlock(&exp->exp_locks_list_guard);
+}
+EXPORT_SYMBOL(__class_export_add_lock_ref);
+
+void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
+{
+        spin_lock(&exp->exp_locks_list_guard);
+        LASSERT(lock->l_exp_refs_nr > 0);
+        if (lock->l_exp_refs_target != exp) {
+                LCONSOLE_WARN("lock %p, "
+                              "mismatching export pointers: %p, %p\n",
+                              lock, lock->l_exp_refs_target, exp);
+        }
+        if (-- lock->l_exp_refs_nr == 0) {
+                list_del_init(&lock->l_exp_refs_link);
+                lock->l_exp_refs_target = NULL;
+        }
+        CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
+               lock, exp, lock->l_exp_refs_nr);
+        spin_unlock(&exp->exp_locks_list_guard);
+}
+EXPORT_SYMBOL(__class_export_del_lock_ref);
+#endif
+
 /* A connection defines an export context in which preallocation can
    be managed. This releases the export pointer reference, and returns
    the export handle, so the export refcount is 1 when this function
@@ -1002,35 +1064,40 @@ void class_export_recovery_cleanup(struct obd_export *exp)
         struct obd_device *obd = exp->exp_obd;
 
         spin_lock_bh(&obd->obd_processing_task_lock);
+        if (exp->exp_delayed)
+                obd->obd_delayed_clients--;
         if (obd->obd_recovering && exp->exp_in_recovery) {
                 spin_lock(&exp->exp_lock);
                 exp->exp_in_recovery = 0;
                 spin_unlock(&exp->exp_lock);
+                LASSERT(obd->obd_connected_clients);
                 obd->obd_connected_clients--;
-                /* each connected client is counted as recoverable */
-                obd->obd_recoverable_clients--;
-                if (exp->exp_req_replay_needed) {
-                        spin_lock(&exp->exp_lock);
-                        exp->exp_req_replay_needed = 0;
-                        spin_unlock(&exp->exp_lock);
-                        LASSERT(atomic_read(&obd->obd_req_replay_clients));
-                        atomic_dec(&obd->obd_req_replay_clients);
-                }
-                if (exp->exp_lock_replay_needed) {
-                        spin_lock(&exp->exp_lock);
-                        exp->exp_lock_replay_needed = 0;
-                        spin_unlock(&exp->exp_lock);
-                        LASSERT(atomic_read(&obd->obd_lock_replay_clients));
-                        atomic_dec(&obd->obd_lock_replay_clients);
-                }
+        }
+        /** Cleanup req replay fields */
+        if (exp->exp_req_replay_needed) {
+                spin_lock(&exp->exp_lock);
+                exp->exp_req_replay_needed = 0;
+                spin_unlock(&exp->exp_lock);
+                LASSERT(atomic_read(&obd->obd_req_replay_clients));
+                atomic_dec(&obd->obd_req_replay_clients);
+        }
+        /** Cleanup lock replay data */
+        if (exp->exp_lock_replay_needed) {
+                spin_lock(&exp->exp_lock);
+                exp->exp_lock_replay_needed = 0;
+                spin_unlock(&exp->exp_lock);
+                LASSERT(atomic_read(&obd->obd_lock_replay_clients));
+                atomic_dec(&obd->obd_lock_replay_clients);
         }
         spin_unlock_bh(&obd->obd_processing_task_lock);
 }
 
-/* This function removes two references from the export: one for the
- * hash entry and one for the export pointer passed in.  The export
- * pointer passed to this function is destroyed should not be used
- * again. */
+/* This function removes 1-3 references from the export:
+ * 1 - for export pointer passed
+ * and if disconnect really need
+ * 2 - removing from hash
+ * 3 - in client_unlink_export
+ * The export pointer passed to this function can destroyed */
 int class_disconnect(struct obd_export *export)
 {
         int already_disconnected;
@@ -1045,41 +1112,57 @@ int class_disconnect(struct obd_export *export)
         spin_lock(&export->exp_lock);
         already_disconnected = export->exp_disconnected;
         export->exp_disconnected = 1;
-
-        if (!hlist_unhashed(&export->exp_nid_hash))
-                lustre_hash_del(export->exp_obd->obd_nid_hash,
-                                &export->exp_connection->c_peer.nid,
-                                &export->exp_nid_hash);
-
         spin_unlock(&export->exp_lock);
 
         /* class_cleanup(), abort_recovery(), and class_fail_export()
          * all end up in here, and if any of them race we shouldn't
          * call extra class_export_puts(). */
-        if (already_disconnected)
-                RETURN(0);
+        if (already_disconnected) {
+                LASSERT(hlist_unhashed(&export->exp_nid_hash));
+                GOTO(no_disconn, already_disconnected);
+        }
 
         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
                export->exp_handle.h_cookie);
 
+        if (!hlist_unhashed(&export->exp_nid_hash))
+                cfs_hash_del(export->exp_obd->obd_nid_hash,
+                             &export->exp_connection->c_peer.nid,
+                             &export->exp_nid_hash);
+
         class_export_recovery_cleanup(export);
         class_unlink_export(export);
+no_disconn:
         class_export_put(export);
         RETURN(0);
 }
 
+/* Return non-zero for a fully connected export */
+int class_connected_export(struct obd_export *exp)
+{
+        if (exp) {
+                int connected;
+                spin_lock(&exp->exp_lock);
+                connected = (exp->exp_conn_cnt > 0);
+                spin_unlock(&exp->exp_lock);
+                return connected;
+        }
+        return 0;
+}
+EXPORT_SYMBOL(class_connected_export);
+
 static void class_disconnect_export_list(struct list_head *list,
                                          enum obd_option flags)
 {
         int rc;
-        struct lustre_handle fake_conn;
-        struct obd_export *fake_exp, *exp;
+        struct obd_export *exp;
         ENTRY;
 
         /* It's possible that an export may disconnect itself, but
          * nothing else will be added to this list. */
         while (!list_empty(list)) {
                 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
+                /* need for safe call CDEBUG after obd_disconnect */
                 class_export_get(exp);
 
                 spin_lock(&exp->exp_lock);
@@ -1098,22 +1181,16 @@ static void class_disconnect_export_list(struct list_head *list,
                         continue;
                 }
 
-                fake_conn.cookie = exp->exp_handle.h_cookie;
-                fake_exp = class_conn2export(&fake_conn);
-                if (!fake_exp) {
-                        class_export_put(exp);
-                        continue;
-                }
-
-                spin_lock(&fake_exp->exp_lock);
-                fake_exp->exp_flags = flags;
-                spin_unlock(&fake_exp->exp_lock);
-
+                class_export_get(exp);
                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
                        "last request at "CFS_TIME_T"\n",
                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
                        exp, exp->exp_last_request_time);
-                rc = obd_disconnect(fake_exp);
+                /* release one export reference anyway */
+                rc = obd_disconnect(exp);
+
+                CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
+                       obd_export_nid2str(exp), exp, rc);
                 class_export_put(exp);
         }
         EXIT;
@@ -1125,9 +1202,10 @@ void class_disconnect_exports(struct obd_device *obd)
         ENTRY;
 
         /* Move all of the exports from obd_exports to a work list, en masse. */
+        CFS_INIT_LIST_HEAD(&work_list);
         spin_lock(&obd->obd_dev_lock);
-        list_add(&work_list, &obd->obd_exports);
-        list_del_init(&obd->obd_exports);
+        list_splice_init(&obd->obd_exports, &work_list);
+        list_splice_init(&obd->obd_delayed_exports, &work_list);
         spin_unlock(&obd->obd_dev_lock);
 
         if (!list_empty(&work_list)) {
@@ -1144,14 +1222,13 @@ EXPORT_SYMBOL(class_disconnect_exports);
 
 /* Remove exports that have not completed recovery.
  */
-int class_disconnect_stale_exports(struct obd_device *obd,
-                                   int (*test_export)(struct obd_export *),
-                                   enum obd_option flags)
+void class_disconnect_stale_exports(struct obd_device *obd,
+                                    int (*test_export)(struct obd_export *))
 {
         struct list_head work_list;
         struct list_head *pos, *n;
         struct obd_export *exp;
-        int cnt = 0;
+        int evicted = 0;
         ENTRY;
 
         CFS_INIT_LIST_HEAD(&work_list);
@@ -1161,25 +1238,29 @@ int class_disconnect_stale_exports(struct obd_device *obd,
                 if (test_export(exp))
                         continue;
 
-                list_del(&exp->exp_obd_chain);
-                list_add(&exp->exp_obd_chain, &work_list);
                 /* don't count self-export as client */
                 if (obd_uuid_equals(&exp->exp_client_uuid,
-                                     &exp->exp_obd->obd_uuid))
+                                    &exp->exp_obd->obd_uuid))
                         continue;
 
-                cnt++;
+                list_move(&exp->exp_obd_chain, &work_list);
+                evicted++;
                 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
                        obd->obd_name, exp->exp_client_uuid.uuid,
                        exp->exp_connection == NULL ? "<unknown>" :
                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
+                print_export_data(exp, "EVICTING", 0);
         }
         spin_unlock(&obd->obd_dev_lock);
 
-        CDEBUG(D_ERROR, "%s: disconnecting %d stale clients\n",
-               obd->obd_name, cnt);
-        class_disconnect_export_list(&work_list, flags);
-        RETURN(cnt);
+        if (evicted) {
+                CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
+                       obd->obd_name, evicted);
+                obd->obd_stale_clients += evicted;
+        }
+        class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
+                                                 OBD_OPT_ABORT_RECOV);
+        EXIT;
 }
 EXPORT_SYMBOL(class_disconnect_stale_exports);
 
@@ -1234,7 +1315,7 @@ int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
 
         do {
-                doomed_exp = lustre_hash_lookup(obd->obd_nid_hash, &nid_key);
+                doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
                 if (doomed_exp == NULL)
                         break;
 
@@ -1271,7 +1352,7 @@ int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
                 return exports_evicted;
         }
 
-        doomed_exp = lustre_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
+        doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
 
         if (doomed_exp == NULL) {
                 CERROR("%s: can't disconnect %s: no exports found\n",
@@ -1288,6 +1369,83 @@ int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
 }
 EXPORT_SYMBOL(obd_export_evict_by_uuid);
 
+#if LUSTRE_TRACKS_LOCK_EXP_REFS
+void (*class_export_dump_hook)(struct obd_export*) = NULL;
+EXPORT_SYMBOL(class_export_dump_hook);
+#endif
+
+static void print_export_data(struct obd_export *exp, const char *status,
+                              int locks)
+{
+        struct ptlrpc_reply_state *rs;
+        struct ptlrpc_reply_state *first_reply = NULL;
+        int nreplies = 0;
+
+        spin_lock(&exp->exp_lock);
+        list_for_each_entry (rs, &exp->exp_outstanding_replies, rs_exp_list) {
+                if (nreplies == 0)
+                        first_reply = rs;
+                nreplies++;
+        }
+        spin_unlock(&exp->exp_lock);
+
+        CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
+               exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
+               obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
+               atomic_read(&exp->exp_rpc_count),
+               atomic_read(&exp->exp_cb_count),
+               atomic_read(&exp->exp_locks_count),
+               exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
+               nreplies, first_reply, nreplies > 3 ? "..." : "",
+               exp->exp_last_committed);
+#if LUSTRE_TRACKS_LOCK_EXP_REFS
+        if (locks && class_export_dump_hook != NULL)
+                class_export_dump_hook(exp);
+#endif
+}
+
+void dump_exports(struct obd_device *obd, int locks)
+{
+        struct obd_export *exp;
+
+        spin_lock(&obd->obd_dev_lock);
+        list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
+                print_export_data(exp, "ACTIVE", locks);
+        list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
+                print_export_data(exp, "UNLINKED", locks);
+        list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
+                print_export_data(exp, "DELAYED", locks);
+        spin_unlock(&obd->obd_dev_lock);
+        spin_lock(&obd_zombie_impexp_lock);
+        list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
+                print_export_data(exp, "ZOMBIE", locks);
+        spin_unlock(&obd_zombie_impexp_lock);
+}
+EXPORT_SYMBOL(dump_exports);
+
+void obd_exports_barrier(struct obd_device *obd)
+{
+        int waited = 2;
+        LASSERT(list_empty(&obd->obd_exports));
+        spin_lock(&obd->obd_dev_lock);
+        while (!list_empty(&obd->obd_unlinked_exports)) {
+                spin_unlock(&obd->obd_dev_lock);
+                cfs_schedule_timeout(CFS_TASK_UNINT, cfs_time_seconds(waited));
+                if (waited > 5 && IS_PO2(waited)) {
+                        LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
+                                      "more than %d seconds. "
+                                      "The obd refcount = %d. Is it stuck?\n",
+                                      obd->obd_name, waited,
+                                      atomic_read(&obd->obd_refcount));
+                        dump_exports(obd, 0);
+                }
+                waited *= 2;
+                spin_lock(&obd->obd_dev_lock);
+        }
+        spin_unlock(&obd->obd_dev_lock);
+}
+EXPORT_SYMBOL(obd_exports_barrier);
+
 /**
  * kill zombie imports and exports
  */
@@ -1332,6 +1490,7 @@ static struct completion        obd_zombie_start;
 static struct completion        obd_zombie_stop;
 static unsigned long            obd_zombie_flags;
 static cfs_waitq_t              obd_zombie_waitq;
+static pid_t                    obd_zombie_pid;
 
 enum {
         OBD_ZOMBIE_STOP   = 1 << 1
@@ -1358,8 +1517,11 @@ static int obd_zombie_impexp_check(void *arg)
  * Add export to the obd_zombe thread and notify it.
  */
 static void obd_zombie_export_add(struct obd_export *exp) {
+        spin_lock(&exp->exp_obd->obd_dev_lock);
+        LASSERT(!list_empty(&exp->exp_obd_chain));
+        list_del_init(&exp->exp_obd_chain);
+        spin_unlock(&exp->exp_obd->obd_dev_lock);
         spin_lock(&obd_zombie_impexp_lock);
-        LASSERT(list_empty(&exp->exp_obd_chain));
         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
         spin_unlock(&obd_zombie_impexp_lock);
 
@@ -1371,6 +1533,7 @@ static void obd_zombie_export_add(struct obd_export *exp) {
  * Add import to the obd_zombe thread and notify it.
  */
 static void obd_zombie_import_add(struct obd_import *imp) {
+        LASSERT(imp->imp_sec == NULL);
         spin_lock(&obd_zombie_impexp_lock);
         LASSERT(list_empty(&imp->imp_zombie_chain));
         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
@@ -1409,6 +1572,10 @@ static int obd_zombie_is_idle(void)
 void obd_zombie_barrier(void)
 {
         struct l_wait_info lwi = { 0 };
+
+        if (obd_zombie_pid == cfs_curproc_pid())
+                /* don't wait for myself */
+                return;
         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
 }
 EXPORT_SYMBOL(obd_zombie_barrier);
@@ -1429,14 +1596,16 @@ static int obd_zombie_impexp_thread(void *unused)
 
         complete(&obd_zombie_start);
 
+        obd_zombie_pid = cfs_curproc_pid();
+
         while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
                 struct l_wait_info lwi = { 0 };
 
-                l_wait_event(obd_zombie_waitq, 
+                l_wait_event(obd_zombie_waitq,
                              !obd_zombie_impexp_check(NULL), &lwi);
                 obd_zombie_impexp_cull();
 
-                /* 
+                /*
                  * Notify obd_zombie_barrier callers that queues
                  * may be empty.
                  */
@@ -1458,7 +1627,7 @@ int obd_zombie_impexp_kill(void *arg)
 {
         int rc = 0;
 
-       if (atomic_inc_return(&zombie_recur) == 1) {
+        if (atomic_inc_return(&zombie_recur) == 1) {
                 obd_zombie_impexp_cull();
                 rc = 1;
         }
@@ -1481,6 +1650,7 @@ int obd_zombie_impexp_init(void)
         init_completion(&obd_zombie_start);
         init_completion(&obd_zombie_stop);
         cfs_waitq_init(&obd_zombie_waitq);
+        obd_zombie_pid = 0;
 
 #ifdef __KERNEL__
         rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
@@ -1515,3 +1685,4 @@ void obd_zombie_impexp_stop(void)
         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
 #endif
 }
+