Whamcloud - gitweb
LU-11102 ldlm: don't skip bl_ast for local lock
[fs/lustre-release.git] / lustre / ldlm / ldlm_lock.c
index b5c68b2..5e7afe6 100644 (file)
@@ -125,8 +125,6 @@ const char *ldlm_it2str(enum ldlm_intent_flags it)
                return "getattr";
        case IT_LOOKUP:
                return "lookup";
-       case IT_UNLINK:
-               return "unlink";
        case IT_GETXATTR:
                return "getxattr";
        case IT_LAYOUT:
@@ -236,6 +234,8 @@ int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock)
                struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
 
                LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
+               if (ns->ns_last_pos == &lock->l_lru)
+                       ns->ns_last_pos = lock->l_lru.prev;
                list_del_init(&lock->l_lru);
                LASSERT(ns->ns_nr_unused > 0);
                ns->ns_nr_unused--;
@@ -286,7 +286,6 @@ void ldlm_lock_add_to_lru_nolock(struct ldlm_lock *lock)
        LASSERT(list_empty(&lock->l_lru));
        LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
        list_add_tail(&lock->l_lru, &ns->ns_unused_list);
-       ldlm_clear_skipped(lock);
        LASSERT(ns->ns_nr_unused >= 0);
        ns->ns_nr_unused++;
 }
@@ -479,12 +478,13 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
 
         lprocfs_counter_incr(ldlm_res_to_ns(resource)->ns_stats,
                              LDLM_NSS_LOCKS);
-       INIT_LIST_HEAD(&lock->l_handle.h_link);
+       INIT_LIST_HEAD_RCU(&lock->l_handle.h_link);
        class_handle_hash(&lock->l_handle, &lock_handle_ops);
 
         lu_ref_init(&lock->l_reference);
         lu_ref_add(&lock->l_reference, "hash", lock);
         lock->l_callback_timeout = 0;
+       lock->l_activity = 0;
 
 #if LUSTRE_TRACKS_LOCK_EXP_REFS
        INIT_LIST_HEAD(&lock->l_exp_refs_link);
@@ -665,12 +665,19 @@ static void ldlm_add_bl_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                 * discard dirty data, rather than writing back. */
                if (ldlm_is_ast_discard_data(new))
                        ldlm_set_discard_data(lock);
-               LASSERT(list_empty(&lock->l_bl_ast));
-               list_add(&lock->l_bl_ast, work_list);
-                LDLM_LOCK_GET(lock);
-                LASSERT(lock->l_blocking_lock == NULL);
-                lock->l_blocking_lock = LDLM_LOCK_GET(new);
-        }
+
+               /* Lock can be converted from a blocking state back to granted
+                * after lock convert or COS downgrade but still be in an
+                * older bl_list because that list is managed only by
+                * ldlm_work_bl_ast_lock(); let it be processed there.
+                */
+               if (list_empty(&lock->l_bl_ast)) {
+                       list_add(&lock->l_bl_ast, work_list);
+                       LDLM_LOCK_GET(lock);
+               }
+               LASSERT(lock->l_blocking_lock == NULL);
+               lock->l_blocking_lock = LDLM_LOCK_GET(new);
+       }
 }
 
 /**
@@ -868,7 +875,8 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, enum ldlm_mode mode)
         } else if (ns_is_client(ns) &&
                    !lock->l_readers && !lock->l_writers &&
                   !ldlm_is_no_lru(lock) &&
-                  !ldlm_is_bl_ast(lock)) {
+                  !ldlm_is_bl_ast(lock) &&
+                  !ldlm_is_converting(lock)) {
 
                 LDLM_DEBUG(lock, "add lock into lru list");
 
@@ -1726,7 +1734,8 @@ restart:
  * set, skip all the enqueueing and delegate lock processing to intent policy
  * function.
  */
-enum ldlm_error ldlm_lock_enqueue(struct ldlm_namespace *ns,
+enum ldlm_error ldlm_lock_enqueue(const struct lu_env *env,
+                                 struct ldlm_namespace *ns,
                                  struct ldlm_lock **lockp,
                                  void *cookie, __u64 *flags)
 {
@@ -1740,8 +1749,8 @@ enum ldlm_error ldlm_lock_enqueue(struct ldlm_namespace *ns,
         /* policies are not executed on the client or during replay */
         if ((*flags & (LDLM_FL_HAS_INTENT|LDLM_FL_REPLAY)) == LDLM_FL_HAS_INTENT
             && !local && ns->ns_policy) {
-                rc = ns->ns_policy(ns, lockp, cookie, lock->l_req_mode, *flags,
-                                   NULL);
+               rc = ns->ns_policy(env, ns, lockp, cookie, lock->l_req_mode,
+                                  *flags, NULL);
                 if (rc == ELDLM_LOCK_REPLACED) {
                         /* The lock that was returned has already been granted,
                          * and placed into lockp.  If it's not the same as the
@@ -1923,8 +1932,6 @@ restart:
        if (!list_empty(&bl_ast_list)) {
                unlock_res(res);
 
-               LASSERT(intention == LDLM_PROCESS_RECOVERY);
-
                rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &bl_ast_list,
                                       LDLM_WORK_BL_AST);
 
@@ -2016,27 +2023,21 @@ int ldlm_handle_conflict_lock(struct ldlm_lock *lock, __u64 *flags,
  */
 void ldlm_discard_bl_list(struct list_head *bl_list)
 {
-       struct list_head *tmp, *pos;
-        ENTRY;
+       struct ldlm_lock *lock, *tmp;
 
-       list_for_each_safe(pos, tmp, bl_list) {
-                struct ldlm_lock *lock =
-                       list_entry(pos, struct ldlm_lock, l_bl_ast);
+       ENTRY;
 
+       list_for_each_entry_safe(lock, tmp, bl_list, l_bl_ast) {
+               LASSERT(!list_empty(&lock->l_bl_ast));
                list_del_init(&lock->l_bl_ast);
-               LASSERT(ldlm_is_ast_sent(lock));
                ldlm_clear_ast_sent(lock);
                LASSERT(lock->l_bl_ast_run == 0);
-               LASSERT(lock->l_blocking_lock);
-               LDLM_LOCK_RELEASE(lock->l_blocking_lock);
-               lock->l_blocking_lock = NULL;
+               ldlm_clear_blocking_lock(lock);
                LDLM_LOCK_RELEASE(lock);
        }
        EXIT;
 }
 
-#endif
-
 /**
  * Process a call to blocking AST callback for a lock in ast_work list
  */
@@ -2044,9 +2045,11 @@ static int
 ldlm_work_bl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
 {
        struct ldlm_cb_set_arg *arg = opaq;
-       struct ldlm_lock_desc   d;
-       int                     rc;
-       struct ldlm_lock       *lock;
+       struct ldlm_lock *lock;
+       struct ldlm_lock_desc d;
+       struct ldlm_bl_desc bld;
+       int rc;
+
        ENTRY;
 
        if (list_empty(arg->list))
@@ -2054,66 +2057,49 @@ ldlm_work_bl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
 
        lock = list_entry(arg->list->next, struct ldlm_lock, l_bl_ast);
 
-       /* nobody should touch l_bl_ast */
+       /* nobody should touch l_bl_ast but some locks in the list may become
+        * granted after lock convert or COS downgrade; these locks should
+        * just be skipped here and removed from the list.
+        */
        lock_res_and_lock(lock);
        list_del_init(&lock->l_bl_ast);
 
+       /* lock is not blocking lock anymore, but was kept in the list because
+        * it can be managed only here.
+        */
+       if (!ldlm_is_ast_sent(lock)) {
+               unlock_res_and_lock(lock);
+               LDLM_LOCK_RELEASE(lock);
+               RETURN(0);
+       }
+
+       LASSERT(lock->l_blocking_lock);
+       ldlm_lock2desc(lock->l_blocking_lock, &d);
+       /* copy blocking lock ibits in cancel_bits as well,
+        * new client may use them for lock convert and it is
+        * important to use new field to convert locks from
+        * new servers only
+        */
+       d.l_policy_data.l_inodebits.cancel_bits =
+               lock->l_blocking_lock->l_policy_data.l_inodebits.bits;
+
+       /* Blocking lock is being destroyed here but some information about it
+        * may be needed inside l_blocking_ast() function below,
+        * e.g. in mdt_blocking_ast(). So save needed data in bl_desc.
+        */
+       bld.bl_same_client = lock->l_client_cookie ==
+                            lock->l_blocking_lock->l_client_cookie;
+       bld.bl_cos_incompat = ldlm_is_cos_incompat(lock->l_blocking_lock);
+       arg->bl_desc = &bld;
+
        LASSERT(ldlm_is_ast_sent(lock));
        LASSERT(lock->l_bl_ast_run == 0);
-       LASSERT(lock->l_blocking_lock);
        lock->l_bl_ast_run++;
+       ldlm_clear_blocking_lock(lock);
        unlock_res_and_lock(lock);
 
-       ldlm_lock2desc(lock->l_blocking_lock, &d);
-
        rc = lock->l_blocking_ast(lock, &d, (void *)arg, LDLM_CB_BLOCKING);
-       LDLM_LOCK_RELEASE(lock->l_blocking_lock);
-       lock->l_blocking_lock = NULL;
-       LDLM_LOCK_RELEASE(lock);
-
-       RETURN(rc);
-}
 
-/**
- * Process a call to completion AST callback for a lock in ast_work list
- */
-static int
-ldlm_work_cp_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
-{
-       struct ldlm_cb_set_arg  *arg = opaq;
-       int                      rc = 0;
-       struct ldlm_lock        *lock;
-       ldlm_completion_callback completion_callback;
-       ENTRY;
-
-       if (list_empty(arg->list))
-               RETURN(-ENOENT);
-
-       lock = list_entry(arg->list->next, struct ldlm_lock, l_cp_ast);
-
-       /* It's possible to receive a completion AST before we've set
-        * the l_completion_ast pointer: either because the AST arrived
-        * before the reply, or simply because there's a small race
-        * window between receiving the reply and finishing the local
-        * enqueue. (bug 842)
-        *
-        * This can't happen with the blocking_ast, however, because we
-        * will never call the local blocking_ast until we drop our
-        * reader/writer reference, which we won't do until we get the
-        * reply and finish enqueueing. */
-
-       /* nobody should touch l_cp_ast */
-       lock_res_and_lock(lock);
-       list_del_init(&lock->l_cp_ast);
-       LASSERT(ldlm_is_cp_reqd(lock));
-       /* save l_completion_ast since it can be changed by
-        * mds_intent_policy(), see bug 14225 */
-       completion_callback = lock->l_completion_ast;
-       ldlm_clear_cp_reqd(lock);
-       unlock_res_and_lock(lock);
-
-       if (completion_callback != NULL)
-               rc = completion_callback(lock, 0, (void *)arg);
        LDLM_LOCK_RELEASE(lock);
 
        RETURN(rc);
@@ -2185,6 +2171,53 @@ int ldlm_work_gl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
 
        RETURN(rc);
 }
+#endif
+
+/**
+ * Process a call to completion AST callback for a lock in ast_work list
+ */
+static int
+ldlm_work_cp_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
+{
+       struct ldlm_cb_set_arg *arg = opaq;
+       struct ldlm_lock *lock;
+       ldlm_completion_callback completion_callback;
+       int rc = 0;
+
+       ENTRY;
+
+       if (list_empty(arg->list))
+               RETURN(-ENOENT);
+
+       lock = list_entry(arg->list->next, struct ldlm_lock, l_cp_ast);
+
+       /* It's possible to receive a completion AST before we've set
+        * the l_completion_ast pointer: either because the AST arrived
+        * before the reply, or simply because there's a small race
+        * window between receiving the reply and finishing the local
+        * enqueue. (bug 842)
+        *
+        * This can't happen with the blocking_ast, however, because we
+        * will never call the local blocking_ast until we drop our
+        * reader/writer reference, which we won't do until we get the
+        * reply and finish enqueueing. */
+
+       /* nobody should touch l_cp_ast */
+       lock_res_and_lock(lock);
+       list_del_init(&lock->l_cp_ast);
+       LASSERT(ldlm_is_cp_reqd(lock));
+       /* save l_completion_ast since it can be changed by
+        * mds_intent_policy(), see bug 14225 */
+       completion_callback = lock->l_completion_ast;
+       ldlm_clear_cp_reqd(lock);
+       unlock_res_and_lock(lock);
+
+       if (completion_callback != NULL)
+               rc = completion_callback(lock, 0, (void *)arg);
+       LDLM_LOCK_RELEASE(lock);
+
+       RETURN(rc);
+}
 
 /**
  * Process list of locks in need of ASTs being sent.
@@ -2193,11 +2226,11 @@ int ldlm_work_gl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
  * one.
  */
 int ldlm_run_ast_work(struct ldlm_namespace *ns, struct list_head *rpc_list,
-                      ldlm_desc_ast_t ast_type)
+                     ldlm_desc_ast_t ast_type)
 {
        struct ldlm_cb_set_arg *arg;
-       set_producer_func       work_ast_lock;
-       int                     rc;
+       set_producer_func work_ast_lock;
+       int rc;
 
        if (list_empty(rpc_list))
                RETURN(0);
@@ -2210,24 +2243,26 @@ int ldlm_run_ast_work(struct ldlm_namespace *ns, struct list_head *rpc_list,
        arg->list = rpc_list;
 
        switch (ast_type) {
-               case LDLM_WORK_BL_AST:
-                       arg->type = LDLM_BL_CALLBACK;
-                       work_ast_lock = ldlm_work_bl_ast_lock;
-                       break;
-               case LDLM_WORK_CP_AST:
-                       arg->type = LDLM_CP_CALLBACK;
-                       work_ast_lock = ldlm_work_cp_ast_lock;
-                       break;
-               case LDLM_WORK_REVOKE_AST:
-                       arg->type = LDLM_BL_CALLBACK;
-                       work_ast_lock = ldlm_work_revoke_ast_lock;
-                       break;
-               case LDLM_WORK_GL_AST:
-                       arg->type = LDLM_GL_CALLBACK;
-                       work_ast_lock = ldlm_work_gl_ast_lock;
-                       break;
-               default:
-                       LBUG();
+       case LDLM_WORK_CP_AST:
+               arg->type = LDLM_CP_CALLBACK;
+               work_ast_lock = ldlm_work_cp_ast_lock;
+               break;
+#ifdef HAVE_SERVER_SUPPORT
+       case LDLM_WORK_BL_AST:
+               arg->type = LDLM_BL_CALLBACK;
+               work_ast_lock = ldlm_work_bl_ast_lock;
+               break;
+       case LDLM_WORK_REVOKE_AST:
+               arg->type = LDLM_BL_CALLBACK;
+               work_ast_lock = ldlm_work_revoke_ast_lock;
+               break;
+       case LDLM_WORK_GL_AST:
+               arg->type = LDLM_GL_CALLBACK;
+               work_ast_lock = ldlm_work_gl_ast_lock;
+               break;
+#endif
+       default:
+               LBUG();
        }
 
        /* We create a ptlrpc request set with flow control extension.
@@ -2239,7 +2274,7 @@ int ldlm_run_ast_work(struct ldlm_namespace *ns, struct list_head *rpc_list,
        if (arg->set == NULL)
                GOTO(out, rc = -ENOMEM);
 
-       ptlrpc_set_wait(arg->set);
+       ptlrpc_set_wait(NULL, arg->set);
        ptlrpc_set_destroy(arg->set);
 
        rc = atomic_read(&arg->restart) ? -ERESTART : 0;
@@ -2398,6 +2433,7 @@ void ldlm_lock_cancel(struct ldlm_lock *lock)
          * talking to me first. -phik */
         if (lock->l_readers || lock->l_writers) {
                 LDLM_ERROR(lock, "lock still has references");
+               unlock_res_and_lock(lock);
                 LBUG();
         }
 
@@ -2448,6 +2484,7 @@ int ldlm_lock_set_data(const struct lustre_handle *lockh, void *data)
 EXPORT_SYMBOL(ldlm_lock_set_data);
 
 struct export_cl_data {
+       const struct lu_env     *ecl_env;
        struct obd_export       *ecl_exp;
        int                     ecl_loop;
 };
@@ -2460,7 +2497,7 @@ static void ldlm_cancel_lock_for_export(struct obd_export *exp,
 
        res = ldlm_resource_getref(lock->l_resource);
 
-       ldlm_lvbo_update(res, lock, NULL, 1);
+       ldlm_lvbo_update(ecl->ecl_env, res, lock, NULL, 1);
        ldlm_lock_cancel(lock);
        if (!exp->exp_obd->obd_stopping)
                ldlm_reprocess_all(res);
@@ -2500,10 +2537,17 @@ ldlm_cancel_locks_for_export_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
  */
 int ldlm_export_cancel_blocked_locks(struct obd_export *exp)
 {
+       struct lu_env env;
        struct export_cl_data   ecl = {
                .ecl_exp        = exp,
                .ecl_loop       = 0,
        };
+       int rc;
+
+       rc = lu_env_init(&env, LCT_DT_THREAD);
+       if (rc)
+               RETURN(rc);
+       ecl.ecl_env = &env;
 
        while (!list_empty(&exp->exp_bl_list)) {
                struct ldlm_lock *lock;
@@ -2526,6 +2570,8 @@ int ldlm_export_cancel_blocked_locks(struct obd_export *exp)
                LDLM_LOCK_RELEASE(lock);
        }
 
+       lu_env_fini(&env);
+
        CDEBUG(D_DLMTRACE, "Export %p, canceled %d locks, "
               "left on hash table %d.\n", exp, ecl.ecl_loop,
               atomic_read(&exp->exp_lock_hash->hs_count));
@@ -2540,10 +2586,16 @@ int ldlm_export_cancel_blocked_locks(struct obd_export *exp)
  */
 int ldlm_export_cancel_locks(struct obd_export *exp)
 {
-       struct export_cl_data   ecl = {
-               .ecl_exp        = exp,
-               .ecl_loop       = 0,
-       };
+       struct export_cl_data ecl;
+       struct lu_env env;
+       int rc;
+
+       rc = lu_env_init(&env, LCT_DT_THREAD);
+       if (rc)
+               RETURN(rc);
+       ecl.ecl_env = &env;
+       ecl.ecl_exp = exp;
+       ecl.ecl_loop = 0;
 
        cfs_hash_for_each_empty(exp->exp_lock_hash,
                                ldlm_cancel_locks_for_export_cb, &ecl);
@@ -2557,6 +2609,8 @@ int ldlm_export_cancel_locks(struct obd_export *exp)
            exp->exp_obd->obd_stopping)
                ldlm_reprocess_recovery_done(exp->exp_obd->obd_namespace);
 
+       lu_env_fini(&env);
+
        return ecl.ecl_loop;
 }
 
@@ -2567,13 +2621,18 @@ int ldlm_export_cancel_locks(struct obd_export *exp)
  * convertion may fail if lock was canceled before downgrade, but it doesn't
  * indicate any problem, because such lock has no reader or writer, and will
  * be released soon.
- * Used by Commit on Sharing (COS) code only for now.
+ *
+ * Used by Commit on Sharing (COS) code to force a commit of object changes
+ * in case of conflict. The converted lock is treated as a new lock and all
+ * blocking AST state is cleared, so any pending or new lock blocked on it
+ * triggers a new blocking_ast call and forces a commit of the resource object.
  *
  * \param lock A lock to convert
  * \param new_mode new lock mode
  */
 void ldlm_lock_mode_downgrade(struct ldlm_lock *lock, enum ldlm_mode new_mode)
 {
+#ifdef HAVE_SERVER_SUPPORT
        ENTRY;
 
        LASSERT(new_mode == LCK_COS);
@@ -2594,14 +2653,20 @@ void ldlm_lock_mode_downgrade(struct ldlm_lock *lock, enum ldlm_mode new_mode)
         * ldlm_grant_lock() called below.
         */
        ldlm_pool_del(&ldlm_lock_to_ns(lock)->ns_pool, lock);
+
+       /* Consider downgraded lock as a new lock and clear all states
+        * related to a previous blocking AST processing.
+        */
+       ldlm_clear_blocking_data(lock);
+
        lock->l_req_mode = new_mode;
        ldlm_grant_lock(lock, NULL);
-
        unlock_res_and_lock(lock);
 
        ldlm_reprocess_all(lock->l_resource);
 
        EXIT;
+#endif
 }
 EXPORT_SYMBOL(ldlm_lock_mode_downgrade);