Whamcloud - gitweb
LU-10406 mdt: invalidate cache upon LDLM lock 94/31194/33
authorAlex Zhuravlev <alexey.zhuravlev@intel.com>
Wed, 7 Feb 2018 08:51:54 +0000 (11:51 +0300)
committerOleg Drokin <green@whamcloud.com>
Tue, 13 Nov 2018 06:14:11 +0000 (06:14 +0000)
given new components (like LFSCK) can access storage w/o LDLM
locking, it's possible to find cache stale. so MDT should
invalidate local cache explicitly once LDLM lock is granted.

also, OSP should invdalite any state requested before invalidation
request as LDLM lock and request can be handled in different
order on the target and source MDTs. for example, get_attr was
sent first, the target MDT serviced it first, but the reply got
delayed for a reason. in the meantime LDLM lock was requested,
granted and received by the source MDT before reply for get_attr.
IOW, we get a state with old (potentially stale) state and LDLM
lock granted.

Change-Id: I7b96465d98960617c96a45accfb30dbfbd0e1576
Signed-off-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-on: https://review.whamcloud.com/31194
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Fan Yong <fan.yong@intel.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/lfsck/lfsck_lib.c
lustre/mdt/mdt_handler.c
lustre/osp/osp_internal.h
lustre/osp/osp_md_object.c
lustre/osp/osp_object.c
lustre/tests/sanity-lfsck.sh

index 6d146a0..80cb8eb 100644 (file)
@@ -379,6 +379,12 @@ static int __lfsck_ibits_lock(const struct lu_env *env,
                einfo->ei_res_id = resid;
 
                rc = dt_object_lock(env, obj, lh, einfo, policy);
+               /* for regular checks LFSCK doesn't use LDLM locking,
+                * so the state isn't coherent. here we just took LDLM
+                * lock for coherency and it's time to invalidate
+                * previous state */
+               if (rc == ELDLM_OK)
+                       dt_invalidate(env, obj);
        } else {
                rc = ldlm_cli_enqueue_local(env, lfsck->li_namespace, resid,
                                            LDLM_IBITS, policy, mode,
index da37f72..fd46f71 100644 (file)
@@ -3051,6 +3051,10 @@ mdt_object_lock_internal(struct mdt_thread_info *info, struct mdt_object *o,
                }
        }
 
+       /* other components like LFSCK can use lockless access
+        * and populate cache, so we better invalidate it */
+       mo_invalidate(info->mti_env, mdt_object_child(o));
+
        RETURN(0);
 }
 
index f5f44c5..09ddfac 100644 (file)
@@ -318,6 +318,9 @@ struct osp_object {
        struct list_head        opo_invalidate_cb_list;
        /* Protect opo_ooa. */
        spinlock_t              opo_lock;
+       /* to implement in-flight invalidation */
+       atomic_t                opo_invalidate_seq;
+       struct rw_semaphore     opo_invalidate_sem;
 };
 
 extern struct lu_object_operations osp_lu_obj_ops;
index aa77962..fb1c055 100644 (file)
@@ -178,6 +178,7 @@ int osp_md_create(const struct lu_env *env, struct dt_object *dt,
 
        dt->do_lu.lo_header->loh_attr |= LOHA_EXISTS | (attr->la_mode & S_IFMT);
        dt2osp_obj(dt)->opo_non_exist = 0;
+       obj->opo_stale = 0;
 
        obj->opo_attr = *attr;
 out:
index 896ce2e..e327cbb 100644 (file)
@@ -431,11 +431,13 @@ static int osp_get_attr_from_reply(const struct lu_env *env,
                lustre_swab_obdo(wobdo);
 
        lustre_get_wire_obdo(NULL, lobdo, wobdo);
-       spin_lock(&obj->opo_lock);
-       la_from_obdo(&obj->opo_attr, lobdo, lobdo->o_valid);
-       if (attr != NULL)
-               *attr = obj->opo_attr;
-       spin_unlock(&obj->opo_lock);
+       if (obj) {
+               spin_lock(&obj->opo_lock);
+               la_from_obdo(&obj->opo_attr, lobdo, lobdo->o_valid);
+               spin_unlock(&obj->opo_lock);
+       }
+       if (attr)
+               la_from_obdo(attr, lobdo, lobdo->o_valid);
 
        return 0;
 }
@@ -542,7 +544,7 @@ int osp_attr_get(const struct lu_env *env, struct dt_object *dt,
        struct osp_update_request       *update;
        struct object_update_reply      *reply;
        struct ptlrpc_request           *req = NULL;
-       int                             rc = 0;
+       int                             invalidated, cache = 0, rc = 0;
        ENTRY;
 
        if (is_ost_obj(&dt->do_lu) && obj->opo_non_exist)
@@ -568,14 +570,24 @@ int osp_attr_get(const struct lu_env *env, struct dt_object *dt,
                       dev->dd_lu_dev.ld_obd->obd_name,
                       PFID(lu_object_fid(&dt->do_lu)), rc);
 
-               GOTO(out, rc);
+               GOTO(out_req, rc);
        }
 
+       invalidated = atomic_read(&obj->opo_invalidate_seq);
+
        rc = osp_remote_sync(env, osp, update, &req);
+
+       down_read(&obj->opo_invalidate_sem);
+       if (invalidated == atomic_read(&obj->opo_invalidate_seq)) {
+               /* no invalited has came so far, we can cache the attrs */
+               cache = 1;
+       }
+
        if (rc != 0) {
                if (rc == -ENOENT) {
                        osp2lu_obj(obj)->lo_header->loh_attr &= ~LOHA_EXISTS;
-                       obj->opo_non_exist = 1;
+                       if (cache)
+                               obj->opo_non_exist = 1;
                } else {
                        CERROR("%s:osp_attr_get update error "DFID": rc = %d\n",
                               dev->dd_lu_dev.ld_obd->obd_name,
@@ -593,17 +605,22 @@ int osp_attr_get(const struct lu_env *env, struct dt_object *dt,
        if (reply == NULL || reply->ourp_magic != UPDATE_REPLY_MAGIC)
                GOTO(out, rc = -EPROTO);
 
-       rc = osp_get_attr_from_reply(env, reply, req, attr, obj, 0);
+       rc = osp_get_attr_from_reply(env, reply, req, attr,
+                                    cache ? obj : NULL, 0);
        if (rc != 0)
                GOTO(out, rc);
 
        spin_lock(&obj->opo_lock);
-       obj->opo_stale = 0;
+       if (cache)
+               obj->opo_stale = 0;
        spin_unlock(&obj->opo_lock);
 
        GOTO(out, rc);
 
 out:
+       up_read(&obj->opo_invalidate_sem);
+
+out_req:
        if (req != NULL)
                ptlrpc_req_finished(req);
 
@@ -938,7 +955,7 @@ int osp_xattr_get(const struct lu_env *env, struct dt_object *dt,
        struct object_update_reply *reply;
        struct osp_xattr_entry  *oxe    = NULL;
        const char *dname = osp_dto2name(obj);
-       int rc = 0;
+       int invalidated, rc = 0;
        ENTRY;
 
        LASSERT(buf != NULL);
@@ -958,6 +975,8 @@ int osp_xattr_get(const struct lu_env *env, struct dt_object *dt,
        if (unlikely(obj->opo_non_exist))
                RETURN(-ENOENT);
 
+       invalidated = atomic_read(&obj->opo_invalidate_seq);
+
        oxe = osp_oac_xattr_find(obj, name, false);
        if (oxe != NULL) {
                spin_lock(&obj->opo_lock);
@@ -986,17 +1005,40 @@ unlock:
        }
        update = osp_update_request_create(dev);
        if (IS_ERR(update))
-               GOTO(out, rc = PTR_ERR(update));
+               GOTO(out_req, rc = PTR_ERR(update));
 
        rc = OSP_UPDATE_RPC_PACK(env, out_xattr_get_pack, update,
                                 lu_object_fid(&dt->do_lu), name, buf->lb_len);
        if (rc != 0) {
                CERROR("%s: Insert update error "DFID": rc = %d\n",
                       dname, PFID(lu_object_fid(&dt->do_lu)), rc);
-               GOTO(out, rc);
+               GOTO(out_req, rc);
        }
 
        rc = osp_remote_sync(env, osp, update, &req);
+
+       down_read(&obj->opo_invalidate_sem);
+       if (invalidated != atomic_read(&obj->opo_invalidate_seq)) {
+               /* invalidated has been requested, we can't cache the result */
+               if (rc < 0) {
+                       if (rc == -ENOENT)
+                               dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS;
+                       GOTO(out, rc);
+               }
+               reply = req_capsule_server_sized_get(&req->rq_pill,
+                                                    &RMF_OUT_UPDATE_REPLY,
+                               OUT_UPDATE_REPLY_SIZE);
+               if (reply->ourp_magic != UPDATE_REPLY_MAGIC) {
+                       CERROR("%s: Wrong version %x expected %x "DFID
+                              ": rc = %d\n", dname, reply->ourp_magic,
+                              UPDATE_REPLY_MAGIC,
+                              PFID(lu_object_fid(&dt->do_lu)), -EPROTO);
+                       GOTO(out, rc = -EPROTO);
+               }
+               rc = object_update_result_data_get(reply, rbuf, 0);
+               GOTO(out, rc);
+       }
+
        if (rc < 0) {
                if (rc == -ENOENT) {
                        dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS;
@@ -1084,6 +1126,9 @@ unlock:
        GOTO(out, rc);
 
 out:
+       up_read(&obj->opo_invalidate_sem);
+
+out_req:
        if (rc > 0 && buf->lb_buf) {
                if (unlikely(buf->lb_len < rbuf->lb_len))
                        rc = -ERANGE;
@@ -1300,12 +1345,24 @@ int osp_invalidate(const struct lu_env *env, struct dt_object *dt)
 
        CDEBUG(D_HA, "Invalidate osp_object "DFID"\n",
               PFID(lu_object_fid(&dt->do_lu)));
-       osp_obj_invalidate_cache(obj);
+
+       /* serialize attr/EA set vs. invalidation */
+       down_write(&obj->opo_invalidate_sem);
+
+       /* this should invalidate all in-flights */
+       atomic_inc(&obj->opo_invalidate_seq);
 
        spin_lock(&obj->opo_lock);
-       obj->opo_stale = 1;
+       /* do not mark new objects stale */
+       if (obj->opo_attr.la_valid)
+               obj->opo_stale = 1;
+       obj->opo_non_exist = 0;
        spin_unlock(&obj->opo_lock);
 
+       osp_obj_invalidate_cache(obj);
+
+       up_write(&obj->opo_invalidate_sem);
+
        RETURN(0);
 }
 
@@ -2196,6 +2253,7 @@ static int osp_object_init(const struct lu_env *env, struct lu_object *o,
        o->lo_header->loh_attr |= LOHA_REMOTE;
        INIT_LIST_HEAD(&po->opo_xattr_list);
        INIT_LIST_HEAD(&po->opo_invalidate_cb_list);
+       init_rwsem(&po->opo_invalidate_sem);
 
        if (is_ost_obj(o)) {
                po->opo_obj.do_ops = &osp_obj_ops;
index 7619bb0..fdd987f 100644 (file)
@@ -8,8 +8,8 @@ set -e
 
 ONLY=${ONLY:-"$*"}
 
-#Bug number for excepting test      LU-10406
-ALWAYS_EXCEPT="$SANITY_LFSCK_EXCEPT 31c"
+#Bug number for excepting test
+ALWAYS_EXCEPT="$SANITY_LFSCK_EXCEPT"
 
 [ "$SLOW" = "no" ] && EXCEPT_SLOW=""
 # UPDATE THE COMMENT ABOVE WITH BUG NUMBERS WHEN CHANGING ALWAYS_EXCEPT!