Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / obdclass / genops.c
index f4099e0..9c20af2 100644 (file)
@@ -46,7 +46,6 @@
 #include <obd_ost.h>
 #include <obd_class.h>
 #include <lprocfs_status.h>
-#include <class_hash.h>
 
 extern struct list_head obd_types;
 spinlock_t obd_types_lock;
@@ -62,6 +61,8 @@ spinlock_t        obd_zombie_impexp_lock;
 static void obd_zombie_impexp_notify(void);
 static void obd_zombie_export_add(struct obd_export *exp);
 static void obd_zombie_import_add(struct obd_import *imp);
+static void print_export_data(struct obd_export *exp,
+                              const char *status, int locks);
 
 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
 
@@ -119,7 +120,7 @@ struct obd_type *class_get_type(const char *name)
 #ifdef CONFIG_KMOD
         if (!type) {
                 const char *modname = name;
-                if (!request_module(modname)) {
+                if (!request_module("%s", modname)) {
                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
                         type = class_search_type(name);
                 } else {
@@ -261,8 +262,8 @@ int class_unregister_type(const char *name)
  *
  * Find an empty slot in ::obd_devs[], create a new obd device in it.
  *
- * \param typename [in] obd device type string.
- * \param name     [in] obd device name.
+ * \param[in] type_name obd device type string.
+ * \param[in] name      obd device name.
  *
  * \retval NULL if create fails, otherwise return the obd device
  *         pointer created.
@@ -541,7 +542,7 @@ struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
 }
 
 /**
- * to notify sptlrpc log for @fsname has changed, let every relevant OBD
+ * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
  * adjust sptlrpc settings accordingly.
  */
 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
@@ -772,6 +773,7 @@ struct obd_export *class_new_export(struct obd_device *obd,
 {
         struct obd_export *export;
         int rc = 0;
+        ENTRY;
 
         OBD_ALLOC_PTR(export);
         if (!export)
@@ -783,6 +785,11 @@ struct obd_export *class_new_export(struct obd_device *obd,
         atomic_set(&export->exp_rpc_count, 0);
         atomic_set(&export->exp_cb_count, 0);
         atomic_set(&export->exp_locks_count, 0);
+#if LUSTRE_TRACKS_LOCK_EXP_REFS
+        CFS_INIT_LIST_HEAD(&export->exp_locks_list);
+        spin_lock_init(&export->exp_locks_list_guard);
+#endif
+        atomic_set(&export->exp_replay_count, 0);
         export->exp_obd = obd;
         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
         spin_lock_init(&export->exp_uncommitted_replies_lock);
@@ -802,28 +809,35 @@ struct obd_export *class_new_export(struct obd_device *obd,
         obd_init_export(export);
 
         spin_lock(&obd->obd_dev_lock);
+         /* shouldn't happen, but might race */
+        if (obd->obd_stopping)
+                GOTO(exit_err, rc = -ENODEV);
+
         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
-                rc = lustre_hash_add_unique(obd->obd_uuid_hash, cluuid,
-                                            &export->exp_uuid_hash);
+                rc = cfs_hash_add_unique(obd->obd_uuid_hash, cluuid,
+                                         &export->exp_uuid_hash);
                 if (rc != 0) {
                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
                                       obd->obd_name, cluuid->uuid, rc);
-                        spin_unlock(&obd->obd_dev_lock);
-                        class_handle_unhash(&export->exp_handle);
-                        OBD_FREE_PTR(export);
-                        return ERR_PTR(-EALREADY);
+                        GOTO(exit_err, rc = -EALREADY);
                 }
         }
 
-        LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
         class_incref(obd, "export", export);
         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
         list_add_tail(&export->exp_obd_chain_timed,
                       &export->exp_obd->obd_exports_timed);
         export->exp_obd->obd_num_exports++;
         spin_unlock(&obd->obd_dev_lock);
+        RETURN(export);
 
-        return export;
+exit_err:
+        spin_unlock(&obd->obd_dev_lock);
+        class_handle_unhash(&export->exp_handle);
+        LASSERT(hlist_unhashed(&export->exp_uuid_hash));
+        obd_destroy_export(export);
+        OBD_FREE_PTR(export);
+        return ERR_PTR(rc);
 }
 EXPORT_SYMBOL(class_new_export);
 
@@ -834,24 +848,14 @@ void class_unlink_export(struct obd_export *exp)
         spin_lock(&exp->exp_obd->obd_dev_lock);
         /* delete an uuid-export hashitem from hashtables */
         if (!hlist_unhashed(&exp->exp_uuid_hash))
-                lustre_hash_del(exp->exp_obd->obd_uuid_hash,
-                                &exp->exp_client_uuid,
-                                &exp->exp_uuid_hash);
+                cfs_hash_del(exp->exp_obd->obd_uuid_hash,
+                             &exp->exp_client_uuid,
+                             &exp->exp_uuid_hash);
 
         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
         list_del_init(&exp->exp_obd_chain_timed);
         exp->exp_obd->obd_num_exports--;
         spin_unlock(&exp->exp_obd->obd_dev_lock);
-
-        /* Keep these counter valid always */
-        spin_lock_bh(&exp->exp_obd->obd_processing_task_lock);
-        if (exp->exp_delayed)
-                exp->exp_obd->obd_delayed_clients--;
-        else if (exp->exp_in_recovery)
-                exp->exp_obd->obd_recoverable_clients--;
-        else if (exp->exp_obd->obd_recovering)
-                exp->exp_obd->obd_max_recoverable_clients--;
-        spin_unlock_bh(&exp->exp_obd->obd_processing_task_lock);
         class_export_put(exp);
 }
 EXPORT_SYMBOL(class_unlink_export);
@@ -895,7 +899,7 @@ struct obd_import *class_import_get(struct obd_import *import)
         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
         atomic_inc(&import->imp_refcount);
         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
-               atomic_read(&import->imp_refcount), 
+               atomic_read(&import->imp_refcount),
                import->imp_obd->obd_name);
         return import;
 }
@@ -910,7 +914,7 @@ void class_import_put(struct obd_import *imp)
         LASSERT(list_empty(&imp->imp_zombie_chain));
 
         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
-               atomic_read(&imp->imp_refcount) - 1, 
+               atomic_read(&imp->imp_refcount) - 1,
                imp->imp_obd->obd_name);
 
         if (atomic_dec_and_test(&imp->imp_refcount)) {
@@ -985,6 +989,49 @@ void class_destroy_import(struct obd_import *import)
 }
 EXPORT_SYMBOL(class_destroy_import);
 
+#if LUSTRE_TRACKS_LOCK_EXP_REFS
+
+void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
+{
+        spin_lock(&exp->exp_locks_list_guard);
+
+        LASSERT(lock->l_exp_refs_nr >= 0);
+
+        if (lock->l_exp_refs_target != NULL &&
+            lock->l_exp_refs_target != exp) {
+                LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
+                              exp, lock, lock->l_exp_refs_target);
+        }
+        if ((lock->l_exp_refs_nr ++) == 0) {
+                list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
+                lock->l_exp_refs_target = exp;
+        }
+        CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
+               lock, exp, lock->l_exp_refs_nr);
+        spin_unlock(&exp->exp_locks_list_guard);
+}
+EXPORT_SYMBOL(__class_export_add_lock_ref);
+
+void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
+{
+        spin_lock(&exp->exp_locks_list_guard);
+        LASSERT(lock->l_exp_refs_nr > 0);
+        if (lock->l_exp_refs_target != exp) {
+                LCONSOLE_WARN("lock %p, "
+                              "mismatching export pointers: %p, %p\n",
+                              lock, lock->l_exp_refs_target, exp);
+        }
+        if (-- lock->l_exp_refs_nr == 0) {
+                list_del_init(&lock->l_exp_refs_link);
+                lock->l_exp_refs_target = NULL;
+        }
+        CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
+               lock, exp, lock->l_exp_refs_nr);
+        spin_unlock(&exp->exp_locks_list_guard);
+}
+EXPORT_SYMBOL(__class_export_del_lock_ref);
+#endif
+
 /* A connection defines an export context in which preallocation can
    be managed. This releases the export pointer reference, and returns
    the export handle, so the export refcount is 1 when this function
@@ -1017,27 +1064,30 @@ void class_export_recovery_cleanup(struct obd_export *exp)
         struct obd_device *obd = exp->exp_obd;
 
         spin_lock_bh(&obd->obd_processing_task_lock);
+        if (exp->exp_delayed)
+                obd->obd_delayed_clients--;
         if (obd->obd_recovering && exp->exp_in_recovery) {
                 spin_lock(&exp->exp_lock);
                 exp->exp_in_recovery = 0;
                 spin_unlock(&exp->exp_lock);
+                LASSERT(obd->obd_connected_clients);
                 obd->obd_connected_clients--;
-                /* each connected client is counted as recoverable */
-                obd->obd_recoverable_clients--;
-                if (exp->exp_req_replay_needed) {
-                        spin_lock(&exp->exp_lock);
-                        exp->exp_req_replay_needed = 0;
-                        spin_unlock(&exp->exp_lock);
-                        LASSERT(atomic_read(&obd->obd_req_replay_clients));
-                        atomic_dec(&obd->obd_req_replay_clients);
-                }
-                if (exp->exp_lock_replay_needed) {
-                        spin_lock(&exp->exp_lock);
-                        exp->exp_lock_replay_needed = 0;
-                        spin_unlock(&exp->exp_lock);
-                        LASSERT(atomic_read(&obd->obd_lock_replay_clients));
-                        atomic_dec(&obd->obd_lock_replay_clients);
-                }
+        }
+        /** Cleanup req replay fields */
+        if (exp->exp_req_replay_needed) {
+                spin_lock(&exp->exp_lock);
+                exp->exp_req_replay_needed = 0;
+                spin_unlock(&exp->exp_lock);
+                LASSERT(atomic_read(&obd->obd_req_replay_clients));
+                atomic_dec(&obd->obd_req_replay_clients);
+        }
+        /** Cleanup lock replay data */
+        if (exp->exp_lock_replay_needed) {
+                spin_lock(&exp->exp_lock);
+                exp->exp_lock_replay_needed = 0;
+                spin_unlock(&exp->exp_lock);
+                LASSERT(atomic_read(&obd->obd_lock_replay_clients));
+                atomic_dec(&obd->obd_lock_replay_clients);
         }
         spin_unlock_bh(&obd->obd_processing_task_lock);
 }
@@ -1076,9 +1126,9 @@ int class_disconnect(struct obd_export *export)
                export->exp_handle.h_cookie);
 
         if (!hlist_unhashed(&export->exp_nid_hash))
-                lustre_hash_del(export->exp_obd->obd_nid_hash,
-                                &export->exp_connection->c_peer.nid,
-                                &export->exp_nid_hash);
+                cfs_hash_del(export->exp_obd->obd_nid_hash,
+                             &export->exp_connection->c_peer.nid,
+                             &export->exp_nid_hash);
 
         class_export_recovery_cleanup(export);
         class_unlink_export(export);
@@ -1087,6 +1137,20 @@ no_disconn:
         RETURN(0);
 }
 
+/* Return non-zero for a fully connected export */
+int class_connected_export(struct obd_export *exp)
+{
+        if (exp) {
+                int connected;
+                spin_lock(&exp->exp_lock);
+                connected = (exp->exp_conn_cnt > 0);
+                spin_unlock(&exp->exp_lock);
+                return connected;
+        }
+        return 0;
+}
+EXPORT_SYMBOL(class_connected_export);
+
 static void class_disconnect_export_list(struct list_head *list,
                                          enum obd_option flags)
 {
@@ -1159,40 +1223,43 @@ EXPORT_SYMBOL(class_disconnect_exports);
 /* Remove exports that have not completed recovery.
  */
 void class_disconnect_stale_exports(struct obd_device *obd,
-                                    int (*test_export)(struct obd_export *),
-                                    enum obd_option flags)
+                                    int (*test_export)(struct obd_export *))
 {
         struct list_head work_list;
         struct list_head *pos, *n;
         struct obd_export *exp;
+        int evicted = 0;
         ENTRY;
 
         CFS_INIT_LIST_HEAD(&work_list);
         spin_lock(&obd->obd_dev_lock);
-        obd->obd_stale_clients = 0;
         list_for_each_safe(pos, n, &obd->obd_exports) {
                 exp = list_entry(pos, struct obd_export, exp_obd_chain);
                 if (test_export(exp))
                         continue;
 
-                list_move(&exp->exp_obd_chain, &work_list);
                 /* don't count self-export as client */
                 if (obd_uuid_equals(&exp->exp_client_uuid,
-                                     &exp->exp_obd->obd_uuid))
+                                    &exp->exp_obd->obd_uuid))
                         continue;
 
-                obd->obd_stale_clients++;
+                list_move(&exp->exp_obd_chain, &work_list);
+                evicted++;
                 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
                        obd->obd_name, exp->exp_client_uuid.uuid,
                        exp->exp_connection == NULL ? "<unknown>" :
                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
+                print_export_data(exp, "EVICTING", 0);
         }
         spin_unlock(&obd->obd_dev_lock);
 
-        CDEBUG(D_HA, "%s: disconnecting %d stale clients\n", obd->obd_name,
-               obd->obd_stale_clients);
-
-        class_disconnect_export_list(&work_list, flags);
+        if (evicted) {
+                CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
+                       obd->obd_name, evicted);
+                obd->obd_stale_clients += evicted;
+        }
+        class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
+                                                 OBD_OPT_ABORT_RECOV);
         EXIT;
 }
 EXPORT_SYMBOL(class_disconnect_stale_exports);
@@ -1248,7 +1315,7 @@ int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
 
         do {
-                doomed_exp = lustre_hash_lookup(obd->obd_nid_hash, &nid_key);
+                doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
                 if (doomed_exp == NULL)
                         break;
 
@@ -1285,7 +1352,7 @@ int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
                 return exports_evicted;
         }
 
-        doomed_exp = lustre_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
+        doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
 
         if (doomed_exp == NULL) {
                 CERROR("%s: can't disconnect %s: no exports found\n",
@@ -1302,7 +1369,13 @@ int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
 }
 EXPORT_SYMBOL(obd_export_evict_by_uuid);
 
-static void print_export_data(struct obd_export *exp, const char *status)
+#if LUSTRE_TRACKS_LOCK_EXP_REFS
+void (*class_export_dump_hook)(struct obd_export*) = NULL;
+EXPORT_SYMBOL(class_export_dump_hook);
+#endif
+
+static void print_export_data(struct obd_export *exp, const char *status,
+                              int locks)
 {
         struct ptlrpc_reply_state *rs;
         struct ptlrpc_reply_state *first_reply = NULL;
@@ -1325,23 +1398,27 @@ static void print_export_data(struct obd_export *exp, const char *status)
                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
                nreplies, first_reply, nreplies > 3 ? "..." : "",
                exp->exp_last_committed);
+#if LUSTRE_TRACKS_LOCK_EXP_REFS
+        if (locks && class_export_dump_hook != NULL)
+                class_export_dump_hook(exp);
+#endif
 }
 
-void dump_exports(struct obd_device *obd)
+void dump_exports(struct obd_device *obd, int locks)
 {
         struct obd_export *exp;
 
         spin_lock(&obd->obd_dev_lock);
         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
-                print_export_data(exp, "ACTIVE");
+                print_export_data(exp, "ACTIVE", locks);
         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
-                print_export_data(exp, "UNLINKED");
+                print_export_data(exp, "UNLINKED", locks);
         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
-                print_export_data(exp, "DELAYED");
+                print_export_data(exp, "DELAYED", locks);
         spin_unlock(&obd->obd_dev_lock);
         spin_lock(&obd_zombie_impexp_lock);
         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
-                print_export_data(exp, "ZOMBIE");
+                print_export_data(exp, "ZOMBIE", locks);
         spin_unlock(&obd_zombie_impexp_lock);
 }
 EXPORT_SYMBOL(dump_exports);
@@ -1355,11 +1432,12 @@ void obd_exports_barrier(struct obd_device *obd)
                 spin_unlock(&obd->obd_dev_lock);
                 cfs_schedule_timeout(CFS_TASK_UNINT, cfs_time_seconds(waited));
                 if (waited > 5 && IS_PO2(waited)) {
-                        LCONSOLE_WARN("Waiting for obd_unlinked_exports "
+                        LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
                                       "more than %d seconds. "
                                       "The obd refcount = %d. Is it stuck?\n",
-                                      waited, atomic_read(&obd->obd_refcount));
-                        dump_exports(obd);
+                                      obd->obd_name, waited,
+                                      atomic_read(&obd->obd_refcount));
+                        dump_exports(obd, 0);
                 }
                 waited *= 2;
                 spin_lock(&obd->obd_dev_lock);
@@ -1412,6 +1490,7 @@ static struct completion        obd_zombie_start;
 static struct completion        obd_zombie_stop;
 static unsigned long            obd_zombie_flags;
 static cfs_waitq_t              obd_zombie_waitq;
+static pid_t                    obd_zombie_pid;
 
 enum {
         OBD_ZOMBIE_STOP   = 1 << 1
@@ -1493,6 +1572,10 @@ static int obd_zombie_is_idle(void)
 void obd_zombie_barrier(void)
 {
         struct l_wait_info lwi = { 0 };
+
+        if (obd_zombie_pid == cfs_curproc_pid())
+                /* don't wait for myself */
+                return;
         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
 }
 EXPORT_SYMBOL(obd_zombie_barrier);
@@ -1513,14 +1596,16 @@ static int obd_zombie_impexp_thread(void *unused)
 
         complete(&obd_zombie_start);
 
+        obd_zombie_pid = cfs_curproc_pid();
+
         while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
                 struct l_wait_info lwi = { 0 };
 
-                l_wait_event(obd_zombie_waitq, 
+                l_wait_event(obd_zombie_waitq,
                              !obd_zombie_impexp_check(NULL), &lwi);
                 obd_zombie_impexp_cull();
 
-                /* 
+                /*
                  * Notify obd_zombie_barrier callers that queues
                  * may be empty.
                  */
@@ -1542,7 +1627,7 @@ int obd_zombie_impexp_kill(void *arg)
 {
         int rc = 0;
 
-       if (atomic_inc_return(&zombie_recur) == 1) {
+        if (atomic_inc_return(&zombie_recur) == 1) {
                 obd_zombie_impexp_cull();
                 rc = 1;
         }
@@ -1565,6 +1650,7 @@ int obd_zombie_impexp_init(void)
         init_completion(&obd_zombie_start);
         init_completion(&obd_zombie_stop);
         cfs_waitq_init(&obd_zombie_waitq);
+        obd_zombie_pid = 0;
 
 #ifdef __KERNEL__
         rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
@@ -1599,3 +1685,4 @@ void obd_zombie_impexp_stop(void)
         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
 #endif
 }
+