LU-11014 mdc: remove obsolete intent opcodes
fs/lustre-release.git: lustre/ldlm/ldlm_lock.c
index 01d9837..44680dd 100644
@@ -125,8 +125,6 @@ const char *ldlm_it2str(enum ldlm_intent_flags it)
                return "getattr";
        case IT_LOOKUP:
                return "lookup";
-       case IT_UNLINK:
-               return "unlink";
        case IT_GETXATTR:
                return "getxattr";
        case IT_LAYOUT:
@@ -236,6 +234,8 @@ int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock)
                struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
 
                LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
+               if (ns->ns_last_pos == &lock->l_lru)
+                       ns->ns_last_pos = lock->l_lru.prev;
                list_del_init(&lock->l_lru);
                LASSERT(ns->ns_nr_unused > 0);
                ns->ns_nr_unused--;
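
The two added lines guard the namespace's LRU scan cursor: ns_last_pos may
point at the very element being unlinked, so it must first be stepped back
to its predecessor or it is left dangling. A minimal, self-contained sketch
of the same pattern in plain C (the list helpers are hand-rolled stand-ins
for the kernel list API, not Lustre code):

	#include <stdio.h>

	struct list_head { struct list_head *next, *prev; };

	static void list_init(struct list_head *h) { h->next = h->prev = h; }

	static void list_add_tail(struct list_head *e, struct list_head *h)
	{
		e->prev = h->prev; e->next = h;
		h->prev->next = e; h->prev = e;
	}

	static void list_del_init(struct list_head *e)
	{
		e->prev->next = e->next; e->next->prev = e->prev;
		list_init(e);
	}

	struct item { int id; struct list_head link; };

	int main(void)
	{
		struct list_head lru, *cursor;
		struct item a = { .id = 1 }, b = { .id = 2 };

		list_init(&lru);
		list_add_tail(&a.link, &lru);
		list_add_tail(&b.link, &lru);
		cursor = &b.link;		/* like ns->ns_last_pos */

		/* the fix: retreat the cursor before unlinking its element */
		if (cursor == &b.link)
			cursor = b.link.prev;
		list_del_init(&b.link);

		printf("cursor now at %s\n", cursor == &a.link ? "a" : "head");
		return 0;
	}
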
@@ -286,7 +286,6 @@ void ldlm_lock_add_to_lru_nolock(struct ldlm_lock *lock)
        LASSERT(list_empty(&lock->l_lru));
        LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
        list_add_tail(&lock->l_lru, &ns->ns_unused_list);
-       ldlm_clear_skipped(lock);
        LASSERT(ns->ns_nr_unused >= 0);
        ns->ns_nr_unused++;
 }
@@ -485,6 +484,7 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
         lu_ref_init(&lock->l_reference);
         lu_ref_add(&lock->l_reference, "hash", lock);
         lock->l_callback_timeout = 0;
+       lock->l_activity = 0;
 
 #if LUSTRE_TRACKS_LOCK_EXP_REFS
        INIT_LIST_HEAD(&lock->l_exp_refs_link);
@@ -868,7 +868,8 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, enum ldlm_mode mode)
         } else if (ns_is_client(ns) &&
                    !lock->l_readers && !lock->l_writers &&
                   !ldlm_is_no_lru(lock) &&
-                  !ldlm_is_bl_ast(lock)) {
+                  !ldlm_is_bl_ast(lock) &&
+                  !ldlm_is_converting(lock)) {
 
                 LDLM_DEBUG(lock, "add lock into lru list");
 
@@ -1689,6 +1690,33 @@ out:
        RETURN(ERR_PTR(rc));
 }
 
+#ifdef HAVE_SERVER_SUPPORT
+static enum ldlm_error ldlm_lock_enqueue_helper(struct ldlm_lock *lock,
+                                            __u64 *flags)
+{
+       struct ldlm_resource *res = lock->l_resource;
+       enum ldlm_error rc = ELDLM_OK;
+       struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
+       ldlm_processing_policy policy;
+       ENTRY;
+
+       policy = ldlm_processing_policy_table[res->lr_type];
+restart:
+       policy(lock, flags, LDLM_PROCESS_ENQUEUE, &rc, &rpc_list);
+       if (rc == ELDLM_OK && lock->l_granted_mode != lock->l_req_mode &&
+           res->lr_type != LDLM_FLOCK) {
+               rc = ldlm_handle_conflict_lock(lock, flags, &rpc_list);
+               if (rc == -ERESTART)
+                       GOTO(restart, rc);
+       }
+
+       if (!list_empty(&rpc_list))
+               ldlm_discard_bl_list(&rpc_list);
+
+       RETURN(rc);
+}
+#endif
+
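
ldlm_lock_enqueue_helper() pairs the policy dispatch with conflict handling:
whenever ldlm_handle_conflict_lock() returns -ERESTART (a conflicting lock
was instantly cancelled while blocking ASTs were being sent), the policy is
re-run from scratch, and any blocking-AST work left on rpc_list is discarded
on the way out. A toy, self-contained sketch of that restart contract (the
stub functions are hypothetical stand-ins, not Lustre APIs):

	#include <errno.h>
	#include <stdio.h>

	static int conflicts_left = 2;	/* pretend two rounds of conflicts */

	static int run_policy(void)	/* stand-in for policy() */
	{
		return 0;
	}

	/* stand-in for ldlm_handle_conflict_lock() */
	static int handle_conflict(void)
	{
		return conflicts_left-- > 0 ? -ERESTART : 0;
	}

	int main(void)
	{
		int rc, restarts = 0;

	restart:
		rc = run_policy();
		if (rc == 0) {
			rc = handle_conflict();
			if (rc == -ERESTART) {
				restarts++;
				goto restart;	/* conflicts changed: re-run */
			}
		}
		/* the real helper discards leftover rpc_list entries here */
		printf("enqueue done, rc=%d after %d restart(s)\n", rc, restarts);
		return 0;
	}
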
 /**
  * Enqueue (request) a lock.
  *
@@ -1706,9 +1734,6 @@ enum ldlm_error ldlm_lock_enqueue(struct ldlm_namespace *ns,
        struct ldlm_lock *lock = *lockp;
        struct ldlm_resource *res = lock->l_resource;
        int local = ns_is_client(ldlm_res_to_ns(res));
-#ifdef HAVE_SERVER_SUPPORT
-       ldlm_processing_policy policy;
-#endif
        enum ldlm_error rc = ELDLM_OK;
        struct ldlm_interval *node = NULL;
        ENTRY;
@@ -1827,8 +1852,7 @@ enum ldlm_error ldlm_lock_enqueue(struct ldlm_namespace *ns,
                /* If no flags, fall through to normal enqueue path. */
        }
 
-       policy = ldlm_processing_policy_table[res->lr_type];
-       policy(lock, flags, LDLM_PROCESS_ENQUEUE, &rc, NULL);
+       rc = ldlm_lock_enqueue_helper(lock, flags);
        GOTO(out, rc);
 #else
         } else {
@@ -1861,6 +1885,7 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
        __u64 flags;
        int rc = LDLM_ITER_CONTINUE;
        enum ldlm_error err;
+       struct list_head bl_ast_list = LIST_HEAD_INIT(bl_ast_list);
        ENTRY;
 
        check_res_locked(res);
@@ -1870,15 +1895,23 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
        LASSERT(intention == LDLM_PROCESS_RESCAN ||
                intention == LDLM_PROCESS_RECOVERY);
 
+restart:
        list_for_each_safe(tmp, pos, queue) {
                struct ldlm_lock *pending;
+               struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
 
                pending = list_entry(tmp, struct ldlm_lock, l_res_link);
 
                 CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
 
                 flags = 0;
-               rc = policy(pending, &flags, intention, &err, work_list);
+               rc = policy(pending, &flags, intention, &err, &rpc_list);
+               if (pending->l_granted_mode == pending->l_req_mode ||
+                   res->lr_type == LDLM_FLOCK) {
+                       list_splice(&rpc_list, work_list);
+               } else {
+                       list_splice(&rpc_list, &bl_ast_list);
+               }
                /*
                 * When this is called from recovery done, we always want
                 * to scan the whole list no matter what 'rc' is returned.
@@ -1888,6 +1921,20 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
                        break;
         }
 
+       if (!list_empty(&bl_ast_list)) {
+               unlock_res(res);
+
+               rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &bl_ast_list,
+                                      LDLM_WORK_BL_AST);
+
+               lock_res(res);
+               if (rc == -ERESTART)
+                       GOTO(restart, rc);
+       }
+
+       if (!list_empty(&bl_ast_list))
+               ldlm_discard_bl_list(&bl_ast_list);
+
         RETURN(intention == LDLM_PROCESS_RESCAN ? rc : LDLM_ITER_CONTINUE);
 }
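
The new bl_ast_list handling follows a classic pattern: blocking-AST work is
collected under the resource lock, the lock is dropped to send the ASTs (the
callbacks may block), the lock is retaken, and the whole queue is rescanned
when -ERESTART reports that it changed in the meantime. A minimal sketch of
the shape of that pattern using POSIX threads (a pthread mutex standing in
for lock_res()/unlock_res(); not Lustre code):

	#include <errno.h>
	#include <pthread.h>
	#include <stdio.h>

	static pthread_mutex_t res_lock = PTHREAD_MUTEX_INITIALIZER;
	static int passes;

	static int run_ast_work(void)
	{
		/* pretend the first pass cancels a lock, forcing a rescan */
		return passes++ == 0 ? -ERESTART : 0;
	}

	int main(void)
	{
		int rc;

		pthread_mutex_lock(&res_lock);
	restart:
		/* ... scan the queue, collecting blocking-AST work ... */
		pthread_mutex_unlock(&res_lock);
		rc = run_ast_work();	/* may sleep: resource lock dropped */
		pthread_mutex_lock(&res_lock);
		if (rc == -ERESTART)
			goto restart;
		pthread_mutex_unlock(&res_lock);
		printf("queue reprocessed in %d pass(es)\n", passes);
		return 0;
	}
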
 
@@ -1898,7 +1945,6 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
  * \param[in] lock             The lock to be enqueued.
  * \param[out] flags           Lock flags for the lock to be enqueued.
  * \param[in] rpc_list         Conflicting locks list.
- * \param[in] grant_flags      extra flags when granting a lock.
  *
  * \retval -ERESTART:  Some lock was instantly canceled while sending
  *                     blocking ASTs, caller needs to re-check conflicting
@@ -1907,7 +1953,7 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
  * \retval 0:          Lock is successfully added to the waiting list.
  */
 int ldlm_handle_conflict_lock(struct ldlm_lock *lock, __u64 *flags,
-                             struct list_head *rpc_list, __u64 grant_flags)
+                             struct list_head *rpc_list)
 {
        struct ldlm_resource *res = lock->l_resource;
        int rc;
@@ -1956,7 +2002,7 @@ int ldlm_handle_conflict_lock(struct ldlm_lock *lock, __u64 *flags,
 
                RETURN(rc);
        }
-       *flags |= (LDLM_FL_BLOCK_GRANTED | grant_flags);
+       *flags |= LDLM_FL_BLOCK_GRANTED;
 
        RETURN(0);
 }
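
With grant_flags gone, callers pass exactly the conflict list. The calling
convention, as used by ldlm_lock_enqueue_helper() above, is to re-run the
processing policy whenever -ERESTART comes back:

	rc = ldlm_handle_conflict_lock(lock, flags, &rpc_list);
	if (rc == -ERESTART)
		GOTO(restart, rc);
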
@@ -2018,6 +2064,13 @@ ldlm_work_bl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
        unlock_res_and_lock(lock);
 
        ldlm_lock2desc(lock->l_blocking_lock, &d);
+       /* Copy the blocking lock's ibits into cancel_bits as well;
+        * a new client may use them for lock conversion, and it is
+        * important to use this new field to convert locks from new
+        * servers only.
+        */
+       d.l_policy_data.l_inodebits.cancel_bits =
+               lock->l_blocking_lock->l_policy_data.l_inodebits.bits;
 
        rc = lock->l_blocking_ast(lock, &d, (void *)arg, LDLM_CB_BLOCKING);
        LDLM_LOCK_RELEASE(lock->l_blocking_lock);
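
Only new servers fill cancel_bits; old servers leave the field zero, which a
client has to treat as "cancel everything". A hypothetical, self-contained
illustration of the client-side decision (toy values; the two inodebits
fields named in the comment are real, everything else here is an assumption,
not Lustre code):

	#include <stdio.h>
	#include <stdint.h>

	typedef uint64_t __u64;

	int main(void)
	{
		/* toy values: our lock holds three inodebits, the blocking
		 * lock (via desc->l_policy_data.l_inodebits.cancel_bits)
		 * conflicts with only one of them */
		__u64 mine = 0x7;
		__u64 conflict = 0x2;

		if (conflict != 0 && (mine & ~conflict) != 0)
			printf("convert: keep %#llx, drop %#llx\n",
			       (unsigned long long)(mine & ~conflict),
			       (unsigned long long)(mine & conflict));
		else
			printf("cancel whole lock (full overlap or old server)\n");
		return 0;
	}
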
@@ -2351,6 +2404,7 @@ void ldlm_lock_cancel(struct ldlm_lock *lock)
          * talking to me first. -phik */
         if (lock->l_readers || lock->l_writers) {
                 LDLM_ERROR(lock, "lock still has references");
+               unlock_res_and_lock(lock);
                 LBUG();
         }