Whamcloud - gitweb
LU-14286 osd-ldiskfs: fallocate() should zero new blocks
[fs/lustre-release.git] / lustre / osc / osc_lock.c
index c80296a..62da056 100644 (file)
@@ -106,7 +106,7 @@ static int osc_lock_invariant(struct osc_lock *ols)
 
        if (! ergo(ols->ols_state == OLS_GRANTED,
                   olock != NULL &&
-                  olock->l_req_mode == olock->l_granted_mode &&
+                  ldlm_is_granted(olock) &&
                   ols->ols_hold))
                return 0;
        return 1;
@@ -144,20 +144,17 @@ static void osc_lock_build_policy(const struct lu_env *env,
  * with the DLM lock reply from the server. Copy of osc_update_enqueue()
  * logic.
  *
- * This can be optimized to not update attributes when lock is a result of a
- * local match.
- *
  * Called under lock and resource spin-locks.
  */
-static void osc_lock_lvb_update(const struct lu_env *env,
-                               struct osc_object *osc,
-                               struct ldlm_lock *dlmlock,
-                               struct ost_lvb *lvb)
+void osc_lock_lvb_update(const struct lu_env *env,
+                        struct osc_object *osc,
+                        struct ldlm_lock *dlmlock,
+                        struct ost_lvb *lvb)
 {
-       struct cl_object  *obj = osc2cl(osc);
-       struct lov_oinfo  *oinfo = osc->oo_oinfo;
-       struct cl_attr    *attr = &osc_env_info(env)->oti_attr;
-       unsigned           valid;
+       struct cl_object *obj = osc2cl(osc);
+       struct lov_oinfo *oinfo = osc->oo_oinfo;
+       struct cl_attr *attr = &osc_env_info(env)->oti_attr;
+       unsigned valid, setkms = 0;
 
        ENTRY;
 
@@ -182,19 +179,23 @@ static void osc_lock_lvb_update(const struct lu_env *env,
                 if (size > dlmlock->l_policy_data.l_extent.end)
                         size = dlmlock->l_policy_data.l_extent.end + 1;
                 if (size >= oinfo->loi_kms) {
-                       LDLM_DEBUG(dlmlock, "lock acquired, setting rss=%llu"
-                                  ", kms=%llu", lvb->lvb_size, size);
                         valid |= CAT_KMS;
                         attr->cat_kms = size;
-                } else {
-                        LDLM_DEBUG(dlmlock, "lock acquired, setting rss="
-                                  "%llu; leaving kms=%llu, end=%llu",
-                                   lvb->lvb_size, oinfo->loi_kms,
-                                   dlmlock->l_policy_data.l_extent.end);
+                       setkms = 1;
                 }
                ldlm_lock_allow_match_locked(dlmlock);
        }
 
+       /* The size should not be less than the kms */
+       if (attr->cat_size < oinfo->loi_kms)
+               attr->cat_size = oinfo->loi_kms;
+
+       LDLM_DEBUG(dlmlock, "acquired size %llu, setting rss=%llu;%s "
+                  "kms=%llu, end=%llu", lvb->lvb_size, attr->cat_size,
+                  setkms ? "" : " leaving",
+                  setkms ? attr->cat_kms : oinfo->loi_kms,
+                  dlmlock ? dlmlock->l_policy_data.l_extent.end : -1ull);
+
        cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);
 
@@ -202,8 +203,9 @@ static void osc_lock_lvb_update(const struct lu_env *env,
 }
 
 static void osc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
-                            struct lustre_handle *lockh, bool lvb_update)
+                            struct lustre_handle *lockh)
 {
+       struct osc_object *osc = cl2osc(oscl->ols_cl.cls_obj);
        struct ldlm_lock *dlmlock;
 
        dlmlock = ldlm_handle2lock_long(lockh, 0);
@@ -212,7 +214,7 @@ static void osc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
        /* lock reference taken by ldlm_handle2lock_long() is
         * owned by osc_lock and released in osc_lock_detach()
         */
-       lu_ref_add(&dlmlock->l_reference, "osc_lock", oscl);
+       lu_ref_add_atomic(&dlmlock->l_reference, "osc_lock", oscl);
        oscl->ols_has_ref = 1;
 
        LASSERT(oscl->ols_dlmlock == NULL);
@@ -230,7 +232,7 @@ static void osc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
 
        /* Lock must have been granted. */
        lock_res_and_lock(dlmlock);
-       if (dlmlock->l_granted_mode == dlmlock->l_req_mode) {
+       if (ldlm_is_granted(dlmlock)) {
                struct ldlm_extent *ext = &dlmlock->l_policy_data.l_extent;
                struct cl_lock_descr *descr = &oscl->ols_cl.cls_lock->cll_descr;
 
@@ -242,10 +244,11 @@ static void osc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
                descr->cld_gid   = ext->gid;
 
                /* no lvb update for matched lock */
-               if (lvb_update) {
+               if (!ldlm_is_lvb_cached(dlmlock)) {
                        LASSERT(oscl->ols_flags & LDLM_FL_LVB_READY);
-                       osc_lock_lvb_update(env, cl2osc(oscl->ols_cl.cls_obj),
-                                           dlmlock, NULL);
+                       LASSERT(osc == dlmlock->l_ast_data);
+                       osc_lock_lvb_update(env, osc, dlmlock, NULL);
+                       ldlm_set_lvb_cached(dlmlock);
                }
                LINVRNT(osc_lock_invariant(oscl));
        }
@@ -285,7 +288,7 @@ static int osc_lock_upcall(void *cookie, struct lustre_handle *lockh,
        }
 
        if (rc == 0)
-               osc_lock_granted(env, oscl, lockh, errcode == ELDLM_OK);
+               osc_lock_granted(env, oscl, lockh);
 
        /* Error handling, some errors are tolerable. */
        if (oscl->ols_locklessable && rc == -EUSERS) {
@@ -339,9 +342,10 @@ static int osc_lock_upcall_speculative(void *cookie,
        LASSERT(dlmlock != NULL);
 
        lock_res_and_lock(dlmlock);
-       LASSERT(dlmlock->l_granted_mode == dlmlock->l_req_mode);
+       LASSERT(ldlm_is_granted(dlmlock));
 
-       /* there is no osc_lock associated with speculative locks */
+       /* there is no osc_lock associated with speculative locks,
+        * thus no need to set LDLM_FL_LVB_CACHED */
        osc_lock_lvb_update(env, osc, dlmlock, NULL);
 
        unlock_res_and_lock(dlmlock);
@@ -377,7 +381,12 @@ static int osc_lock_flush(struct osc_object *obj, pgoff_t start, pgoff_t end,
                        rc = 0;
        }
 
-       rc2 = osc_lock_discard_pages(env, obj, start, end, discard);
+       /*
+        * Do not try to match other locks with CLM_WRITE since we already
+        * know there are none
+        */
+       rc2 = osc_lock_discard_pages(env, obj, start, end,
+                                    mode == CLM_WRITE || discard);
        if (rc == 0 && rc2 < 0)
                rc = rc2;
 
@@ -402,7 +411,7 @@ static int osc_dlm_blocking_ast0(const struct lu_env *env,
        LASSERT(flag == LDLM_CB_CANCELING);
 
        lock_res_and_lock(dlmlock);
-       if (dlmlock->l_granted_mode != dlmlock->l_req_mode) {
+       if (!ldlm_is_granted(dlmlock)) {
                dlmlock->l_ast_data = NULL;
                unlock_res_and_lock(dlmlock);
                RETURN(0);
@@ -414,13 +423,13 @@ static int osc_dlm_blocking_ast0(const struct lu_env *env,
 
        if (dlmlock->l_ast_data != NULL) {
                obj = osc2cl(dlmlock->l_ast_data);
-               dlmlock->l_ast_data = NULL;
-
                cl_object_get(obj);
        }
 
        unlock_res_and_lock(dlmlock);
 
+       OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_CANCEL, 5);
+
        /* if l_ast_data is NULL, the dlmlock was enqueued by AGL or
         * the object has been destroyed. */
        if (obj != NULL) {
@@ -436,6 +445,9 @@ static int osc_dlm_blocking_ast0(const struct lu_env *env,
 
                /* losing a lock, update kms */
                lock_res_and_lock(dlmlock);
+               /* clear l_ast_data only after flushing data, so that the
+                * glimpse AST can still find the lock and the object */
+               dlmlock->l_ast_data = NULL;
                cl_object_attr_lock(obj);
                /* Must get the value under the lock to avoid race. */
                old_kms = cl2osc(obj)->oo_oinfo->loi_kms;
@@ -549,6 +561,10 @@ int osc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data)
        struct ost_lvb          *lvb;
        struct req_capsule      *cap;
        struct cl_object        *obj = NULL;
+       struct ldlm_resource    *res = dlmlock->l_resource;
+       struct ldlm_match_data  matchdata = { 0 };
+       union ldlm_policy_data  policy;
+       enum ldlm_mode          mode = LCK_PW | LCK_GROUP | LCK_PR;
        int                     result;
        __u16                   refcheck;
 
@@ -560,13 +576,39 @@ int osc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data)
        if (IS_ERR(env))
                GOTO(out, result = PTR_ERR(env));
 
+       policy.l_extent.start = 0;
+       policy.l_extent.end = LUSTRE_EOF;
 
-       lock_res_and_lock(dlmlock);
-       if (dlmlock->l_ast_data != NULL) {
-               obj = osc2cl(dlmlock->l_ast_data);
-               cl_object_get(obj);
+       matchdata.lmd_mode = &mode;
+       matchdata.lmd_policy = &policy;
+       matchdata.lmd_flags = LDLM_FL_TEST_LOCK | LDLM_FL_CBPENDING;
+       matchdata.lmd_match = LDLM_MATCH_UNREF | LDLM_MATCH_AST_ANY;
+
+       LDLM_LOCK_GET(dlmlock);
+
+       /* If any dlmlock has l_ast_data set, we must find it or we risk
+        * missing a size update done under a different lock.
+        */
+       while (dlmlock) {
+               lock_res_and_lock(dlmlock);
+               if (dlmlock->l_ast_data) {
+                       obj = osc2cl(dlmlock->l_ast_data);
+                       cl_object_get(obj);
+               }
+               unlock_res_and_lock(dlmlock);
+               LDLM_LOCK_RELEASE(dlmlock);
+
+               dlmlock = NULL;
+
+               if (obj == NULL && res->lr_type == LDLM_EXTENT) {
+                       if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_SIZE_DATA))
+                               break;
+
+                       lock_res(res);
+                       dlmlock = search_itree(res, &matchdata);
+                       unlock_res(res);
+               }
        }
-       unlock_res_and_lock(dlmlock);
 
        if (obj != NULL) {
                /* Do not grab the mutex of cl_lock for glimpse.
@@ -606,50 +648,45 @@ out:
 }
 EXPORT_SYMBOL(osc_ldlm_glimpse_ast);
 
-static int weigh_cb(const struct lu_env *env, struct cl_io *io,
-                   struct osc_page *ops, void *cbdata)
+static bool weigh_cb(const struct lu_env *env, struct cl_io *io,
+                    struct osc_page *ops, void *cbdata)
 {
        struct cl_page *page = ops->ops_cl.cpl_page;
 
-       if (cl_page_is_vmlocked(env, page)
-           || PageDirty(page->cp_vmpage) || PageWriteback(page->cp_vmpage)
-          )
-               return CLP_GANG_ABORT;
+       if (cl_page_is_vmlocked(env, page) || PageDirty(page->cp_vmpage) ||
+           PageWriteback(page->cp_vmpage))
+               return false;
 
        *(pgoff_t *)cbdata = osc_index(ops) + 1;
-       return CLP_GANG_OKAY;
+       return true;
 }
 
 static unsigned long osc_lock_weight(const struct lu_env *env,
                                     struct osc_object *oscobj,
-                                    struct ldlm_extent *extent)
+                                    loff_t start, loff_t end)
 {
-       struct cl_io     *io = osc_env_thread_io(env);
+       struct cl_io *io = osc_env_thread_io(env);
        struct cl_object *obj = cl_object_top(&oscobj->oo_cl);
-       pgoff_t          page_index;
-       int              result;
+       pgoff_t page_index;
+       int result;
+
        ENTRY;
 
        io->ci_obj = obj;
        io->ci_ignore_layout = 1;
        result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
        if (result != 0)
-               RETURN(result);
+               RETURN(1);
 
-       page_index = cl_index(obj, extent->start);
-       do {
-               result = osc_page_gang_lookup(env, io, oscobj,
-                                             page_index,
-                                             cl_index(obj, extent->end),
-                                             weigh_cb, (void *)&page_index);
-               if (result == CLP_GANG_ABORT)
-                       break;
-               if (result == CLP_GANG_RESCHED)
-                       cond_resched();
-       } while (result != CLP_GANG_OKAY);
+       page_index = cl_index(obj, start);
+
+       if (!osc_page_gang_lookup(env, io, oscobj,
+                                 page_index, cl_index(obj, end),
+                                 weigh_cb, (void *)&page_index))
+               result = 1;
        cl_io_fini(env, io);
 
-       return result == CLP_GANG_ABORT ? 1 : 0;
+       return result;
 }
 
 /**
@@ -657,12 +694,13 @@ static unsigned long osc_lock_weight(const struct lu_env *env,
  */
 unsigned long osc_ldlm_weigh_ast(struct ldlm_lock *dlmlock)
 {
-       struct lu_env           *env;
-       struct osc_object       *obj;
-       struct osc_lock         *oscl;
-       unsigned long            weight;
-       bool                    found = false;
-       __u16                   refcheck;
+       struct lu_env *env;
+       struct osc_object *obj;
+       struct osc_lock *oscl;
+       unsigned long weight;
+       bool found = false;
+       __u16 refcheck;
+
        ENTRY;
 
        might_sleep();
@@ -678,7 +716,9 @@ unsigned long osc_ldlm_weigh_ast(struct ldlm_lock *dlmlock)
                /* Mostly because lack of memory, do not eliminate this lock */
                RETURN(1);
 
-       LASSERT(dlmlock->l_resource->lr_type == LDLM_EXTENT);
+       LASSERT(dlmlock->l_resource->lr_type == LDLM_EXTENT ||
+               dlmlock->l_resource->lr_type == LDLM_IBITS);
+
        lock_res_and_lock(dlmlock);
        obj = dlmlock->l_ast_data;
        if (obj)
@@ -686,13 +726,14 @@ unsigned long osc_ldlm_weigh_ast(struct ldlm_lock *dlmlock)
        unlock_res_and_lock(dlmlock);
 
        if (obj == NULL)
-               GOTO(out, weight = 1);
+               GOTO(out, weight = 0);
 
        spin_lock(&obj->oo_ol_spin);
        list_for_each_entry(oscl, &obj->oo_ol_list, ols_nextlock_oscobj) {
-               if (oscl->ols_dlmlock != NULL && oscl->ols_dlmlock != dlmlock)
-                       continue;
-               found = true;
+               if (oscl->ols_dlmlock == dlmlock) {
+                       found = true;
+                       break;
+               }
        }
        spin_unlock(&obj->oo_ol_spin);
        if (found) {
@@ -702,7 +743,18 @@ unsigned long osc_ldlm_weigh_ast(struct ldlm_lock *dlmlock)
                GOTO(out, weight = 1);
        }
 
-       weight = osc_lock_weight(env, obj, &dlmlock->l_policy_data.l_extent);
+       if (dlmlock->l_resource->lr_type == LDLM_EXTENT)
+               weight = osc_lock_weight(env, obj,
+                                        dlmlock->l_policy_data.l_extent.start,
+                                        dlmlock->l_policy_data.l_extent.end);
+       else if (ldlm_has_dom(dlmlock))
+               weight = osc_lock_weight(env, obj, 0, OBD_OBJECT_EOF);
+       /* The DOM bit can be cancelled at any time; in that case, we know
+        * there are no pages, so just return weight of 0
+        */
+       else
+               weight = 0;
+
        EXIT;
 
 out:
@@ -712,6 +764,7 @@ out:
        cl_env_put(env, &refcheck);
        return weight;
 }
+EXPORT_SYMBOL(osc_ldlm_weigh_ast);
 
 static void osc_lock_build_einfo(const struct lu_env *env,
                                 const struct cl_lock *lock,
@@ -858,7 +911,7 @@ restart:
                        continue;
 
                /* wait for conflicting lock to be canceled */
-               cl_sync_io_init(waiter, 1, cl_sync_io_end);
+               cl_sync_io_init(waiter, 1);
                oscl->ols_owner = waiter;
 
                spin_lock(&tmp_oscl->ols_lock);
@@ -923,10 +976,10 @@ static int osc_lock_enqueue(const struct lu_env *env,
                RETURN(0);
 
        if ((oscl->ols_flags & LDLM_FL_NO_EXPANSION) &&
-           !(exp_connect_lockahead_old(exp) || exp_connect_lockahead(exp))) {
+           !exp_connect_lockahead(exp)) {
                result = -EOPNOTSUPP;
-               CERROR("%s: server does not support lockahead/locknoexpand:"
-                      "rc = %d\n", exp->exp_obd->obd_name, result);
+               CERROR("%s: server does not support lockahead/locknoexpand: rc = %d\n",
+                      exp->exp_obd->obd_name, result);
                RETURN(result);
        }
 
@@ -984,7 +1037,6 @@ enqueue_base:
        }
        result = osc_enqueue_base(exp, resname, &oscl->ols_flags,
                                  policy, &oscl->ols_lvb,
-                                 osc->oo_oinfo->loi_kms_valid,
                                  upcall, cookie,
                                  &oscl->ols_einfo, PTLRPCD_SET, async,
                                  oscl->ols_speculative);
@@ -1134,9 +1186,9 @@ void osc_lock_set_writer(const struct lu_env *env, const struct cl_io *io,
                return;
 
        if (likely(io->ci_type == CIT_WRITE)) {
-               io_start = cl_index(obj, io->u.ci_rw.rw_range.cir_pos);
-               io_end = cl_index(obj, io->u.ci_rw.rw_range.cir_pos +
-                                 io->u.ci_rw.rw_range.cir_count - 1);
+               io_start = cl_index(obj, io->u.ci_rw.crw_pos);
+               io_end = cl_index(obj, io->u.ci_rw.crw_pos +
+                                               io->u.ci_rw.crw_count - 1);
        } else {
                LASSERT(cl_io_is_mkwrite(io));
                io_start = io_end = io->u.ci_fault.ft_index;
@@ -1180,6 +1232,8 @@ int osc_lock_init(const struct lu_env *env,
 
        oscl->ols_flags = osc_enq2ldlm_flags(enqflags);
        oscl->ols_speculative = !!(enqflags & CEF_SPECULATIVE);
+       if (lock->cll_descr.cld_mode == CLM_GROUP)
+               oscl->ols_flags |= LDLM_FL_ATOMIC_CB;
 
        if (oscl->ols_flags & LDLM_FL_HAS_INTENT) {
                oscl->ols_flags |= LDLM_FL_BLOCK_GRANTED;
@@ -1222,6 +1276,7 @@ struct ldlm_lock *osc_obj_dlmlock_at_pgoff(const struct lu_env *env,
        struct ldlm_lock *lock = NULL;
        enum ldlm_mode mode;
        __u64 flags;
+       enum ldlm_match_flags match_flags = 0;
 
        ENTRY;
 
@@ -1232,14 +1287,24 @@ struct ldlm_lock *osc_obj_dlmlock_at_pgoff(const struct lu_env *env,
        flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
        if (dap_flags & OSC_DAP_FL_TEST_LOCK)
                flags |= LDLM_FL_TEST_LOCK;
+
+       if (dap_flags & OSC_DAP_FL_AST)
+               match_flags |= LDLM_MATCH_AST;
+
+       if (dap_flags & OSC_DAP_FL_CANCELING)
+               match_flags |= LDLM_MATCH_UNREF;
+
+       if (dap_flags & OSC_DAP_FL_RIGHT)
+               match_flags |= LDLM_MATCH_RIGHT;
+
        /*
         * It is fine to match any group lock since there could be only one
         * with a uniq gid and it conflicts with all other lock modes too
         */
 again:
-       mode = osc_match_base(osc_export(obj), resname, LDLM_EXTENT, policy,
-                              LCK_PR | LCK_PW | LCK_GROUP, &flags, obj, &lockh,
-                              dap_flags & OSC_DAP_FL_CANCELING);
+       mode = osc_match_base(env, osc_export(obj), resname, LDLM_EXTENT,
+                             policy, LCK_PR | LCK_PW | LCK_GROUP, &flags,
+                             obj, &lockh, match_flags);
        if (mode != 0) {
                lock = ldlm_handle2lock(&lockh);
                /* RACE: the lock is cancelled so let's try again */