Whamcloud - gitweb
LU-9679 modules: Use LIST_HEAD for declaring list_heads
[fs/lustre-release.git] / lustre / ldlm / ldlm_lock.c
index da90fb1..4276c1e 100644 (file)
@@ -136,8 +136,6 @@ const char *ldlm_it2str(enum ldlm_intent_flags it)
 }
 EXPORT_SYMBOL(ldlm_it2str);
 
-extern struct kmem_cache *ldlm_lock_slab;
-
 #ifdef HAVE_SERVER_SUPPORT
 static ldlm_processing_policy ldlm_processing_policy_table[] = {
        [LDLM_PLAIN]    = ldlm_process_plain_lock,
@@ -151,6 +149,19 @@ ldlm_processing_policy ldlm_get_processing_policy(struct ldlm_resource *res)
         return ldlm_processing_policy_table[res->lr_type];
 }
 EXPORT_SYMBOL(ldlm_get_processing_policy);
+
+static ldlm_reprocessing_policy ldlm_reprocessing_policy_table[] = {
+       [LDLM_PLAIN]    = ldlm_reprocess_queue,
+       [LDLM_EXTENT]   = ldlm_reprocess_queue,
+       [LDLM_FLOCK]    = ldlm_reprocess_queue,
+       [LDLM_IBITS]    = ldlm_reprocess_inodebits_queue,
+};
+
+ldlm_reprocessing_policy ldlm_get_reprocessing_policy(struct ldlm_resource *res)
+{
+       return ldlm_reprocessing_policy_table[res->lr_type];
+}
+
 #endif /* HAVE_SERVER_SUPPORT */
 
 void ldlm_register_intent(struct ldlm_namespace *ns, ldlm_res_policy arg)
@@ -205,8 +216,6 @@ void ldlm_lock_put(struct ldlm_lock *lock)
                 lprocfs_counter_decr(ldlm_res_to_ns(res)->ns_stats,
                                      LDLM_NSS_LOCKS);
                 lu_ref_del(&res->lr_reference, "lock", lock);
-                ldlm_resource_putref(res);
-                lock->l_resource = NULL;
                 if (lock->l_export) {
                         class_export_lock_put(lock->l_export, lock);
                         lock->l_export = NULL;
@@ -215,7 +224,15 @@ void ldlm_lock_put(struct ldlm_lock *lock)
                 if (lock->l_lvb_data != NULL)
                         OBD_FREE_LARGE(lock->l_lvb_data, lock->l_lvb_len);
 
-                ldlm_interval_free(ldlm_interval_detach(lock));
+               if (res->lr_type == LDLM_EXTENT) {
+                       ldlm_interval_free(ldlm_interval_detach(lock));
+               } else if (res->lr_type == LDLM_IBITS) {
+                       if (lock->l_ibits_node != NULL)
+                               OBD_SLAB_FREE_PTR(lock->l_ibits_node,
+                                                 ldlm_inodebits_slab);
+               }
+               ldlm_resource_putref(res);
+               lock->l_resource = NULL;
                 lu_ref_fini(&lock->l_reference);
                OBD_FREE_RCU(lock, sizeof(*lock), &lock->l_handle);
         }
@@ -589,7 +606,7 @@ struct ldlm_lock *__ldlm_handle2lock(const struct lustre_handle *handle,
 
        LASSERT(handle);
 
-       lock = class_handle2object(handle->cookie, NULL);
+       lock = class_handle2object(handle->cookie, &lock_handle_ops);
        if (lock == NULL)
                RETURN(NULL);
 
@@ -1035,7 +1052,6 @@ static void search_granted_lock(struct list_head *queue,
         prev->mode_link = &req->l_sl_mode;
         prev->policy_link = &req->l_sl_policy;
         EXIT;
-        return;
 }
 
 /**
@@ -1138,29 +1154,16 @@ void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list)
 }
 
 /**
- * Describe the overlap between two locks.  itree_overlap_cb data.
- */
-struct lock_match_data {
-       struct ldlm_lock        *lmd_old;
-       struct ldlm_lock        *lmd_lock;
-       enum ldlm_mode          *lmd_mode;
-       union ldlm_policy_data  *lmd_policy;
-       __u64                    lmd_flags;
-       __u64                    lmd_skip_flags;
-       int                      lmd_unref;
-};
-
-/**
  * Check if the given @lock meets the criteria for a match.
  * A reference on the lock is taken if matched.
  *
  * \param lock     test-against this lock
  * \param data    parameters
  */
-static int lock_matches(struct ldlm_lock *lock, struct lock_match_data *data)
+static int lock_matches(struct ldlm_lock *lock, struct ldlm_match_data *data)
 {
        union ldlm_policy_data *lpol = &lock->l_policy_data;
-       enum ldlm_mode match;
+       enum ldlm_mode match = LCK_MINMODE;
 
        if (lock == data->lmd_old)
                return INTERVAL_ITER_STOP;
@@ -1185,6 +1188,17 @@ static int lock_matches(struct ldlm_lock *lock, struct lock_match_data *data)
 
        if (!(lock->l_req_mode & *data->lmd_mode))
                return INTERVAL_ITER_CONT;
+
+       /* When we search for ast_data, we are not doing a traditional match,
+        * so we don't worry about IBITS or extent matching.
+        */
+       if (data->lmd_has_ast_data) {
+               if (!lock->l_ast_data)
+                       return INTERVAL_ITER_CONT;
+
+               goto matched;
+       }
+
        match = lock->l_req_mode;
 
        switch (lock->l_resource->lr_type) {
@@ -1222,6 +1236,7 @@ static int lock_matches(struct ldlm_lock *lock, struct lock_match_data *data)
        if (data->lmd_skip_flags & lock->l_flags)
                return INTERVAL_ITER_CONT;
 
+matched:
        if (data->lmd_flags & LDLM_FL_TEST_LOCK) {
                LDLM_LOCK_GET(lock);
                ldlm_lock_touch_in_lru(lock);
@@ -1238,7 +1253,7 @@ static int lock_matches(struct ldlm_lock *lock, struct lock_match_data *data)
 static unsigned int itree_overlap_cb(struct interval_node *in, void *args)
 {
        struct ldlm_interval *node = to_ldlm_interval(in);
-       struct lock_match_data *data = args;
+       struct ldlm_match_data *data = args;
        struct ldlm_lock *lock;
        int rc;
 
@@ -1258,8 +1273,8 @@ static unsigned int itree_overlap_cb(struct interval_node *in, void *args)
  *
  * \retval a referenced lock or NULL.
  */
-static struct ldlm_lock *search_itree(struct ldlm_resource *res,
-                                     struct lock_match_data *data)
+struct ldlm_lock *search_itree(struct ldlm_resource *res,
+                              struct ldlm_match_data *data)
 {
        struct interval_node_extent ext = {
                .start     = data->lmd_policy->l_extent.start,
@@ -1286,6 +1301,7 @@ static struct ldlm_lock *search_itree(struct ldlm_resource *res,
 
        return NULL;
 }
+EXPORT_SYMBOL(search_itree);
 
 
 /**
@@ -1297,7 +1313,7 @@ static struct ldlm_lock *search_itree(struct ldlm_resource *res,
  * \retval a referenced lock or NULL.
  */
 static struct ldlm_lock *search_queue(struct list_head *queue,
-                                     struct lock_match_data *data)
+                                     struct ldlm_match_data *data)
 {
        struct ldlm_lock *lock;
        int rc;
@@ -1393,7 +1409,7 @@ enum ldlm_mode ldlm_lock_match_with_skip(struct ldlm_namespace *ns,
                                         enum ldlm_mode mode,
                                         struct lustre_handle *lockh, int unref)
 {
-       struct lock_match_data data = {
+       struct ldlm_match_data data = {
                .lmd_old = NULL,
                .lmd_lock = NULL,
                .lmd_mode = &mode,
@@ -1401,6 +1417,7 @@ enum ldlm_mode ldlm_lock_match_with_skip(struct ldlm_namespace *ns,
                .lmd_flags = flags,
                .lmd_skip_flags = skip_flags,
                .lmd_unref = unref,
+               .lmd_has_ast_data = false,
        };
        struct ldlm_resource *res;
        struct ldlm_lock *lock;
@@ -1668,11 +1685,18 @@ struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
                lock->l_glimpse_ast = cbs->lcs_glimpse;
        }
 
-       lock->l_tree_node = NULL;
-       /* if this is the extent lock, allocate the interval tree node */
-       if (type == LDLM_EXTENT)
-               if (ldlm_interval_alloc(lock) == NULL)
-                       GOTO(out, rc = -ENOMEM);
+       switch (type) {
+       case LDLM_EXTENT:
+               rc = ldlm_extent_alloc_lock(lock);
+               break;
+       case LDLM_IBITS:
+               rc = ldlm_inodebits_alloc_lock(lock);
+               break;
+       default:
+               rc = 0;
+       }
+       if (rc)
+               GOTO(out, rc);
 
        if (lvb_len) {
                lock->l_lvb_len = lvb_len;
@@ -1699,11 +1723,12 @@ static enum ldlm_error ldlm_lock_enqueue_helper(struct ldlm_lock *lock,
 {
        struct ldlm_resource *res = lock->l_resource;
        enum ldlm_error rc = ELDLM_OK;
-       struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
+       LIST_HEAD(rpc_list);
        ldlm_processing_policy policy;
+
        ENTRY;
 
-       policy = ldlm_processing_policy_table[res->lr_type];
+       policy = ldlm_get_processing_policy(res);
 restart:
        policy(lock, flags, LDLM_PROCESS_ENQUEUE, &rc, &rpc_list);
        if (rc == ELDLM_OK && lock->l_granted_mode != lock->l_req_mode &&
@@ -1882,19 +1907,21 @@ out:
  */
 int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
                         struct list_head *work_list,
-                        enum ldlm_process_intention intention)
+                        enum ldlm_process_intention intention,
+                        struct ldlm_lock *hint)
 {
        struct list_head *tmp, *pos;
        ldlm_processing_policy policy;
        __u64 flags;
        int rc = LDLM_ITER_CONTINUE;
        enum ldlm_error err;
-       struct list_head bl_ast_list = LIST_HEAD_INIT(bl_ast_list);
+       LIST_HEAD(bl_ast_list);
+
        ENTRY;
 
        check_res_locked(res);
 
-       policy = ldlm_processing_policy_table[res->lr_type];
+       policy = ldlm_get_processing_policy(res);
        LASSERT(policy);
        LASSERT(intention == LDLM_PROCESS_RESCAN ||
                intention == LDLM_PROCESS_RECOVERY);
@@ -1902,7 +1929,7 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
 restart:
        list_for_each_safe(tmp, pos, queue) {
                struct ldlm_lock *pending;
-               struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
+               LIST_HEAD(rpc_list);
 
                pending = list_entry(tmp, struct ldlm_lock, l_res_link);
 
@@ -2289,20 +2316,23 @@ out:
  * if anything could be granted as a result of the cancellation.
  */
 static void __ldlm_reprocess_all(struct ldlm_resource *res,
-                                enum ldlm_process_intention intention)
+                                enum ldlm_process_intention intention,
+                                struct ldlm_lock *hint)
 {
        struct list_head rpc_list;
 #ifdef HAVE_SERVER_SUPPORT
+       ldlm_reprocessing_policy reprocess;
        struct obd_device *obd;
-        int rc;
-        ENTRY;
+       int rc;
+
+       ENTRY;
 
        INIT_LIST_HEAD(&rpc_list);
-        /* Local lock trees don't get reprocessed. */
-        if (ns_is_client(ldlm_res_to_ns(res))) {
-                EXIT;
-                return;
-        }
+       /* Local lock trees don't get reprocessed. */
+       if (ns_is_client(ldlm_res_to_ns(res))) {
+               EXIT;
+               return;
+       }
 
        /* Disable reprocess during lock replay stage but allow during
         * request replay stage.
@@ -2313,7 +2343,8 @@ static void __ldlm_reprocess_all(struct ldlm_resource *res,
                RETURN_EXIT;
 restart:
        lock_res(res);
-       ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list, intention);
+       reprocess = ldlm_get_reprocessing_policy(res);
+       reprocess(res, &res->lr_waiting, &rpc_list, intention, hint);
        unlock_res(res);
 
        rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
@@ -2323,21 +2354,21 @@ restart:
                goto restart;
        }
 #else
-        ENTRY;
+       ENTRY;
 
        INIT_LIST_HEAD(&rpc_list);
-        if (!ns_is_client(ldlm_res_to_ns(res))) {
-                CERROR("This is client-side-only module, cannot handle "
-                       "LDLM_NAMESPACE_SERVER resource type lock.\n");
-                LBUG();
-        }
+       if (!ns_is_client(ldlm_res_to_ns(res))) {
+               CERROR("This is client-side-only module, cannot handle "
+                      "LDLM_NAMESPACE_SERVER resource type lock.\n");
+               LBUG();
+       }
 #endif
-        EXIT;
+       EXIT;
 }
 
-void ldlm_reprocess_all(struct ldlm_resource *res)
+void ldlm_reprocess_all(struct ldlm_resource *res, struct ldlm_lock *hint)
 {
-       __ldlm_reprocess_all(res, LDLM_PROCESS_RESCAN);
+       __ldlm_reprocess_all(res, LDLM_PROCESS_RESCAN, hint);
 }
 EXPORT_SYMBOL(ldlm_reprocess_all);
 
@@ -2347,7 +2378,7 @@ static int ldlm_reprocess_res(struct cfs_hash *hs, struct cfs_hash_bd *bd,
        struct ldlm_resource *res = cfs_hash_object(hs, hnode);
 
        /* This is only called once after recovery done. LU-8306. */
-       __ldlm_reprocess_all(res, LDLM_PROCESS_RECOVERY);
+       __ldlm_reprocess_all(res, LDLM_PROCESS_RECOVERY, NULL);
        return 0;
 }
 
@@ -2376,24 +2407,22 @@ void ldlm_cancel_callback(struct ldlm_lock *lock)
        if (!ldlm_is_cancel(lock)) {
                ldlm_set_cancel(lock);
                if (lock->l_blocking_ast) {
-                        unlock_res_and_lock(lock);
-                        lock->l_blocking_ast(lock, NULL, lock->l_ast_data,
-                                             LDLM_CB_CANCELING);
-                        lock_res_and_lock(lock);
-                } else {
-                        LDLM_DEBUG(lock, "no blocking ast");
-                }
+                       unlock_res_and_lock(lock);
+                       lock->l_blocking_ast(lock, NULL, lock->l_ast_data,
+                                            LDLM_CB_CANCELING);
+                       lock_res_and_lock(lock);
+               } else {
+                       LDLM_DEBUG(lock, "no blocking ast");
+               }
 
                /* only canceller can set bl_done bit */
                ldlm_set_bl_done(lock);
                wake_up_all(&lock->l_waitq);
        } else if (!ldlm_is_bl_done(lock)) {
-               struct l_wait_info lwi = { 0 };
-
                /* The lock is guaranteed to have been canceled once
                 * returning from this function. */
                unlock_res_and_lock(lock);
-               l_wait_event(lock->l_waitq, is_bl_done(lock), &lwi);
+               wait_event_idle(lock->l_waitq, is_bl_done(lock));
                lock_res_and_lock(lock);
        }
 }
@@ -2496,7 +2525,7 @@ static void ldlm_cancel_lock_for_export(struct obd_export *exp,
        ldlm_lvbo_update(res, lock, NULL, 1);
        ldlm_lock_cancel(lock);
        if (!exp->exp_obd->obd_stopping)
-               ldlm_reprocess_all(res);
+               ldlm_reprocess_all(res, lock);
        ldlm_resource_putref(res);
 
        ecl->ecl_loop++;
@@ -2611,7 +2640,7 @@ int ldlm_export_cancel_locks(struct obd_export *exp)
 }
 
 /**
- * Downgrade an PW/EX lock to COS mode.
+ * Downgrade an PW/EX lock to COS | CR mode.
  *
  * A lock mode convertion from PW/EX mode to less conflict mode. The
  * convertion may fail if lock was canceled before downgrade, but it doesn't
@@ -2623,6 +2652,8 @@ int ldlm_export_cancel_locks(struct obd_export *exp)
  * things are cleared, so any pending or new blocked lock on that lock will
  * cause new call to blocking_ast and force resource object commit.
  *
+ * Also used by layout_change to replace EX lock to CR lock.
+ *
  * \param lock A lock to convert
  * \param new_mode new lock mode
  */
@@ -2631,7 +2662,7 @@ void ldlm_lock_mode_downgrade(struct ldlm_lock *lock, enum ldlm_mode new_mode)
 #ifdef HAVE_SERVER_SUPPORT
        ENTRY;
 
-       LASSERT(new_mode == LCK_COS);
+       LASSERT(new_mode == LCK_COS || new_mode == LCK_CR);
 
        lock_res_and_lock(lock);
 
@@ -2659,7 +2690,7 @@ void ldlm_lock_mode_downgrade(struct ldlm_lock *lock, enum ldlm_mode new_mode)
        ldlm_grant_lock(lock, NULL);
        unlock_res_and_lock(lock);
 
-       ldlm_reprocess_all(lock->l_resource);
+       ldlm_reprocess_all(lock->l_resource, lock);
 
        EXIT;
 #endif