Whamcloud - gitweb
LU-12616 obclass: fix MDS start/stop race
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
index d3737cb..980d225 100644 (file)
@@ -663,8 +663,7 @@ static void ldlm_failed_ast(struct ldlm_lock *lock, int rc,
 /**
  * Perform lock cleanup if AST reply came with error.
  */
-static int ldlm_handle_ast_error(const struct lu_env *env,
-                                struct ldlm_lock *lock,
+static int ldlm_handle_ast_error(struct ldlm_lock *lock,
                                 struct ptlrpc_request *req, int rc,
                                 const char *ast_type)
 {
@@ -726,7 +725,7 @@ static int ldlm_handle_ast_error(const struct lu_env *env,
                         * see b=23174
                         */
                        ldlm_resource_getref(res);
-                       ldlm_lvbo_update(env, res, lock, NULL, 1);
+                       ldlm_lvbo_update(res, lock, NULL, 1);
                        ldlm_resource_putref(res);
                }
                ldlm_lock_cancel(lock);
@@ -759,28 +758,25 @@ static int ldlm_cb_interpret(const struct lu_env *env,
                 *   -ELDLM_NO_LOCK_DATA when inode is cleared. LU-274
                 */
                if (unlikely(arg->gl_interpret_reply)) {
-                       rc = arg->gl_interpret_reply(env, req, args, rc);
+                       rc = arg->gl_interpret_reply(NULL, req, args, rc);
                } else if (rc == -ELDLM_NO_LOCK_DATA) {
                        LDLM_DEBUG(lock,
                                   "lost race - client has a lock but no inode");
-                       ldlm_lvbo_update(env, lock->l_resource, lock, NULL, 1);
+                       ldlm_lvbo_update(lock->l_resource, lock, NULL, 1);
                } else if (rc != 0) {
-                       rc = ldlm_handle_ast_error(env, lock, req,
-                                                  rc, "glimpse");
+                       rc = ldlm_handle_ast_error(lock, req, rc, "glimpse");
                } else {
-                       rc = ldlm_lvbo_update(env, lock->l_resource,
+                       rc = ldlm_lvbo_update(lock->l_resource,
                                              lock, req, 1);
                }
                break;
        case LDLM_BL_CALLBACK:
                if (rc != 0)
-                       rc = ldlm_handle_ast_error(env, lock, req,
-                                                  rc, "blocking");
+                       rc = ldlm_handle_ast_error(lock, req, rc, "blocking");
                break;
        case LDLM_CP_CALLBACK:
                if (rc != 0)
-                       rc = ldlm_handle_ast_error(env, lock, req,
-                                                  rc, "completion");
+                       rc = ldlm_handle_ast_error(lock, req, rc, "completion");
                break;
        default:
                LDLM_ERROR(lock, "invalid opcode for lock callback %d",
@@ -1037,12 +1033,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
        ldlm_lock2desc(lock, &body->lock_desc);
        if (lvb_len > 0) {
                void *lvb = req_capsule_client_get(&req->rq_pill, &RMF_DLM_LVB);
-               const struct lu_env *env = NULL;
-
-               if (req->rq_svc_thread)
-                       env = req->rq_svc_thread->t_env;
-
-               lvb_len = ldlm_lvbo_fill(env, lock, lvb, &lvb_len);
+               lvb_len = ldlm_lvbo_fill(lock, lvb, &lvb_len);
                if (lvb_len < 0) {
                        /*
                         * We still need to send the RPC to wake up the blocked
@@ -1198,7 +1189,7 @@ int ldlm_glimpse_locks(struct ldlm_resource *res,
        rc = ldlm_run_ast_work(ldlm_res_to_ns(res), gl_work_list,
                               LDLM_WORK_GL_AST);
        if (rc == -ERESTART)
-               ldlm_reprocess_all(res);
+               ldlm_reprocess_all(res, NULL);
 
        RETURN(rc);
 }
@@ -1322,7 +1313,7 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
        res = lock->l_resource;
        if (!(flags & LDLM_FL_REPLAY)) {
                /* non-replayed lock, delayed lvb init may need to be done */
-               rc = ldlm_lvbo_init(env, res);
+               rc = ldlm_lvbo_init(res);
                if (rc < 0) {
                        LDLM_DEBUG(lock, "delayed lvb init failed (rc %d)", rc);
                        GOTO(out, rc);
@@ -1500,7 +1491,7 @@ retry:
                        if ((buflen > 0) && !(flags & LDLM_FL_REPLAY)) {
                                int rc2;
 
-                               rc2 = ldlm_lvbo_fill(env, lock, buf, &buflen);
+                               rc2 = ldlm_lvbo_fill(lock, buf, &buflen);
                                if (rc2 >= 0) {
                                        req_capsule_shrink(&req->rq_pill,
                                                           &RMF_DLM_LVB,
@@ -1552,7 +1543,7 @@ retry:
 
                if (!err && !ldlm_is_cbpending(lock) &&
                    dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK)
-                       ldlm_reprocess_all(lock->l_resource);
+                       ldlm_reprocess_all(lock->l_resource, lock);
 
                LDLM_LOCK_RELEASE(lock);
        }
@@ -1648,7 +1639,7 @@ int ldlm_handle_convert0(struct ptlrpc_request *req,
                        ldlm_clear_blocking_data(lock);
                        unlock_res_and_lock(lock);
 
-                       ldlm_reprocess_all(lock->l_resource);
+                       ldlm_reprocess_all(lock->l_resource, NULL);
                        rc = ELDLM_OK;
                }
 
@@ -1682,7 +1673,6 @@ int ldlm_request_cancel(struct ptlrpc_request *req,
                        const struct ldlm_request *dlm_req,
                        int first, enum lustre_at_flags flags)
 {
-       const struct lu_env *env = req->rq_svc_thread->t_env;
        struct ldlm_resource *res, *pres = NULL;
        struct ldlm_lock *lock;
        int i, count, done = 0;
@@ -1724,7 +1714,7 @@ int ldlm_request_cancel(struct ptlrpc_request *req,
                 */
                if (res != pres) {
                        if (pres != NULL) {
-                               ldlm_reprocess_all(pres);
+                               ldlm_reprocess_all(pres, NULL);
                                LDLM_RESOURCE_DELREF(pres);
                                ldlm_resource_putref(pres);
                        }
@@ -1733,7 +1723,7 @@ int ldlm_request_cancel(struct ptlrpc_request *req,
                                LDLM_RESOURCE_ADDREF(res);
 
                                if (!ldlm_is_discard_data(lock))
-                                       ldlm_lvbo_update(env, res, lock,
+                                       ldlm_lvbo_update(res, lock,
                                                         NULL, 1);
                        }
                        pres = res;
@@ -1752,7 +1742,7 @@ int ldlm_request_cancel(struct ptlrpc_request *req,
                LDLM_LOCK_PUT(lock);
        }
        if (pres != NULL) {
-               ldlm_reprocess_all(pres);
+               ldlm_reprocess_all(pres, NULL);
                LDLM_RESOURCE_DELREF(pres);
                ldlm_resource_putref(pres);
        }
@@ -2826,11 +2816,23 @@ static int ldlm_bl_thread_exports(struct ldlm_bl_pool *blp,
  */
 static int ldlm_bl_thread_main(void *arg)
 {
+       struct lu_env *env;
        struct ldlm_bl_pool *blp;
        struct ldlm_bl_thread_data *bltd = arg;
+       int rc;
 
        ENTRY;
 
+       OBD_ALLOC_PTR(env);
+       if (!env)
+               RETURN(-ENOMEM);
+       rc = lu_env_init(env, LCT_DT_THREAD);
+       if (rc)
+               GOTO(out_env, rc);
+       rc = lu_env_add(env);
+       if (rc)
+               GOTO(out_env_fini, rc);
+
        blp = bltd->bltd_blp;
 
        complete(&bltd->bltd_comp);
@@ -2875,7 +2877,13 @@ static int ldlm_bl_thread_main(void *arg)
 
        atomic_dec(&blp->blp_num_threads);
        complete(&blp->blp_comp);
-       RETURN(0);
+
+       lu_env_remove(env);
+out_env_fini:
+       lu_env_fini(env);
+out_env:
+       OBD_FREE_PTR(env);
+       RETURN(rc);
 }
 
 
@@ -3336,11 +3344,17 @@ int ldlm_init(void)
                goto out_interval;
 
 #ifdef HAVE_SERVER_SUPPORT
+       ldlm_inodebits_slab = kmem_cache_create("ldlm_ibits_node",
+                                               sizeof(struct ldlm_ibits_node),
+                                               0, SLAB_HWCACHE_ALIGN, NULL);
+       if (ldlm_inodebits_slab == NULL)
+               goto out_interval_tree;
+
        ldlm_glimpse_work_kmem = kmem_cache_create("ldlm_glimpse_work_kmem",
                                        sizeof(struct ldlm_glimpse_work),
                                        0, 0, NULL);
        if (ldlm_glimpse_work_kmem == NULL)
-               goto out_interval_tree;
+               goto out_inodebits;
 #endif
 
 #if LUSTRE_TRACKS_LOCK_EXP_REFS
@@ -3348,6 +3362,8 @@ int ldlm_init(void)
 #endif
        return 0;
 #ifdef HAVE_SERVER_SUPPORT
+out_inodebits:
+       kmem_cache_destroy(ldlm_inodebits_slab);
 out_interval_tree:
        kmem_cache_destroy(ldlm_interval_tree_slab);
 #endif
@@ -3368,14 +3384,15 @@ void ldlm_exit(void)
        kmem_cache_destroy(ldlm_resource_slab);
        /*
         * ldlm_lock_put() use RCU to call ldlm_lock_free, so need call
-        * synchronize_rcu() to wait a grace period elapsed, so that
-        * ldlm_lock_free() get a chance to be called.
+        * rcu_barrier() to wait all outstanding RCU callbacks to complete,
+        * so that ldlm_lock_free() get a chance to be called.
         */
-       synchronize_rcu();
+       rcu_barrier();
        kmem_cache_destroy(ldlm_lock_slab);
        kmem_cache_destroy(ldlm_interval_slab);
        kmem_cache_destroy(ldlm_interval_tree_slab);
 #ifdef HAVE_SERVER_SUPPORT
+       kmem_cache_destroy(ldlm_inodebits_slab);
        kmem_cache_destroy(ldlm_glimpse_work_kmem);
 #endif
 }