LU-12485 obdclass: 0-nlink race in lu_object_find_at()
diff --git a/lustre/obdclass/lu_object.c b/lustre/obdclass/lu_object.c
index 0af0a10..3d09e47 100644
@@ -70,13 +70,14 @@ struct lu_site_bkt_data {
        struct list_head                lsb_lru;
        /**
         * Wait-queue signaled when an object in this site is ultimately
-        * destroyed (lu_object_free()). It is used by lu_object_find() to
-        * wait before re-trying when object in the process of destruction is
-        * found in the hash table.
+        * destroyed (lu_object_free()) or initialized (lu_object_start()).
+        * It is used by lu_object_find() to wait before re-trying when an
+        * object in the process of destruction is found in the hash table,
+        * or to wait for an object to be initialized by the allocator.
         *
         * \see htable_lookup().
         */
-       wait_queue_head_t               lsb_marche_funebre;
+       wait_queue_head_t               lsb_waitq;
 };
 
 enum {
@@ -121,7 +122,7 @@ lu_site_wq_from_fid(struct lu_site *site, struct lu_fid *fid)
 
        cfs_hash_bd_get(site->ls_obj_hash, fid, &bd);
        bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);
-       return &bkt->lsb_marche_funebre;
+       return &bkt->lsb_waitq;
 }
 EXPORT_SYMBOL(lu_site_wq_from_fid);
 
@@ -174,7 +175,7 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
                         * somebody may be waiting for this, currently only
                         * used for cl_object, see cl_object_put_last().
                         */
-                       wake_up_all(&bkt->lsb_marche_funebre);
+                       wake_up_all(&bkt->lsb_waitq);
                }
                return;
        }
@@ -273,17 +274,9 @@ EXPORT_SYMBOL(lu_object_unhash);
  */
 static struct lu_object *lu_object_alloc(const struct lu_env *env,
                                         struct lu_device *dev,
-                                        const struct lu_fid *f,
-                                        const struct lu_object_conf *conf)
+                                        const struct lu_fid *f)
 {
-       struct lu_object *scan;
        struct lu_object *top;
-       struct list_head *layers;
-       unsigned int init_mask = 0;
-       unsigned int init_flag;
-       int clean;
-       int result;
-       ENTRY;
 
        /*
         * Create top-level object slice. This will also create
@@ -291,15 +284,36 @@ static struct lu_object *lu_object_alloc(const struct lu_env *env,
         */
        top = dev->ld_ops->ldo_object_alloc(env, NULL, dev);
        if (top == NULL)
-               RETURN(ERR_PTR(-ENOMEM));
+               return ERR_PTR(-ENOMEM);
        if (IS_ERR(top))
-               RETURN(top);
-        /*
-         * This is the only place where object fid is assigned. It's constant
-         * after this point.
-         */
-        top->lo_header->loh_fid = *f;
-        layers = &top->lo_header->loh_layers;
+               return top;
+       /*
+        * This is the only place where object fid is assigned. It's constant
+        * after this point.
+        */
+       top->lo_header->loh_fid = *f;
+
+       return top;
+}
+
+/**
+ * Initialize object.
+ *
+ * This is called after object hash insertion to avoid returning an object with
+ * stale attributes.
+ */
+static int lu_object_start(const struct lu_env *env, struct lu_device *dev,
+                          struct lu_object *top,
+                          const struct lu_object_conf *conf)
+{
+       struct lu_object *scan;
+       struct list_head *layers;
+       unsigned int init_mask = 0;
+       unsigned int init_flag;
+       int clean;
+       int result;
+
+       layers = &top->lo_header->loh_layers;
 
        do {
                /*
@@ -314,10 +328,9 @@ static struct lu_object *lu_object_alloc(const struct lu_env *env,
                        clean = 0;
                        scan->lo_header = top->lo_header;
                        result = scan->lo_ops->loo_object_init(env, scan, conf);
-                       if (result != 0) {
-                               lu_object_free(env, top);
-                               RETURN(ERR_PTR(result));
-                       }
+                       if (result)
+                               return result;
+
                        init_mask |= init_flag;
 next:
                        init_flag <<= 1;
@@ -325,17 +338,18 @@ next:
        } while (!clean);
 
        list_for_each_entry_reverse(scan, layers, lo_linkage) {
-                if (scan->lo_ops->loo_object_start != NULL) {
-                        result = scan->lo_ops->loo_object_start(env, scan);
-                        if (result != 0) {
-                                lu_object_free(env, top);
-                                RETURN(ERR_PTR(result));
-                        }
-                }
-        }
+               if (scan->lo_ops->loo_object_start != NULL) {
+                       result = scan->lo_ops->loo_object_start(env, scan);
+                       if (result)
+                               return result;
+               }
+       }
+
+       lprocfs_counter_incr(dev->ld_site->ls_stats, LU_SS_CREATED);
 
-        lprocfs_counter_incr(dev->ld_site->ls_stats, LU_SS_CREATED);
-        RETURN(top);
+       set_bit(LU_OBJECT_INITED, &top->lo_header->loh_flags);
+
+       return 0;
 }
 
 /**
@@ -633,7 +647,6 @@ static struct lu_object *htable_lookup(struct lu_site *s,
                                       const struct lu_fid *f,
                                       __u64 *version)
 {
-       struct lu_site_bkt_data *bkt;
        struct lu_object_header *h;
        struct hlist_node *hnode;
        __u64 ver = cfs_hash_bd_version_get(bd);
@@ -642,7 +655,6 @@ static struct lu_object *htable_lookup(struct lu_site *s,
                return ERR_PTR(-ENOENT);
 
        *version = ver;
-       bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, bd);
        /* cfs_hash_bd_peek_locked is a somehow "internal" function
         * of cfs_hash, it doesn't add refcount on object. */
        hnode = cfs_hash_bd_peek_locked(s->ls_obj_hash, bd, (void *)f);
@@ -715,7 +727,12 @@ struct lu_object *lu_object_find_at(const struct lu_env *env,
        struct lu_site *s;
        struct cfs_hash *hs;
        struct cfs_hash_bd bd;
+       struct lu_site_bkt_data *bkt;
+       struct l_wait_info lwi = { 0 };
        __u64 version = 0;
+       int rc;
+
+       ENTRY;
 
        /*
         * This uses standard index maintenance protocol:
@@ -737,25 +754,51 @@ struct lu_object *lu_object_find_at(const struct lu_env *env,
         */
        s  = dev->ld_site;
        hs = s->ls_obj_hash;
+
+       if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_OBD_ZERO_NLINK_RACE)))
+               lu_site_purge(env, s, -1);
+
        cfs_hash_bd_get(hs, f, &bd);
+       bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
        if (!(conf && conf->loc_flags & LOC_F_NEW)) {
                cfs_hash_bd_lock(hs, &bd, 1);
                o = htable_lookup(s, &bd, f, &version);
                cfs_hash_bd_unlock(hs, &bd, 1);
 
-               if (!IS_ERR(o) || PTR_ERR(o) != -ENOENT)
-                       return o;
+               if (!IS_ERR(o)) {
+                       if (likely(lu_object_is_inited(o->lo_header)))
+                               RETURN(o);
+
+                       l_wait_event(bkt->lsb_waitq,
+                                    lu_object_is_inited(o->lo_header) ||
+                                    lu_object_is_dying(o->lo_header), &lwi);
+
+                       if (lu_object_is_dying(o->lo_header)) {
+                               lu_object_put(env, o);
+
+                               RETURN(ERR_PTR(-ENOENT));
+                       }
+
+                       RETURN(o);
+               }
+
+               if (PTR_ERR(o) != -ENOENT)
+                       RETURN(o);
        }
+
        /*
-        * Allocate new object. This may result in rather complicated
-        * operations, including fld queries, inode loading, etc.
+        * Allocate the new object. NB: it is left uninitialized here, since
+        * the object may change between allocation and hash insertion, and
+        * initializing it first could return an object with stale attributes.
         */
-       o = lu_object_alloc(env, dev, f, conf);
+       o = lu_object_alloc(env, dev, f);
        if (IS_ERR(o))
-               return o;
+               RETURN(o);
 
        LASSERT(lu_fid_eq(lu_object_fid(o), f));
 
+       CFS_RACE_WAIT(OBD_FAIL_OBD_ZERO_NLINK_RACE);
+
        cfs_hash_bd_lock(hs, &bd, 1);
 
        if (conf && conf->loc_flags & LOC_F_NEW)
@@ -766,15 +809,41 @@ struct lu_object *lu_object_find_at(const struct lu_env *env,
                cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
                cfs_hash_bd_unlock(hs, &bd, 1);
 
+               /*
+                * This may result in rather complicated operations, including
+                * fld queries, inode loading, etc.
+                */
+               rc = lu_object_start(env, dev, o, conf);
+               if (rc) {
+                       lu_object_put_nocache(env, o);
+                       RETURN(ERR_PTR(rc));
+               }
+
+               wake_up_all(&bkt->lsb_waitq);
+
                lu_object_limit(env, dev);
 
-               return o;
+               RETURN(o);
        }
 
        lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_RACE);
        cfs_hash_bd_unlock(hs, &bd, 1);
        lu_object_free(env, o);
-       return shadow;
+
+       if (!(conf && conf->loc_flags & LOC_F_NEW) &&
+           !lu_object_is_inited(shadow->lo_header)) {
+               l_wait_event(bkt->lsb_waitq,
+                            lu_object_is_inited(shadow->lo_header) ||
+                            lu_object_is_dying(shadow->lo_header), &lwi);
+
+               if (lu_object_is_dying(shadow->lo_header)) {
+                       lu_object_put(env, shadow);
+
+                       RETURN(ERR_PTR(-ENOENT));
+               }
+       }
+
+       RETURN(shadow);
 }
 EXPORT_SYMBOL(lu_object_find_at);
 
@@ -1061,7 +1130,7 @@ int lu_site_init(struct lu_site *s, struct lu_device *top)
        cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
                bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
                INIT_LIST_HEAD(&bkt->lsb_lru);
-               init_waitqueue_head(&bkt->lsb_marche_funebre);
+               init_waitqueue_head(&bkt->lsb_waitq);
        }
 
         s->ls_stats = lprocfs_alloc_stats(LU_SS_LAST_STAT, 0);
@@ -2387,11 +2456,19 @@ struct lu_object *lu_object_anon(const struct lu_env *env,
                                 struct lu_device *dev,
                                 const struct lu_object_conf *conf)
 {
-       struct lu_fid     fid;
+       struct lu_fid fid;
        struct lu_object *o;
+       int rc;
 
        fid_zero(&fid);
-       o = lu_object_alloc(env, dev, &fid, conf);
+       o = lu_object_alloc(env, dev, &fid);
+       if (!IS_ERR(o)) {
+               rc = lu_object_start(env, dev, o, conf);
+               if (rc) {
+                       lu_object_free(env, o);
+                       return ERR_PTR(rc);
+               }
+       }
 
        return o;
 }
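
For orientation, below is a minimal, self-contained sketch (not part of the patch) of the lookup protocol this change introduces: allocate a bare object, insert it into the cache while still uninitialized, run the heavy initialization, then wake any waiters; concurrent lookups that find an uninitialized or dying object sleep on the bucket wait-queue instead of using it. A hypothetical single-slot cache with a pthread mutex and condition variable stands in for cfs_hash, lsb_waitq and l_wait_event(); all names (obj_find(), obj_start(), obj_wait_inited()) are illustrative, and reference counting and the LOC_F_NEW path are omitted.

/* Illustrative sketch only -- not Lustre code; names are hypothetical. */
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

struct obj {
        int     fid;            /* stands in for struct lu_fid */
        bool    inited;         /* LU_OBJECT_INITED analogue */
        bool    dying;          /* "object being destroyed" analogue */
};

static struct obj *slot;                /* one-bucket stand-in for the hash */
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t waitq = PTHREAD_COND_INITIALIZER; /* ~ lsb_waitq */

/* Expensive setup (fld queries, inode loading, ...), run only after the
 * object is already visible in the cache. */
static int obj_start(struct obj *o)
{
        (void)o;
        return 0;                       /* pretend initialization succeeded */
}

/* Call with "lock" held: sleep until "o" is initialized or dying.
 * Returns "o" on success, NULL if the object is being destroyed. */
static struct obj *obj_wait_inited(struct obj *o)
{
        while (!o->inited && !o->dying)
                pthread_cond_wait(&waitq, &lock);
        return o->dying ? NULL : o;
}

static struct obj *obj_find(int fid)
{
        struct obj *o, *shadow;
        int rc;

        /* Lookup first; a cache hit that is still being initialized is
         * waited on rather than returned with stale attributes. */
        pthread_mutex_lock(&lock);
        if (slot && slot->fid == fid) {
                o = obj_wait_inited(slot);
                pthread_mutex_unlock(&lock);
                return o;
        }
        pthread_mutex_unlock(&lock);

        /* Not cached: allocate a bare object carrying only the fid. */
        o = calloc(1, sizeof(*o));
        if (!o)
                return NULL;
        o->fid = fid;

        pthread_mutex_lock(&lock);
        if (!slot) {
                slot = o;               /* insert while still uninitialized */
                pthread_mutex_unlock(&lock);

                rc = obj_start(o);      /* heavy init outside the cache lock */

                pthread_mutex_lock(&lock);
                if (rc) {
                        slot = NULL;    /* never hand out a half-built object */
                        o->dying = true; /* waiters bail out; refcounted free omitted */
                } else {
                        o->inited = true;
                }
                pthread_cond_broadcast(&waitq); /* ~ wake_up_all(&bkt->lsb_waitq) */
                pthread_mutex_unlock(&lock);
                return rc ? NULL : o;
        }

        if (slot->fid == fid) {
                /* Lost the insertion race: free ours, wait on the winner's
                 * ("shadow") copy just like a plain cache hit. */
                shadow = slot;
                free(o);
                o = obj_wait_inited(shadow);
                pthread_mutex_unlock(&lock);
                return o;
        }

        /* The single slot holds a different fid; a real hash table has no
         * such limitation, so simply give up in this toy version. */
        pthread_mutex_unlock(&lock);
        free(o);
        return NULL;
}

int main(void)
{
        struct obj *o = obj_find(42);

        printf("fid %d inited=%d\n", o ? o->fid : -1, o ? o->inited : 0);
        return 0;
}

The point of inserting before initializing, as the comment added in lu_object_find_at() notes, is that anything changing the underlying object between allocation and hash insertion is observed by the later initialization, so a cached object can no longer carry stale attributes; the price is that other finders must wait for LU_OBJECT_INITED (or for the object to start dying) before using a cache hit.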