Whamcloud - gitweb
Branch b1_6
author bwzhou <bwzhou>
Mon, 28 Apr 2008 04:21:59 +0000 (04:21 +0000)
committer bwzhou <bwzhou>
Mon, 28 Apr 2008 04:21:59 +0000 (04:21 +0000)
b=11777
r=green, shadow

Invalidate the import to halt request handling so the namespace can be safely
cleaned up.

lustre/include/lustre_dlm.h
lustre/ldlm/ldlm_internal.h
lustre/ldlm/ldlm_lib.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_resource.c
lustre/mdc/mdc_request.c
lustre/mds/handler.c
lustre/mgs/mgs_handler.c
lustre/obdecho/echo.c
lustre/obdfilter/filter.c

index 55696b5..038fac3 100644 (file)
@@ -617,7 +617,7 @@ int ldlm_request_cancel(struct ptlrpc_request *req,
 int ldlm_del_waiting_lock(struct ldlm_lock *lock);
 int ldlm_refresh_waiting_lock(struct ldlm_lock *lock);
 int ldlm_get_ref(void);
-void ldlm_put_ref(int force);
+void ldlm_put_ref(void);
 
 /* ldlm_lock.c */
 ldlm_processing_policy ldlm_get_processing_policy(struct ldlm_resource *res);
@@ -699,7 +699,8 @@ void ldlm_unlink_lock_skiplist(struct ldlm_lock *req);
 struct ldlm_namespace *ldlm_namespace_new(char *name, ldlm_side_t client, 
                                           ldlm_appetite_t apt);
 int ldlm_namespace_cleanup(struct ldlm_namespace *ns, int flags);
-int ldlm_namespace_free(struct ldlm_namespace *ns, int force);
+void ldlm_namespace_free(struct ldlm_namespace *ns,
+                         struct obd_import *imp, int force);
 void ldlm_namespace_move(struct ldlm_namespace *ns, ldlm_side_t client);
 struct ldlm_namespace *ldlm_namespace_first(ldlm_side_t client);
 void ldlm_namespace_get(struct ldlm_namespace *ns);
index 1e8cf31..50dca93 100644 (file)
@@ -56,8 +56,9 @@ int ldlm_get_enq_timeout(struct ldlm_lock *lock);
 int ldlm_resource_putref_locked(struct ldlm_resource *res);
 void ldlm_resource_insert_lock_after(struct ldlm_lock *original,
                                      struct ldlm_lock *new);
-int ldlm_namespace_free_prior(struct ldlm_namespace *ns);
-int ldlm_namespace_free_post(struct ldlm_namespace *ns, int force);
+void ldlm_namespace_free_prior(struct ldlm_namespace *ns, 
+                               struct obd_import *imp, int force);
+void ldlm_namespace_free_post(struct ldlm_namespace *ns);
 
 /* ldlm_lock.c */
 
index bdb1ac7..5f7a7a4 100644 (file)
@@ -345,7 +345,7 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf)
 err_import:
         class_destroy_import(imp);
 err_ldlm:
-        ldlm_put_ref(0);
+        ldlm_put_ref();
 err:
         RETURN(rc);
 
@@ -354,7 +354,7 @@ err:
 int client_obd_cleanup(struct obd_device *obddev)
 {
         ENTRY;
-        ldlm_put_ref(obddev->obd_force);
+        ldlm_put_ref();
         RETURN(0);
 }
 
@@ -418,7 +418,7 @@ int client_connect_import(struct lustre_handle *dlm_handle,
 
         if (rc) {
 out_ldlm:
-                ldlm_namespace_free_prior(obd->obd_namespace);
+                ldlm_namespace_free_prior(obd->obd_namespace, imp, 0);
                 to_be_freed = obd->obd_namespace;
                 obd->obd_namespace = NULL;
 out_disco:
@@ -430,7 +430,7 @@ out_disco:
 out_sem:
         mutex_up(&cli->cl_sem);
         if (to_be_freed)
-                ldlm_namespace_free_post(to_be_freed, 0);
+                ldlm_namespace_free_post(to_be_freed);
         return rc;
 }
 
@@ -480,7 +480,8 @@ int client_disconnect_export(struct obd_export *exp)
                 ldlm_cli_cancel_unused(obd->obd_namespace, NULL,
                                        obd->obd_force ? LDLM_FL_LOCAL_ONLY:0,
                                        NULL);
-                ldlm_namespace_free_prior(obd->obd_namespace);
+                ldlm_namespace_free_prior(obd->obd_namespace, imp, 
+                                          obd->obd_force);
                 to_be_freed = obd->obd_namespace;
         }
 
@@ -504,7 +505,7 @@ int client_disconnect_export(struct obd_export *exp)
  out_sem:
         mutex_up(&cli->cl_sem);
         if (to_be_freed)
-                ldlm_namespace_free_post(to_be_freed, obd->obd_force);
+                ldlm_namespace_free_post(to_be_freed);
         RETURN(rc);
 }
 
index 850f540..80ec048 100644 (file)
@@ -1858,7 +1858,7 @@ static int ldlm_bl_thread_main(void *arg)
 #endif
 
 static int ldlm_setup(void);
-static int ldlm_cleanup(int force);
+static int ldlm_cleanup(void);
 
 int ldlm_get_ref(void)
 {
@@ -1875,12 +1875,12 @@ int ldlm_get_ref(void)
         RETURN(rc);
 }
 
-void ldlm_put_ref(int force)
+void ldlm_put_ref(void)
 {
         ENTRY;
         mutex_down(&ldlm_ref_sem);
         if (ldlm_refcount == 1) {
-                int rc = ldlm_cleanup(force);
+                int rc = ldlm_cleanup();
                 if (rc)
                         CERROR("ldlm_cleanup failed: %d\n", rc);
                 else
@@ -2027,7 +2027,7 @@ static int ldlm_setup(void)
         return rc;
 }
 
-static int ldlm_cleanup(int force)
+static int ldlm_cleanup(void)
 {
 #ifdef __KERNEL__
         struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
index df5bc2c..2629a64 100644 (file)
@@ -369,7 +369,7 @@ out_hash:
 out_ns:
         OBD_FREE_PTR(ns);
 out_ref:
-        ldlm_put_ref(0);
+        ldlm_put_ref();
         RETURN(NULL);
 }
 
@@ -498,25 +498,11 @@ int ldlm_namespace_cleanup(struct ldlm_namespace *ns, int flags)
         return ELDLM_OK;
 }
 
-int ldlm_namespace_free_prior(struct ldlm_namespace *ns)
+static int __ldlm_namespace_free(struct ldlm_namespace *ns, int force)
 {
         ENTRY;
-        if (!ns)
-                RETURN(ELDLM_OK);
-
-        mutex_down(ldlm_namespace_lock(ns->ns_client));
-        /*
-         * Some asserts and possibly other parts of code still using 
-         * list_empty(&ns->ns_list_chain). This is why it is important
-         * to use list_del_init() here.
-         */
-        list_del_init(&ns->ns_list_chain);
-        atomic_dec(ldlm_namespace_nr(ns->ns_client));
-        ldlm_pool_fini(&ns->ns_pool);
-        mutex_up(ldlm_namespace_lock(ns->ns_client));
-
         /* At shutdown time, don't call the cancellation callback */
-        ldlm_namespace_cleanup(ns, 0);
+        ldlm_namespace_cleanup(ns, force ? LDLM_FL_LOCAL_ONLY : 0);
 
         if (ns->ns_refcount > 0) {
                 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
@@ -524,16 +510,30 @@ int ldlm_namespace_free_prior(struct ldlm_namespace *ns)
                 CDEBUG(D_DLMTRACE,
                        "dlm namespace %s free waiting on refcount %d\n",
                        ns->ns_name, ns->ns_refcount);
+force_wait:
+                if (force)
+                        lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, NULL, NULL);
+
                 rc = l_wait_event(ns->ns_waitq,
                                   ns->ns_refcount == 0, &lwi);
-                if (ns->ns_refcount)
-                        LCONSOLE_ERROR_MSG(0x139, "Lock manager: wait for %s "
-                                           "namespace cleanup aborted with %d "
-                                           "resources in use. (%d)\nI'm going "
-                                           "to try to clean up anyway, but I "
-                                           "might need a reboot of this node.\n",
-                                            ns->ns_name, (int) ns->ns_refcount, 
-                                            rc);
+
+                /* Forced cleanups should be able to reclaim all references,
+                 * so it's safe to wait forever... we can't leak locks... */
+                if (force && rc == -ETIMEDOUT) {
+                        LCONSOLE_ERROR("Forced cleanup waiting for %s "
+                                       "namespace with %d resources in use, "
+                                       "(rc=%d)\n", ns->ns_name,
+                                       ns->ns_refcount, rc);
+                        GOTO(force_wait, rc);
+                }
+
+                if (ns->ns_refcount) {
+                        LCONSOLE_ERROR("Cleanup waiting for %s namespace "
+                                       "with %d resources in use, (rc=%d)\n",
+                                       ns->ns_name,
+                                       ns->ns_refcount, rc);
+                        RETURN(ELDLM_NAMESPACE_EXISTS);
+                }
                 CDEBUG(D_DLMTRACE,
                        "dlm namespace %s free done waiting\n", ns->ns_name);
         }
@@ -541,12 +541,50 @@ int ldlm_namespace_free_prior(struct ldlm_namespace *ns)
         RETURN(ELDLM_OK);
 }
 
-int ldlm_namespace_free_post(struct ldlm_namespace *ns, int force)
+void ldlm_namespace_free_prior(struct ldlm_namespace *ns, 
+                               struct obd_import *imp, 
+                               int force)
 {
+        int rc;
         ENTRY;
-        if (!ns)
-                RETURN(ELDLM_OK);
+        if (!ns) {
+                EXIT;
+                return;
+        }
 
+        /* Can fail with -EINTR when force == 0 in which case try harder */
+        rc = __ldlm_namespace_free(ns, force);
+        if (rc != ELDLM_OK) {
+                if (imp) {
+                        ptlrpc_disconnect_import(imp, 0);
+                        ptlrpc_invalidate_import(imp);
+                }
+
+                /* With all requests dropped and the import inactive
+                 * we are guaranteed all references will be dropped. */
+                rc = __ldlm_namespace_free(ns, 1);
+                LASSERT(rc == 0);
+        }
+        EXIT;
+}
+
+void ldlm_namespace_free_post(struct ldlm_namespace *ns)
+{
+        ENTRY;
+        if (!ns) {
+                EXIT;
+                return;
+        }
+        mutex_down(ldlm_namespace_lock(ns->ns_client));
+        /*
+         * Some asserts and possibly other parts of code still using 
+         * list_empty(&ns->ns_list_chain). This is why it is important
+         * to use list_del_init() here.
+         */
+        list_del_init(&ns->ns_list_chain);
+        atomic_dec(ldlm_namespace_nr(ns->ns_client));
+        ldlm_pool_fini(&ns->ns_pool);
+        mutex_up(ldlm_namespace_lock(ns->ns_client));
 #ifdef LPROCFS
         {
                 struct proc_dir_entry *dir;
@@ -567,8 +605,8 @@ int ldlm_namespace_free_post(struct ldlm_namespace *ns, int force)
          */
         LASSERT(list_empty(&ns->ns_list_chain));
         OBD_FREE_PTR(ns);
-        ldlm_put_ref(force);
-        RETURN(ELDLM_OK);
+        ldlm_put_ref();
+        EXIT;
 }
 
 /* Cleanup the resource, and free namespace.
@@ -588,11 +626,12 @@ int ldlm_namespace_free_post(struct ldlm_namespace *ns, int force)
  * lprocfs entries, and then free memory. It will be called w/o cli->cl_sem 
  * held.
  */
-int ldlm_namespace_free(struct ldlm_namespace *ns, int force)
+void ldlm_namespace_free(struct ldlm_namespace *ns, 
+                         struct obd_import *imp, 
+                         int force)
 {
-        ldlm_namespace_free_prior(ns);
-        ldlm_namespace_free_post(ns, force);
-        return ELDLM_OK;
+        ldlm_namespace_free_prior(ns, imp, force);
+        ldlm_namespace_free_post(ns);
 }
 
 void ldlm_namespace_get_nolock(struct ldlm_namespace *ns)
index 0d40904..be6589e 100644 (file)
@@ -534,6 +534,7 @@ static void mdc_replay_open(struct ptlrpc_request *req)
                 EXIT;
                 return;
         }
+        DEBUG_REQ(D_ERROR, req, "mdc open data found");
 
         och = mod->mod_och;
         if (och != NULL) {
index 072591a..655cac4 100644 (file)
@@ -2095,7 +2095,7 @@ err_fs:
 err_ns:
         lprocfs_free_obd_stats(obd);
         lprocfs_obd_cleanup(obd);
-        ldlm_namespace_free(obd->obd_namespace, 0);
+        ldlm_namespace_free(obd->obd_namespace, NULL, 0);
         obd->obd_namespace = NULL;
 err_ops:
         fsfilt_put_ops(obd->obd_fsops);
@@ -2282,7 +2282,7 @@ static int mds_cleanup(struct obd_device *obd)
         server_put_mount(obd->obd_name, mds->mds_vfsmnt);
         obd->u.obt.obt_sb = NULL;
 
-        ldlm_namespace_free(obd->obd_namespace, obd->obd_force);
+        ldlm_namespace_free(obd->obd_namespace, NULL, obd->obd_force);
 
         spin_lock_bh(&obd->obd_processing_task_lock);
         if (obd->obd_recovering) {
index 6b7ba3a..c9ccaa6 100644 (file)
@@ -212,7 +212,7 @@ err_fs:
         /* No extra cleanup needed for llog_init_commit_thread() */
         mgs_fs_cleanup(obd);
 err_ns:
-        ldlm_namespace_free(obd->obd_namespace, 0);
+        ldlm_namespace_free(obd->obd_namespace, NULL, 0);
         obd->obd_namespace = NULL;
 err_ops:
         fsfilt_put_ops(obd->obd_fsops);
@@ -244,12 +244,11 @@ static int mgs_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
 static int mgs_ldlm_nsfree(void *data)
 {
         struct ldlm_namespace *ns = (struct ldlm_namespace *)data;
-        int rc;
         ENTRY;
 
         ptlrpc_daemonize("ll_mgs_nsfree");
-        rc = ldlm_namespace_free(ns, 1 /* obd_force should always be on */);
-        RETURN(rc);
+        ldlm_namespace_free(ns, NULL, 1 /* obd_force should always be on */);
+        RETURN(0);
 }
 
 static int mgs_cleanup(struct obd_device *obd)
index f8f9826..d49f801 100644 (file)
@@ -510,7 +510,7 @@ static int echo_cleanup(struct obd_device *obd)
         set_current_state (TASK_UNINTERRUPTIBLE);
         cfs_schedule_timeout (CFS_TASK_UNINT, cfs_time_seconds(1));
 
-        ldlm_namespace_free(obd->obd_namespace, obd->obd_force);
+        ldlm_namespace_free(obd->obd_namespace, NULL, obd->obd_force);
 
         leaked = atomic_read(&obd->u.echo.eo_prep);
         if (leaked != 0)
index 6ebdc58..2f82325 100644 (file)
@@ -2016,7 +2016,7 @@ static int filter_cleanup(struct obd_device *obd)
 
         lquota_cleanup(filter_quota_interface_ref, obd);
 
-        ldlm_namespace_free(obd->obd_namespace, obd->obd_force);
+        ldlm_namespace_free(obd->obd_namespace, NULL, obd->obd_force);
 
         if (obd->u.obt.obt_sb == NULL)
                 RETURN(0);