Whamcloud - gitweb
Branch HEAD
authorbwzhou <bwzhou>
Sat, 24 May 2008 13:27:52 +0000 (13:27 +0000)
committerbwzhou <bwzhou>
Sat, 24 May 2008 13:27:52 +0000 (13:27 +0000)
b=11777
r=green, shadow

invalidate the import to halt request handling so the namespace can be safely
cleaned up

lustre/include/lustre_dlm.h
lustre/ldlm/ldlm_internal.h
lustre/ldlm/ldlm_lib.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_resource.c
lustre/mdt/mdt_handler.c
lustre/mgs/mgs_handler.c
lustre/obdecho/echo.c
lustre/obdfilter/filter.c

index 34d17de..b8f95c0 100644 (file)
@@ -620,7 +620,7 @@ int ldlm_del_waiting_lock(struct ldlm_lock *lock);
 int ldlm_refresh_waiting_lock(struct ldlm_lock *lock);
 void ldlm_revoke_export_locks(struct obd_export *exp);
 int ldlm_get_ref(void);
 int ldlm_refresh_waiting_lock(struct ldlm_lock *lock);
 void ldlm_revoke_export_locks(struct obd_export *exp);
 int ldlm_get_ref(void);
-void ldlm_put_ref(int force);
+void ldlm_put_ref(void);
 
 /* ldlm_lock.c */
 ldlm_processing_policy ldlm_get_processing_policy(struct ldlm_resource *res);
 
 /* ldlm_lock.c */
 ldlm_processing_policy ldlm_get_processing_policy(struct ldlm_resource *res);
@@ -703,7 +703,8 @@ void ldlm_unlink_lock_skiplist(struct ldlm_lock *req);
 struct ldlm_namespace *ldlm_namespace_new(char *name, ldlm_side_t client, 
                                           ldlm_appetite_t apt);
 int ldlm_namespace_cleanup(struct ldlm_namespace *ns, int flags);
 struct ldlm_namespace *ldlm_namespace_new(char *name, ldlm_side_t client, 
                                           ldlm_appetite_t apt);
 int ldlm_namespace_cleanup(struct ldlm_namespace *ns, int flags);
-int ldlm_namespace_free(struct ldlm_namespace *ns, int force);
+void ldlm_namespace_free(struct ldlm_namespace *ns, 
+                         struct obd_import *imp, int force);
 void ldlm_namespace_move(struct ldlm_namespace *ns, ldlm_side_t client);
 struct ldlm_namespace *ldlm_namespace_first(ldlm_side_t client);
 void ldlm_namespace_get(struct ldlm_namespace *ns);
 void ldlm_namespace_move(struct ldlm_namespace *ns, ldlm_side_t client);
 struct ldlm_namespace *ldlm_namespace_first(ldlm_side_t client);
 void ldlm_namespace_get(struct ldlm_namespace *ns);
index 3127e8c..4cf0798 100644 (file)
@@ -54,8 +54,9 @@ int ldlm_cancel_lru_estimate(struct ldlm_namespace *ns, int count, int max,
 int ldlm_resource_putref_locked(struct ldlm_resource *res);
 void ldlm_resource_insert_lock_after(struct ldlm_lock *original,
                                      struct ldlm_lock *new);
 int ldlm_resource_putref_locked(struct ldlm_resource *res);
 void ldlm_resource_insert_lock_after(struct ldlm_lock *original,
                                      struct ldlm_lock *new);
-int ldlm_namespace_free_prior(struct ldlm_namespace *ns);
-int ldlm_namespace_free_post(struct ldlm_namespace *ns, int force);
+void ldlm_namespace_free_prior(struct ldlm_namespace *ns,
+                               struct obd_import *imp, int force);
+void ldlm_namespace_free_post(struct ldlm_namespace *ns);
 /* ldlm_lock.c */
 
 /* Number of blocking/completion callbacks that will be sent in
 /* ldlm_lock.c */
 
 /* Number of blocking/completion callbacks that will be sent in
index 2988abe..5d2e464 100644 (file)
@@ -354,7 +354,7 @@ int client_obd_setup(struct obd_device *obddev, struct lustre_cfg *lcfg)
 err_import:
         class_destroy_import(imp);
 err_ldlm:
 err_import:
         class_destroy_import(imp);
 err_ldlm:
-        ldlm_put_ref(0);
+        ldlm_put_ref();
 err:
         RETURN(rc);
 
 err:
         RETURN(rc);
 
@@ -364,7 +364,7 @@ int client_obd_cleanup(struct obd_device *obddev)
 {
         ENTRY;
         sptlrpc_rule_set_free(&obddev->u.cli.cl_sptlrpc_rset);
 {
         ENTRY;
         sptlrpc_rule_set_free(&obddev->u.cli.cl_sptlrpc_rset);
-        ldlm_put_ref(obddev->obd_force);
+        ldlm_put_ref();
         RETURN(0);
 }
 
         RETURN(0);
 }
 
@@ -431,7 +431,7 @@ int client_connect_import(const struct lu_env *env,
 
         if (rc) {
 out_ldlm:
 
         if (rc) {
 out_ldlm:
-                ldlm_namespace_free_prior(obd->obd_namespace);
+                ldlm_namespace_free_prior(obd->obd_namespace, imp, 0);
                 to_be_freed = obd->obd_namespace;
                 obd->obd_namespace = NULL;
 out_disco:
                 to_be_freed = obd->obd_namespace;
                 obd->obd_namespace = NULL;
 out_disco:
@@ -443,7 +443,7 @@ out_disco:
 out_sem:
         mutex_up(&cli->cl_sem);
         if (to_be_freed)
 out_sem:
         mutex_up(&cli->cl_sem);
         if (to_be_freed)
-                ldlm_namespace_free_post(to_be_freed, 1);
+                ldlm_namespace_free_post(to_be_freed);
 
         return rc;
 }
 
         return rc;
 }
@@ -494,7 +494,7 @@ int client_disconnect_export(struct obd_export *exp)
                 ldlm_cli_cancel_unused(obd->obd_namespace, NULL,
                                        obd->obd_force ? LDLM_FL_LOCAL_ONLY:0,
                                        NULL);
                 ldlm_cli_cancel_unused(obd->obd_namespace, NULL,
                                        obd->obd_force ? LDLM_FL_LOCAL_ONLY:0,
                                        NULL);
-                ldlm_namespace_free_prior(obd->obd_namespace);
+                ldlm_namespace_free_prior(obd->obd_namespace, imp, obd->obd_force);
                 to_be_freed = obd->obd_namespace;
         }
 
                 to_be_freed = obd->obd_namespace;
         }
 
@@ -518,7 +518,7 @@ int client_disconnect_export(struct obd_export *exp)
  out_sem:
         mutex_up(&cli->cl_sem);
         if (to_be_freed)
  out_sem:
         mutex_up(&cli->cl_sem);
         if (to_be_freed)
-                ldlm_namespace_free_post(to_be_freed, obd->obd_force);
+                ldlm_namespace_free_post(to_be_freed);
 
         RETURN(rc);
 }
 
         RETURN(rc);
 }
index f345503..7ba227c 100644 (file)
@@ -1969,7 +1969,7 @@ static int ldlm_bl_thread_main(void *arg)
 #endif
 
 static int ldlm_setup(void);
 #endif
 
 static int ldlm_setup(void);
-static int ldlm_cleanup(int force);
+static int ldlm_cleanup(void);
 
 int ldlm_get_ref(void)
 {
 
 int ldlm_get_ref(void)
 {
@@ -1986,12 +1986,12 @@ int ldlm_get_ref(void)
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
-void ldlm_put_ref(int force)
+void ldlm_put_ref(void)
 {
         ENTRY;
         mutex_down(&ldlm_ref_sem);
         if (ldlm_refcount == 1) {
 {
         ENTRY;
         mutex_down(&ldlm_ref_sem);
         if (ldlm_refcount == 1) {
-                int rc = ldlm_cleanup(force);
+                int rc = ldlm_cleanup();
                 if (rc)
                         CERROR("ldlm_cleanup failed: %d\n", rc);
                 else
                 if (rc)
                         CERROR("ldlm_cleanup failed: %d\n", rc);
                 else
@@ -2139,7 +2139,7 @@ static int ldlm_setup(void)
         return rc;
 }
 
         return rc;
 }
 
-static int ldlm_cleanup(int force)
+static int ldlm_cleanup(void)
 {
 #ifdef __KERNEL__
         struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
 {
 #ifdef __KERNEL__
         struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
index aebfe5a..2c93142 100644 (file)
@@ -372,7 +372,7 @@ out_hash:
 out_ns:
         OBD_FREE_PTR(ns);
 out_ref:
 out_ns:
         OBD_FREE_PTR(ns);
 out_ref:
-        ldlm_put_ref(0);
+        ldlm_put_ref();
         RETURN(NULL);
 }
 
         RETURN(NULL);
 }
 
@@ -501,26 +501,12 @@ int ldlm_namespace_cleanup(struct ldlm_namespace *ns, int flags)
         return ELDLM_OK;
 }
 
         return ELDLM_OK;
 }
 
-/* Cleanup, but also free, the namespace */
-int ldlm_namespace_free_prior(struct ldlm_namespace *ns)
+static int __ldlm_namespace_free(struct ldlm_namespace *ns, int force)
 {
         ENTRY;
 {
         ENTRY;
-        if (!ns)
-                RETURN(ELDLM_OK);
-
-        mutex_down(ldlm_namespace_lock(ns->ns_client));
-        /*
-         * Some asserts and possibly other parts of code still using 
-         * list_empty(&ns->ns_list_chain). This is why it is important
-         * to use list_del_init() here.
-         */
-        list_del_init(&ns->ns_list_chain);
-        atomic_dec(ldlm_namespace_nr(ns->ns_client));
-        ldlm_pool_fini(&ns->ns_pool);
-        mutex_up(ldlm_namespace_lock(ns->ns_client));
 
         /* At shutdown time, don't call the cancellation callback */
 
         /* At shutdown time, don't call the cancellation callback */
-        ldlm_namespace_cleanup(ns, 0);
+        ldlm_namespace_cleanup(ns, force ? LDLM_FL_LOCAL_ONLY : 0);
 
         if (ns->ns_refcount > 0) {
                 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
 
         if (ns->ns_refcount > 0) {
                 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
@@ -528,16 +514,30 @@ int ldlm_namespace_free_prior(struct ldlm_namespace *ns)
                 CDEBUG(D_DLMTRACE,
                        "dlm namespace %s free waiting on refcount %d\n",
                        ns->ns_name, ns->ns_refcount);
                 CDEBUG(D_DLMTRACE,
                        "dlm namespace %s free waiting on refcount %d\n",
                        ns->ns_name, ns->ns_refcount);
+force_wait:
+                if (force)
+                        lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, NULL, NULL);
+
                 rc = l_wait_event(ns->ns_waitq,
                                   ns->ns_refcount == 0, &lwi);
                 rc = l_wait_event(ns->ns_waitq,
                                   ns->ns_refcount == 0, &lwi);
-                if (ns->ns_refcount)
-                        LCONSOLE_ERROR_MSG(0x139, "Lock manager: wait for %s "
-                                           "namespace cleanup aborted with %d "
-                                           "resources in use. (%d)\nI'm going "
-                                           "to try to clean up anyway, but I "
-                                           "might need a reboot of this node.\n",
-                                            ns->ns_name, (int) ns->ns_refcount, 
-                                            rc);
+
+                /* Forced cleanups should be able to reclaim all references,
+                 * so it's safe to wait forever... we can't leak locks... */
+                if (force && rc == -ETIMEDOUT) {
+                        LCONSOLE_ERROR("Forced cleanup waiting for %s "
+                                       "namespace with %d resources in use, "
+                                       "(rc=%d)\n", ns->ns_name,
+                                       ns->ns_refcount, rc);
+                        GOTO(force_wait, rc);
+                }
+
+                if (ns->ns_refcount) {
+                        LCONSOLE_ERROR("Cleanup waiting for %s namespace "
+                                       "with %d resources in use, (rc=%d)\n",
+                                       ns->ns_name,
+                                       ns->ns_refcount, rc);
+                        RETURN(ELDLM_NAMESPACE_EXISTS);
+                }
                 CDEBUG(D_DLMTRACE,
                        "dlm namespace %s free done waiting\n", ns->ns_name);
         }
                 CDEBUG(D_DLMTRACE,
                        "dlm namespace %s free done waiting\n", ns->ns_name);
         }
@@ -545,12 +545,51 @@ int ldlm_namespace_free_prior(struct ldlm_namespace *ns)
         RETURN(ELDLM_OK);
 }
 
         RETURN(ELDLM_OK);
 }
 
-int ldlm_namespace_free_post(struct ldlm_namespace *ns, int force)
+void ldlm_namespace_free_prior(struct ldlm_namespace *ns, 
+                               struct obd_import *imp, 
+                               int force)
 {
 {
+        int rc;
         ENTRY;
         ENTRY;
-        if (!ns)
-                RETURN(ELDLM_OK);
+        if (!ns) {
+                EXIT;
+                return;
+        }
+
+        /* Can fail with -EINTR when force == 0 in which case try harder */
+        rc = __ldlm_namespace_free(ns, force);
+        if (rc != ELDLM_OK) {
+                if (imp) {
+                        ptlrpc_disconnect_import(imp, 0);
+                        ptlrpc_invalidate_import(imp);
+                }
+
+                /* With all requests dropped and the import inactive
+                 * we are gaurenteed all reference will be dropped. */
+                rc = __ldlm_namespace_free(ns, 1);
+                LASSERT(rc == 0);
+        }
+        EXIT;
+}
+
+void ldlm_namespace_free_post(struct ldlm_namespace *ns)
+{
+        ENTRY;
+        if (!ns) {
+                EXIT;
+                return;
+        }
 
 
+        mutex_down(ldlm_namespace_lock(ns->ns_client));
+        /*
+         * Some asserts and possibly other parts of code still using 
+         * list_empty(&ns->ns_list_chain). This is why it is important
+         * to use list_del_init() here.
+         */
+        list_del_init(&ns->ns_list_chain);
+        atomic_dec(ldlm_namespace_nr(ns->ns_client));
+        ldlm_pool_fini(&ns->ns_pool);
+        mutex_up(ldlm_namespace_lock(ns->ns_client));
 #ifdef LPROCFS
         {
                 struct proc_dir_entry *dir;
 #ifdef LPROCFS
         {
                 struct proc_dir_entry *dir;
@@ -572,8 +611,8 @@ int ldlm_namespace_free_post(struct ldlm_namespace *ns, int force)
          */
         LASSERT(list_empty(&ns->ns_list_chain));
         OBD_FREE_PTR(ns);
          */
         LASSERT(list_empty(&ns->ns_list_chain));
         OBD_FREE_PTR(ns);
-        ldlm_put_ref(force);
-        RETURN(ELDLM_OK);
+        ldlm_put_ref();
+        EXIT;
 }
 
 
 }
 
 
@@ -594,11 +633,12 @@ int ldlm_namespace_free_post(struct ldlm_namespace *ns, int force)
  * lprocfs entries, and then free memory. It will be called w/o cli->cl_sem
  * held.
  */
  * lprocfs entries, and then free memory. It will be called w/o cli->cl_sem
  * held.
  */
-int ldlm_namespace_free(struct ldlm_namespace *ns, int force)
+void ldlm_namespace_free(struct ldlm_namespace *ns, 
+                         struct obd_import *imp,
+                         int force)
 {
 {
-        ldlm_namespace_free_prior(ns);
-        ldlm_namespace_free_post(ns, force);
-        return ELDLM_OK;
+        ldlm_namespace_free_prior(ns, imp, force);
+        ldlm_namespace_free_post(ns);
 }
 
 
 }
 
 
index 1c5efc2..2e35988 100644 (file)
@@ -3778,7 +3778,7 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
         m->mdt_identity_cache = NULL;
 
         if (m->mdt_namespace != NULL) {
         m->mdt_identity_cache = NULL;
 
         if (m->mdt_namespace != NULL) {
-                ldlm_namespace_free(m->mdt_namespace, d->ld_obd->obd_force);
+                ldlm_namespace_free(m->mdt_namespace, NULL, d->ld_obd->obd_force);
                 d->ld_obd->obd_namespace = m->mdt_namespace = NULL;
         }
 
                 d->ld_obd->obd_namespace = m->mdt_namespace = NULL;
         }
 
@@ -4035,7 +4035,7 @@ err_capa:
 err_free_ns:
         upcall_cache_cleanup(m->mdt_identity_cache);
         m->mdt_identity_cache = NULL;
 err_free_ns:
         upcall_cache_cleanup(m->mdt_identity_cache);
         m->mdt_identity_cache = NULL;
-        ldlm_namespace_free(m->mdt_namespace, 0);
+        ldlm_namespace_free(m->mdt_namespace, NULL, 0);
         obd->obd_namespace = m->mdt_namespace = NULL;
 err_fini_seq:
         mdt_seq_fini(env, m);
         obd->obd_namespace = m->mdt_namespace = NULL;
 err_fini_seq:
         mdt_seq_fini(env, m);
index 582e55a..4781703 100644 (file)
@@ -248,7 +248,7 @@ err_fs:
         /* No extra cleanup needed for llog_init_commit_thread() */
         mgs_fs_cleanup(obd);
 err_ns:
         /* No extra cleanup needed for llog_init_commit_thread() */
         mgs_fs_cleanup(obd);
 err_ns:
-        ldlm_namespace_free(obd->obd_namespace, 0);
+        ldlm_namespace_free(obd->obd_namespace, NULL, 0);
         obd->obd_namespace = NULL;
 err_ops:
         fsfilt_put_ops(obd->obd_fsops);
         obd->obd_namespace = NULL;
 err_ops:
         fsfilt_put_ops(obd->obd_fsops);
@@ -279,12 +279,11 @@ static int mgs_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
 static int mgs_ldlm_nsfree(void *data)
 {
         struct ldlm_namespace *ns = (struct ldlm_namespace *)data;
 static int mgs_ldlm_nsfree(void *data)
 {
         struct ldlm_namespace *ns = (struct ldlm_namespace *)data;
-        int rc;
         ENTRY;
 
         ptlrpc_daemonize("ll_mgs_nsfree");
         ENTRY;
 
         ptlrpc_daemonize("ll_mgs_nsfree");
-        rc = ldlm_namespace_free(ns, 1 /* obd_force should always be on */);
-        RETURN(rc);
+        ldlm_namespace_free(ns, NULL, 1 /* obd_force should always be on */);
+        RETURN(0);
 }
 
 static int mgs_cleanup(struct obd_device *obd)
 }
 
 static int mgs_cleanup(struct obd_device *obd)
index 2fe7c07..197d3cd 100644 (file)
@@ -511,7 +511,7 @@ static int echo_cleanup(struct obd_device *obd)
         set_current_state (TASK_UNINTERRUPTIBLE);
         cfs_schedule_timeout (CFS_TASK_UNINT, cfs_time_seconds(1));
 
         set_current_state (TASK_UNINTERRUPTIBLE);
         cfs_schedule_timeout (CFS_TASK_UNINT, cfs_time_seconds(1));
 
-        ldlm_namespace_free(obd->obd_namespace, obd->obd_force);
+        ldlm_namespace_free(obd->obd_namespace, NULL, obd->obd_force);
 
         leaked = atomic_read(&obd->u.echo.eo_prep);
         if (leaked != 0)
 
         leaked = atomic_read(&obd->u.echo.eo_prep);
         if (leaked != 0)
index f00b738..3b3d1fc 100644 (file)
@@ -2402,7 +2402,7 @@ static int filter_cleanup(struct obd_device *obd)
         target_stop_recovery_thread(obd);
         target_cleanup_recovery(obd);
 
         target_stop_recovery_thread(obd);
         target_cleanup_recovery(obd);
 
-        ldlm_namespace_free(obd->obd_namespace, obd->obd_force);
+        ldlm_namespace_free(obd->obd_namespace, NULL, obd->obd_force);
 
         sptlrpc_rule_set_free(&filter->fo_sptlrpc_rset);
 
 
         sptlrpc_rule_set_free(&filter->fo_sptlrpc_rset);