Whamcloud - gitweb
LU-14286 osd-ldiskfs: fallocate() should zero new blocks
[fs/lustre-release.git] / lustre / osc / osc_lock.c
index 90811af..62da056 100644 (file)
@@ -144,20 +144,17 @@ static void osc_lock_build_policy(const struct lu_env *env,
  * with the DLM lock reply from the server. Copy of osc_update_enqueue()
  * logic.
  *
- * This can be optimized to not update attributes when lock is a result of a
- * local match.
- *
  * Called under lock and resource spin-locks.
  */
-static void osc_lock_lvb_update(const struct lu_env *env,
-                               struct osc_object *osc,
-                               struct ldlm_lock *dlmlock,
-                               struct ost_lvb *lvb)
+void osc_lock_lvb_update(const struct lu_env *env,
+                        struct osc_object *osc,
+                        struct ldlm_lock *dlmlock,
+                        struct ost_lvb *lvb)
 {
-       struct cl_object  *obj = osc2cl(osc);
-       struct lov_oinfo  *oinfo = osc->oo_oinfo;
-       struct cl_attr    *attr = &osc_env_info(env)->oti_attr;
-       unsigned           valid;
+       struct cl_object *obj = osc2cl(osc);
+       struct lov_oinfo *oinfo = osc->oo_oinfo;
+       struct cl_attr *attr = &osc_env_info(env)->oti_attr;
+       unsigned valid, setkms = 0;
 
        ENTRY;
 
@@ -182,19 +179,23 @@ static void osc_lock_lvb_update(const struct lu_env *env,
                 if (size > dlmlock->l_policy_data.l_extent.end)
                         size = dlmlock->l_policy_data.l_extent.end + 1;
                 if (size >= oinfo->loi_kms) {
-                       LDLM_DEBUG(dlmlock, "lock acquired, setting rss=%llu"
-                                  ", kms=%llu", lvb->lvb_size, size);
                         valid |= CAT_KMS;
                         attr->cat_kms = size;
-                } else {
-                        LDLM_DEBUG(dlmlock, "lock acquired, setting rss="
-                                  "%llu; leaving kms=%llu, end=%llu",
-                                   lvb->lvb_size, oinfo->loi_kms,
-                                   dlmlock->l_policy_data.l_extent.end);
+                       setkms = 1;
                 }
                ldlm_lock_allow_match_locked(dlmlock);
        }
 
+       /* The size should not be less than the kms */
+       if (attr->cat_size < oinfo->loi_kms)
+               attr->cat_size = oinfo->loi_kms;
+
+       LDLM_DEBUG(dlmlock, "acquired size %llu, setting rss=%llu;%s "
+                  "kms=%llu, end=%llu", lvb->lvb_size, attr->cat_size,
+                  setkms ? "" : " leaving",
+                  setkms ? attr->cat_kms : oinfo->loi_kms,
+                  dlmlock ? dlmlock->l_policy_data.l_extent.end : -1ull);
+
        cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);
 
@@ -202,8 +203,9 @@ static void osc_lock_lvb_update(const struct lu_env *env,
 }
 
 static void osc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
-                            struct lustre_handle *lockh, bool lvb_update)
+                            struct lustre_handle *lockh)
 {
+       struct osc_object *osc = cl2osc(oscl->ols_cl.cls_obj);
        struct ldlm_lock *dlmlock;
 
        dlmlock = ldlm_handle2lock_long(lockh, 0);
@@ -212,7 +214,7 @@ static void osc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
        /* lock reference taken by ldlm_handle2lock_long() is
         * owned by osc_lock and released in osc_lock_detach()
         */
-       lu_ref_add(&dlmlock->l_reference, "osc_lock", oscl);
+       lu_ref_add_atomic(&dlmlock->l_reference, "osc_lock", oscl);
        oscl->ols_has_ref = 1;
 
        LASSERT(oscl->ols_dlmlock == NULL);
@@ -242,10 +244,11 @@ static void osc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
                descr->cld_gid   = ext->gid;
 
                /* no lvb update for matched lock */
-               if (lvb_update) {
+               if (!ldlm_is_lvb_cached(dlmlock)) {
                        LASSERT(oscl->ols_flags & LDLM_FL_LVB_READY);
-                       osc_lock_lvb_update(env, cl2osc(oscl->ols_cl.cls_obj),
-                                           dlmlock, NULL);
+                       LASSERT(osc == dlmlock->l_ast_data);
+                       osc_lock_lvb_update(env, osc, dlmlock, NULL);
+                       ldlm_set_lvb_cached(dlmlock);
                }
                LINVRNT(osc_lock_invariant(oscl));
        }
@@ -285,7 +288,7 @@ static int osc_lock_upcall(void *cookie, struct lustre_handle *lockh,
        }
 
        if (rc == 0)
-               osc_lock_granted(env, oscl, lockh, errcode == ELDLM_OK);
+               osc_lock_granted(env, oscl, lockh);
 
        /* Error handling, some errors are tolerable. */
        if (oscl->ols_locklessable && rc == -EUSERS) {
@@ -341,7 +344,8 @@ static int osc_lock_upcall_speculative(void *cookie,
        lock_res_and_lock(dlmlock);
        LASSERT(ldlm_is_granted(dlmlock));
 
-       /* there is no osc_lock associated with speculative locks */
+       /* there is no osc_lock associated with speculative locks
+        * thus no need to set LDLM_FL_LVB_CACHED */
        osc_lock_lvb_update(env, osc, dlmlock, NULL);
 
        unlock_res_and_lock(dlmlock);
@@ -419,13 +423,13 @@ static int osc_dlm_blocking_ast0(const struct lu_env *env,
 
        if (dlmlock->l_ast_data != NULL) {
                obj = osc2cl(dlmlock->l_ast_data);
-               dlmlock->l_ast_data = NULL;
-
                cl_object_get(obj);
        }
 
        unlock_res_and_lock(dlmlock);
 
+       OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_CANCEL, 5);
+
        /* if l_ast_data is NULL, the dlmlock was enqueued by AGL or
         * the object has been destroyed. */
        if (obj != NULL) {
@@ -441,6 +445,9 @@ static int osc_dlm_blocking_ast0(const struct lu_env *env,
 
                /* losing a lock, update kms */
                lock_res_and_lock(dlmlock);
+               /* clearing l_ast_data after flushing data,
+                * to let glimpse ast find the lock and the object */
+               dlmlock->l_ast_data = NULL;
                cl_object_attr_lock(obj);
                /* Must get the value under the lock to avoid race. */
                old_kms = cl2osc(obj)->oo_oinfo->loi_kms;
@@ -554,6 +561,10 @@ int osc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data)
        struct ost_lvb          *lvb;
        struct req_capsule      *cap;
        struct cl_object        *obj = NULL;
+       struct ldlm_resource    *res = dlmlock->l_resource;
+       struct ldlm_match_data  matchdata = { 0 };
+       union ldlm_policy_data  policy;
+       enum ldlm_mode          mode = LCK_PW | LCK_GROUP | LCK_PR;
        int                     result;
        __u16                   refcheck;
 
@@ -565,13 +576,39 @@ int osc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data)
        if (IS_ERR(env))
                GOTO(out, result = PTR_ERR(env));
 
+       policy.l_extent.start = 0;
+       policy.l_extent.end = LUSTRE_EOF;
 
-       lock_res_and_lock(dlmlock);
-       if (dlmlock->l_ast_data != NULL) {
-               obj = osc2cl(dlmlock->l_ast_data);
-               cl_object_get(obj);
+       matchdata.lmd_mode = &mode;
+       matchdata.lmd_policy = &policy;
+       matchdata.lmd_flags = LDLM_FL_TEST_LOCK | LDLM_FL_CBPENDING;
+       matchdata.lmd_match = LDLM_MATCH_UNREF | LDLM_MATCH_AST_ANY;
+
+       LDLM_LOCK_GET(dlmlock);
+
+       /* If any dlmlock has l_ast_data set, we must find it or we risk
+        * missing a size update done under a different lock.
+        */
+       while (dlmlock) {
+               lock_res_and_lock(dlmlock);
+               if (dlmlock->l_ast_data) {
+                       obj = osc2cl(dlmlock->l_ast_data);
+                       cl_object_get(obj);
+               }
+               unlock_res_and_lock(dlmlock);
+               LDLM_LOCK_RELEASE(dlmlock);
+
+               dlmlock = NULL;
+
+               if (obj == NULL && res->lr_type == LDLM_EXTENT) {
+                       if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_SIZE_DATA))
+                               break;
+
+                       lock_res(res);
+                       dlmlock = search_itree(res, &matchdata);
+                       unlock_res(res);
+               }
        }
-       unlock_res_and_lock(dlmlock);
 
        if (obj != NULL) {
                /* Do not grab the mutex of cl_lock for glimpse.
@@ -611,17 +648,17 @@ out:
 }
 EXPORT_SYMBOL(osc_ldlm_glimpse_ast);
 
-static int weigh_cb(const struct lu_env *env, struct cl_io *io,
-                   struct osc_page *ops, void *cbdata)
+static bool weigh_cb(const struct lu_env *env, struct cl_io *io,
+                    struct osc_page *ops, void *cbdata)
 {
        struct cl_page *page = ops->ops_cl.cpl_page;
 
        if (cl_page_is_vmlocked(env, page) || PageDirty(page->cp_vmpage) ||
            PageWriteback(page->cp_vmpage))
-               return CLP_GANG_ABORT;
+               return false;
 
        *(pgoff_t *)cbdata = osc_index(ops) + 1;
-       return CLP_GANG_OKAY;
+       return true;
 }
 
 static unsigned long osc_lock_weight(const struct lu_env *env,
@@ -639,21 +676,17 @@ static unsigned long osc_lock_weight(const struct lu_env *env,
        io->ci_ignore_layout = 1;
        result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
        if (result != 0)
-               RETURN(result);
+               RETURN(1);
 
        page_index = cl_index(obj, start);
-       do {
-               result = osc_page_gang_lookup(env, io, oscobj,
-                                             page_index, cl_index(obj, end),
-                                             weigh_cb, (void *)&page_index);
-               if (result == CLP_GANG_ABORT)
-                       break;
-               if (result == CLP_GANG_RESCHED)
-                       cond_resched();
-       } while (result != CLP_GANG_OKAY);
+
+       if (!osc_page_gang_lookup(env, io, oscobj,
+                                 page_index, cl_index(obj, end),
+                                 weigh_cb, (void *)&page_index))
+               result = 1;
        cl_io_fini(env, io);
 
-       return result == CLP_GANG_ABORT ? 1 : 0;
+       return result;
 }
 
 /**
@@ -693,7 +726,7 @@ unsigned long osc_ldlm_weigh_ast(struct ldlm_lock *dlmlock)
        unlock_res_and_lock(dlmlock);
 
        if (obj == NULL)
-               GOTO(out, weight = 1);
+               GOTO(out, weight = 0);
 
        spin_lock(&obj->oo_ol_spin);
        list_for_each_entry(oscl, &obj->oo_ol_list, ols_nextlock_oscobj) {
@@ -943,10 +976,10 @@ static int osc_lock_enqueue(const struct lu_env *env,
                RETURN(0);
 
        if ((oscl->ols_flags & LDLM_FL_NO_EXPANSION) &&
-           !(exp_connect_lockahead_old(exp) || exp_connect_lockahead(exp))) {
+           !exp_connect_lockahead(exp)) {
                result = -EOPNOTSUPP;
-               CERROR("%s: server does not support lockahead/locknoexpand:"
-                      "rc = %d\n", exp->exp_obd->obd_name, result);
+               CERROR("%s: server does not support lockahead/locknoexpand: rc = %d\n",
+                      exp->exp_obd->obd_name, result);
                RETURN(result);
        }
 
@@ -1004,7 +1037,6 @@ enqueue_base:
        }
        result = osc_enqueue_base(exp, resname, &oscl->ols_flags,
                                  policy, &oscl->ols_lvb,
-                                 osc->oo_oinfo->loi_kms_valid,
                                  upcall, cookie,
                                  &oscl->ols_einfo, PTLRPCD_SET, async,
                                  oscl->ols_speculative);
@@ -1244,6 +1276,7 @@ struct ldlm_lock *osc_obj_dlmlock_at_pgoff(const struct lu_env *env,
        struct ldlm_lock *lock = NULL;
        enum ldlm_mode mode;
        __u64 flags;
+       enum ldlm_match_flags match_flags = 0;
 
        ENTRY;
 
@@ -1254,14 +1287,24 @@ struct ldlm_lock *osc_obj_dlmlock_at_pgoff(const struct lu_env *env,
        flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
        if (dap_flags & OSC_DAP_FL_TEST_LOCK)
                flags |= LDLM_FL_TEST_LOCK;
+
+       if (dap_flags & OSC_DAP_FL_AST)
+               match_flags |= LDLM_MATCH_AST;
+
+       if (dap_flags & OSC_DAP_FL_CANCELING)
+               match_flags |= LDLM_MATCH_UNREF;
+
+       if (dap_flags & OSC_DAP_FL_RIGHT)
+               match_flags |= LDLM_MATCH_RIGHT;
+
        /*
         * It is fine to match any group lock since there could be only one
         * with a uniq gid and it conflicts with all other lock modes too
         */
 again:
-       mode = osc_match_base(osc_export(obj), resname, LDLM_EXTENT, policy,
-                              LCK_PR | LCK_PW | LCK_GROUP, &flags, obj, &lockh,
-                              dap_flags & OSC_DAP_FL_CANCELING);
+       mode = osc_match_base(env, osc_export(obj), resname, LDLM_EXTENT,
+                             policy, LCK_PR | LCK_PW | LCK_GROUP, &flags,
+                             obj, &lockh, match_flags);
        if (mode != 0) {
                lock = ldlm_handle2lock(&lockh);
                /* RACE: the lock is cancelled so let's try again */