Whamcloud - gitweb
LU-12485 obdclass: 0-nlink race in lu_object_find_at() 60/35360/11
author: Lai Siyao <lai.siyao@whamcloud.com>
Fri, 28 Jun 2019 12:19:56 +0000 (20:19 +0800)
committer: Oleg Drokin <green@whamcloud.com>
Thu, 15 Aug 2019 07:54:08 +0000 (07:54 +0000)
There is a race in lu_object_find_at: in the gap between
lu_object_alloc() and hash insertion, another thread may
have allocated another object for the same file and unlinked
it, so we may get an object with 0-nlink, which will trigger
assertion in osd_object_release().

To avoid such a race, initialize the object after hash insertion.
But this may cause an uninitialized object to be found in the
cache; if so, wait for the object to be initialized by the allocator.

To reproduce the race, introduce cfs_race_wait() and
cfs_race_wakeup(): cfs_race_wait() will cause the thread that
calls it to wait on the race, while cfs_race_wakeup() will wake
up the waiting thread. As with cfs_race(), CFS_FAIL_ONCE
should be set together with fail_loc.

Add sanityn test_84.

Signed-off-by: Lai Siyao <lai.siyao@whamcloud.com>
Change-Id: I0869f254544256987b73f0ff92f75e4d1562e566
Reviewed-on: https://review.whamcloud.com/35360
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Mike Pershin <mpershin@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
libcfs/include/libcfs/libcfs_fail.h
lustre/include/lu_object.h
lustre/include/obd_support.h
lustre/mdt/mdt_reint.c
lustre/obdclass/lu_object.c
lustre/tests/sanityn.sh

index d3651f8..79eda2d 100644 (file)
@@ -169,7 +169,7 @@ static inline void cfs_race(__u32 id)
                        CERROR("cfs_race id %x sleeping\n", id);
                        rc = wait_event_interruptible(cfs_race_waitq,
                                                      cfs_race_state != 0);
                        CERROR("cfs_race id %x sleeping\n", id);
                        rc = wait_event_interruptible(cfs_race_waitq,
                                                      cfs_race_state != 0);
-                       CERROR("cfs_fail_race id %x awake, rc=%d\n", id, rc);
+                       CERROR("cfs_fail_race id %x awake: rc=%d\n", id, rc);
                } else {
                        CERROR("cfs_fail_race id %x waking\n", id);
                        cfs_race_state = 1;
                } else {
                        CERROR("cfs_fail_race id %x waking\n", id);
                        cfs_race_state = 1;
@@ -179,4 +179,42 @@ static inline void cfs_race(__u32 id)
 }
 #define CFS_RACE(id) cfs_race(id)
 
 }
 #define CFS_RACE(id) cfs_race(id)
 
+/**
+ * Wait on race.
+ *
+ * The first thread that calls this with a matching fail_loc is put to sleep,
+ * but subsequent callers of this won't sleep. When another thread calls
+ * cfs_race_wakeup(), the first thread will be woken up and continue.
+ */
+static inline void cfs_race_wait(__u32 id)
+{
+       if (CFS_FAIL_PRECHECK(id)) {
+               if (unlikely(__cfs_fail_check_set(id, 0, CFS_FAIL_LOC_NOSET))) {
+                       int rc;
+
+                       cfs_race_state = 0;
+                       CERROR("cfs_race id %x sleeping\n", id);
+                       rc = wait_event_interruptible(cfs_race_waitq,
+                                                     cfs_race_state != 0);
+                       CERROR("cfs_fail_race id %x awake: rc=%d\n", id, rc);
+               }
+       }
+}
+#define CFS_RACE_WAIT(id) cfs_race_wait(id)
+
+/**
+ * Wake up the thread that is waiting on the matching fail_loc.
+ */
+static inline void cfs_race_wakeup(__u32 id)
+{
+       if (CFS_FAIL_PRECHECK(id)) {
+               if (likely(!__cfs_fail_check_set(id, 0, CFS_FAIL_LOC_NOSET))) {
+                       CERROR("cfs_fail_race id %x waking\n", id);
+                       cfs_race_state = 1;
+                       wake_up(&cfs_race_waitq);
+               }
+       }
+}
+#define CFS_RACE_WAKEUP(id) cfs_race_wakeup(id)
+
 #endif /* _LIBCFS_FAIL_H */
 #endif /* _LIBCFS_FAIL_H */
index 4a7df97..0f65f42 100644 (file)
@@ -466,7 +466,12 @@ enum lu_object_header_flags {
        /**
         * Mark this object has already been taken out of cache.
         */
        /**
         * Mark this object has already been taken out of cache.
         */
-       LU_OBJECT_UNHASHED = 1,
+       LU_OBJECT_UNHASHED      = 1,
+       /**
+        * Object is initialized. When an object is found in cache, it may
+        * not be initialized yet; the object allocator will initialize it.
+        */
+       LU_OBJECT_INITED        = 2
 };
 
 enum lu_object_header_attr {
 };
 
 enum lu_object_header_attr {
@@ -667,6 +672,14 @@ static inline int lu_object_is_dying(const struct lu_object_header *h)
        return test_bit(LU_OBJECT_HEARD_BANSHEE, &h->loh_flags);
 }
 
        return test_bit(LU_OBJECT_HEARD_BANSHEE, &h->loh_flags);
 }
 
+/**
+ * Return true if object is initialized.
+ */
+static inline int lu_object_is_inited(const struct lu_object_header *h)
+{
+       return test_bit(LU_OBJECT_INITED, &h->loh_flags);
+}
+
 void lu_object_put(const struct lu_env *env, struct lu_object *o);
 void lu_object_put_nocache(const struct lu_env *env, struct lu_object *o);
 void lu_object_unhash(const struct lu_env *env, struct lu_object *o);
 void lu_object_put(const struct lu_env *env, struct lu_object *o);
 void lu_object_put_nocache(const struct lu_env *env, struct lu_object *o);
 void lu_object_unhash(const struct lu_env *env, struct lu_object *o);
index 73d3191..9ac5de0 100644 (file)
@@ -461,6 +461,7 @@ extern char obd_jobid_var[];
 #define OBD_FAIL_OBD_IDX_READ_BREAK     0x608
 #define OBD_FAIL_OBD_NO_LRU             0x609
 #define OBD_FAIL_OBDCLASS_MODULE_LOAD   0x60a
 #define OBD_FAIL_OBD_IDX_READ_BREAK     0x608
 #define OBD_FAIL_OBD_NO_LRU             0x609
 #define OBD_FAIL_OBDCLASS_MODULE_LOAD   0x60a
+#define OBD_FAIL_OBD_ZERO_NLINK_RACE    0x60b
 
 #define OBD_FAIL_TGT_REPLY_NET           0x700
 #define OBD_FAIL_TGT_CONN_RACE           0x701
 
 #define OBD_FAIL_TGT_REPLY_NET           0x700
 #define OBD_FAIL_TGT_CONN_RACE           0x701
index 452a006..ef6446a 100644 (file)
@@ -1101,6 +1101,7 @@ unlock_parent:
        mdt_object_unlock(info, mp, parent_lh, rc);
 put_parent:
        mdt_object_put(info->mti_env, mp);
        mdt_object_unlock(info, mp, parent_lh, rc);
 put_parent:
        mdt_object_put(info->mti_env, mp);
+       CFS_RACE_WAKEUP(OBD_FAIL_OBD_ZERO_NLINK_RACE);
         return rc;
 }
 
         return rc;
 }
 
index 0af0a10..3d09e47 100644 (file)
@@ -70,13 +70,14 @@ struct lu_site_bkt_data {
        struct list_head                lsb_lru;
        /**
         * Wait-queue signaled when an object in this site is ultimately
        struct list_head                lsb_lru;
        /**
         * Wait-queue signaled when an object in this site is ultimately
-        * destroyed (lu_object_free()). It is used by lu_object_find() to
-        * wait before re-trying when object in the process of destruction is
-        * found in the hash table.
+        * destroyed (lu_object_free()) or initialized (lu_object_start()).
+        * It is used by lu_object_find() to wait before re-trying when
+        * an object in the process of destruction is found in the hash table,
+        * or to wait for the object to be initialized by the allocator.
         *
         * \see htable_lookup().
         */
         *
         * \see htable_lookup().
         */
-       wait_queue_head_t               lsb_marche_funebre;
+       wait_queue_head_t               lsb_waitq;
 };
 
 enum {
 };
 
 enum {
@@ -121,7 +122,7 @@ lu_site_wq_from_fid(struct lu_site *site, struct lu_fid *fid)
 
        cfs_hash_bd_get(site->ls_obj_hash, fid, &bd);
        bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);
 
        cfs_hash_bd_get(site->ls_obj_hash, fid, &bd);
        bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);
-       return &bkt->lsb_marche_funebre;
+       return &bkt->lsb_waitq;
 }
 EXPORT_SYMBOL(lu_site_wq_from_fid);
 
 }
 EXPORT_SYMBOL(lu_site_wq_from_fid);
 
@@ -174,7 +175,7 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
                         * somebody may be waiting for this, currently only
                         * used for cl_object, see cl_object_put_last().
                         */
                         * somebody may be waiting for this, currently only
                         * used for cl_object, see cl_object_put_last().
                         */
-                       wake_up_all(&bkt->lsb_marche_funebre);
+                       wake_up_all(&bkt->lsb_waitq);
                }
                return;
        }
                }
                return;
        }
@@ -273,17 +274,9 @@ EXPORT_SYMBOL(lu_object_unhash);
  */
 static struct lu_object *lu_object_alloc(const struct lu_env *env,
                                         struct lu_device *dev,
  */
 static struct lu_object *lu_object_alloc(const struct lu_env *env,
                                         struct lu_device *dev,
-                                        const struct lu_fid *f,
-                                        const struct lu_object_conf *conf)
+                                        const struct lu_fid *f)
 {
 {
-       struct lu_object *scan;
        struct lu_object *top;
        struct lu_object *top;
-       struct list_head *layers;
-       unsigned int init_mask = 0;
-       unsigned int init_flag;
-       int clean;
-       int result;
-       ENTRY;
 
        /*
         * Create top-level object slice. This will also create
 
        /*
         * Create top-level object slice. This will also create
@@ -291,15 +284,36 @@ static struct lu_object *lu_object_alloc(const struct lu_env *env,
         */
        top = dev->ld_ops->ldo_object_alloc(env, NULL, dev);
        if (top == NULL)
         */
        top = dev->ld_ops->ldo_object_alloc(env, NULL, dev);
        if (top == NULL)
-               RETURN(ERR_PTR(-ENOMEM));
+               return ERR_PTR(-ENOMEM);
        if (IS_ERR(top))
        if (IS_ERR(top))
-               RETURN(top);
-        /*
-         * This is the only place where object fid is assigned. It's constant
-         * after this point.
-         */
-        top->lo_header->loh_fid = *f;
-        layers = &top->lo_header->loh_layers;
+               return top;
+       /*
+        * This is the only place where object fid is assigned. It's constant
+        * after this point.
+        */
+       top->lo_header->loh_fid = *f;
+
+       return top;
+}
+
+/**
+ * Initialize object.
+ *
+ * This is called after object hash insertion to avoid returning an object with
+ * stale attributes.
+ */
+static int lu_object_start(const struct lu_env *env, struct lu_device *dev,
+                          struct lu_object *top,
+                          const struct lu_object_conf *conf)
+{
+       struct lu_object *scan;
+       struct list_head *layers;
+       unsigned int init_mask = 0;
+       unsigned int init_flag;
+       int clean;
+       int result;
+
+       layers = &top->lo_header->loh_layers;
 
        do {
                /*
 
        do {
                /*
@@ -314,10 +328,9 @@ static struct lu_object *lu_object_alloc(const struct lu_env *env,
                        clean = 0;
                        scan->lo_header = top->lo_header;
                        result = scan->lo_ops->loo_object_init(env, scan, conf);
                        clean = 0;
                        scan->lo_header = top->lo_header;
                        result = scan->lo_ops->loo_object_init(env, scan, conf);
-                       if (result != 0) {
-                               lu_object_free(env, top);
-                               RETURN(ERR_PTR(result));
-                       }
+                       if (result)
+                               return result;
+
                        init_mask |= init_flag;
 next:
                        init_flag <<= 1;
                        init_mask |= init_flag;
 next:
                        init_flag <<= 1;
@@ -325,17 +338,18 @@ next:
        } while (!clean);
 
        list_for_each_entry_reverse(scan, layers, lo_linkage) {
        } while (!clean);
 
        list_for_each_entry_reverse(scan, layers, lo_linkage) {
-                if (scan->lo_ops->loo_object_start != NULL) {
-                        result = scan->lo_ops->loo_object_start(env, scan);
-                        if (result != 0) {
-                                lu_object_free(env, top);
-                                RETURN(ERR_PTR(result));
-                        }
-                }
-        }
+               if (scan->lo_ops->loo_object_start != NULL) {
+                       result = scan->lo_ops->loo_object_start(env, scan);
+                       if (result)
+                               return result;
+               }
+       }
+
+       lprocfs_counter_incr(dev->ld_site->ls_stats, LU_SS_CREATED);
 
 
-        lprocfs_counter_incr(dev->ld_site->ls_stats, LU_SS_CREATED);
-        RETURN(top);
+       set_bit(LU_OBJECT_INITED, &top->lo_header->loh_flags);
+
+       return 0;
 }
 
 /**
 }
 
 /**
@@ -633,7 +647,6 @@ static struct lu_object *htable_lookup(struct lu_site *s,
                                       const struct lu_fid *f,
                                       __u64 *version)
 {
                                       const struct lu_fid *f,
                                       __u64 *version)
 {
-       struct lu_site_bkt_data *bkt;
        struct lu_object_header *h;
        struct hlist_node *hnode;
        __u64 ver = cfs_hash_bd_version_get(bd);
        struct lu_object_header *h;
        struct hlist_node *hnode;
        __u64 ver = cfs_hash_bd_version_get(bd);
@@ -642,7 +655,6 @@ static struct lu_object *htable_lookup(struct lu_site *s,
                return ERR_PTR(-ENOENT);
 
        *version = ver;
                return ERR_PTR(-ENOENT);
 
        *version = ver;
-       bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, bd);
        /* cfs_hash_bd_peek_locked is a somehow "internal" function
         * of cfs_hash, it doesn't add refcount on object. */
        hnode = cfs_hash_bd_peek_locked(s->ls_obj_hash, bd, (void *)f);
        /* cfs_hash_bd_peek_locked is a somehow "internal" function
         * of cfs_hash, it doesn't add refcount on object. */
        hnode = cfs_hash_bd_peek_locked(s->ls_obj_hash, bd, (void *)f);
@@ -715,7 +727,12 @@ struct lu_object *lu_object_find_at(const struct lu_env *env,
        struct lu_site *s;
        struct cfs_hash *hs;
        struct cfs_hash_bd bd;
        struct lu_site *s;
        struct cfs_hash *hs;
        struct cfs_hash_bd bd;
+       struct lu_site_bkt_data *bkt;
+       struct l_wait_info lwi = { 0 };
        __u64 version = 0;
        __u64 version = 0;
+       int rc;
+
+       ENTRY;
 
        /*
         * This uses standard index maintenance protocol:
 
        /*
         * This uses standard index maintenance protocol:
@@ -737,25 +754,51 @@ struct lu_object *lu_object_find_at(const struct lu_env *env,
         */
        s  = dev->ld_site;
        hs = s->ls_obj_hash;
         */
        s  = dev->ld_site;
        hs = s->ls_obj_hash;
+
+       if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_OBD_ZERO_NLINK_RACE)))
+               lu_site_purge(env, s, -1);
+
        cfs_hash_bd_get(hs, f, &bd);
        cfs_hash_bd_get(hs, f, &bd);
+       bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
        if (!(conf && conf->loc_flags & LOC_F_NEW)) {
                cfs_hash_bd_lock(hs, &bd, 1);
                o = htable_lookup(s, &bd, f, &version);
                cfs_hash_bd_unlock(hs, &bd, 1);
 
        if (!(conf && conf->loc_flags & LOC_F_NEW)) {
                cfs_hash_bd_lock(hs, &bd, 1);
                o = htable_lookup(s, &bd, f, &version);
                cfs_hash_bd_unlock(hs, &bd, 1);
 
-               if (!IS_ERR(o) || PTR_ERR(o) != -ENOENT)
-                       return o;
+               if (!IS_ERR(o)) {
+                       if (likely(lu_object_is_inited(o->lo_header)))
+                               RETURN(o);
+
+                       l_wait_event(bkt->lsb_waitq,
+                                    lu_object_is_inited(o->lo_header) ||
+                                    lu_object_is_dying(o->lo_header), &lwi);
+
+                       if (lu_object_is_dying(o->lo_header)) {
+                               lu_object_put(env, o);
+
+                               RETURN(ERR_PTR(-ENOENT));
+                       }
+
+                       RETURN(o);
+               }
+
+               if (PTR_ERR(o) != -ENOENT)
+                       RETURN(o);
        }
        }
+
        /*
        /*
-        * Allocate new object. This may result in rather complicated
-        * operations, including fld queries, inode loading, etc.
+        * Allocate a new object. NB: the object is left uninitialized here,
+        * because the file may change between allocation and hash insertion,
+        * which would otherwise return an object with stale attributes.
         */
         */
-       o = lu_object_alloc(env, dev, f, conf);
+       o = lu_object_alloc(env, dev, f);
        if (IS_ERR(o))
        if (IS_ERR(o))
-               return o;
+               RETURN(o);
 
        LASSERT(lu_fid_eq(lu_object_fid(o), f));
 
 
        LASSERT(lu_fid_eq(lu_object_fid(o), f));
 
+       CFS_RACE_WAIT(OBD_FAIL_OBD_ZERO_NLINK_RACE);
+
        cfs_hash_bd_lock(hs, &bd, 1);
 
        if (conf && conf->loc_flags & LOC_F_NEW)
        cfs_hash_bd_lock(hs, &bd, 1);
 
        if (conf && conf->loc_flags & LOC_F_NEW)
@@ -766,15 +809,41 @@ struct lu_object *lu_object_find_at(const struct lu_env *env,
                cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
                cfs_hash_bd_unlock(hs, &bd, 1);
 
                cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
                cfs_hash_bd_unlock(hs, &bd, 1);
 
+               /*
+                * This may result in rather complicated operations, including
+                * fld queries, inode loading, etc.
+                */
+               rc = lu_object_start(env, dev, o, conf);
+               if (rc) {
+                       lu_object_put_nocache(env, o);
+                       RETURN(ERR_PTR(rc));
+               }
+
+               wake_up_all(&bkt->lsb_waitq);
+
                lu_object_limit(env, dev);
 
                lu_object_limit(env, dev);
 
-               return o;
+               RETURN(o);
        }
 
        lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_RACE);
        cfs_hash_bd_unlock(hs, &bd, 1);
        lu_object_free(env, o);
        }
 
        lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_RACE);
        cfs_hash_bd_unlock(hs, &bd, 1);
        lu_object_free(env, o);
-       return shadow;
+
+       if (!(conf && conf->loc_flags & LOC_F_NEW) &&
+           !lu_object_is_inited(shadow->lo_header)) {
+               l_wait_event(bkt->lsb_waitq,
+                            lu_object_is_inited(shadow->lo_header) ||
+                            lu_object_is_dying(shadow->lo_header), &lwi);
+
+               if (lu_object_is_dying(shadow->lo_header)) {
+                       lu_object_put(env, shadow);
+
+                       RETURN(ERR_PTR(-ENOENT));
+               }
+       }
+
+       RETURN(shadow);
 }
 EXPORT_SYMBOL(lu_object_find_at);
 
 }
 EXPORT_SYMBOL(lu_object_find_at);
 
@@ -1061,7 +1130,7 @@ int lu_site_init(struct lu_site *s, struct lu_device *top)
        cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
                bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
                INIT_LIST_HEAD(&bkt->lsb_lru);
        cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
                bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
                INIT_LIST_HEAD(&bkt->lsb_lru);
-               init_waitqueue_head(&bkt->lsb_marche_funebre);
+               init_waitqueue_head(&bkt->lsb_waitq);
        }
 
         s->ls_stats = lprocfs_alloc_stats(LU_SS_LAST_STAT, 0);
        }
 
         s->ls_stats = lprocfs_alloc_stats(LU_SS_LAST_STAT, 0);
@@ -2387,11 +2456,19 @@ struct lu_object *lu_object_anon(const struct lu_env *env,
                                 struct lu_device *dev,
                                 const struct lu_object_conf *conf)
 {
                                 struct lu_device *dev,
                                 const struct lu_object_conf *conf)
 {
-       struct lu_fid     fid;
+       struct lu_fid fid;
        struct lu_object *o;
        struct lu_object *o;
+       int rc;
 
        fid_zero(&fid);
 
        fid_zero(&fid);
-       o = lu_object_alloc(env, dev, &fid, conf);
+       o = lu_object_alloc(env, dev, &fid);
+       if (!IS_ERR(o)) {
+               rc = lu_object_start(env, dev, o, conf);
+               if (rc) {
+                       lu_object_free(env, o);
+                       return ERR_PTR(rc);
+               }
+       }
 
        return o;
 }
 
        return o;
 }
index 516c390..1accf26 100755 (executable)
@@ -4368,6 +4368,31 @@ test_83() {
 }
 run_test 83 "access striped directory while it is being created/unlinked"
 
 }
 run_test 83 "access striped directory while it is being created/unlinked"
 
+test_84() {
+       [ $MDS1_VERSION -lt $(version_code 2.12.55) ] &&
+               skip "lustre < 2.12.55 does not contain LU-12485 fix"
+
+       local mtime
+
+       $MULTIOP $DIR/$tfile oO_RDWR:O_CREAT:O_LOV_DELAY_CREATE:c ||
+               error "create $tfile failed"
+       mtime=$(stat -c%Y $DIR/$tfile)
+       mtime=$((mtime + 200))
+
+       #define OBD_FAIL_OBD_ZERO_NLINK_RACE    0x60b
+       do_facet mds1 $LCTL set_param fail_loc=0x8000060b
+
+       touch -c -m $mtime $DIR/$tfile &
+       setattr_pid=$!
+       # sleep a while to let 'touch' run first
+       sleep 5
+       rm -f $DIR2/$tfile || error "unlink $tfile failed"
+
+       # touch may fail
+       wait $setattr_pid || true
+}
+run_test 84 "0-nlink race in lu_object_find()"
+
 test_90() {
        [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return
        local pid1
 test_90() {
        [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return
        local pid1