Whamcloud - gitweb
LU-8346 obdclass: Set lc_version
[fs/lustre-release.git] / lustre / obdclass / lu_object.c
index cc89029..751e3d1 100644 (file)
@@ -92,16 +92,16 @@ static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx);
  */
 void lu_object_put(const struct lu_env *env, struct lu_object *o)
 {
-        struct lu_site_bkt_data *bkt;
-        struct lu_object_header *top;
-        struct lu_site          *site;
-        struct lu_object        *orig;
-       struct cfs_hash_bd            bd;
-       const struct lu_fid     *fid;
+       struct lu_site_bkt_data *bkt;
+       struct lu_object_header *top;
+       struct lu_site *site;
+       struct lu_object *orig;
+       struct cfs_hash_bd bd;
+       const struct lu_fid *fid;
 
-        top  = o->lo_header;
-        site = o->lo_dev->ld_site;
-        orig = o;
+       top  = o->lo_header;
+       site = o->lo_dev->ld_site;
+       orig = o;
 
        /*
         * till we have full fids-on-OST implemented anonymous objects
@@ -123,12 +123,11 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
                return;
        }
 
-        cfs_hash_bd_get(site->ls_obj_hash, &top->loh_fid, &bd);
-        bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);
+       cfs_hash_bd_get(site->ls_obj_hash, &top->loh_fid, &bd);
+       bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);
 
        if (!cfs_hash_bd_dec_and_lock(site->ls_obj_hash, &bd, &top->loh_ref)) {
                if (lu_object_is_dying(top)) {
-
                        /*
                         * somebody may be waiting for this, currently only
                         * used for cl_object, see cl_object_put_last().
@@ -138,14 +137,14 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
                return;
        }
 
-        /*
-         * When last reference is released, iterate over object
-         * layers, and notify them that object is no longer busy.
-         */
+       /*
+        * When last reference is released, iterate over object
+        * layers, and notify them that object is no longer busy.
+        */
        list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
-                if (o->lo_ops->loo_object_release != NULL)
-                        o->lo_ops->loo_object_release(env, o);
-        }
+               if (o->lo_ops->loo_object_release != NULL)
+                       o->lo_ops->loo_object_release(env, o);
+       }
 
        if (!lu_object_is_dying(top) &&
            (lu_object_exists(orig) || lu_object_is_cl(orig))) {
@@ -156,29 +155,29 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
                CDEBUG(D_INODE, "Add %p to site lru. hash: %p, bkt: %p, "
                       "lru_len: %ld\n",
                       o, site->ls_obj_hash, bkt, bkt->lsb_lru_len);
-                cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
-                return;
-        }
+               cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
+               return;
+       }
 
-        /*
+       /*
         * If object is dying (will not be cached) then remove it
-         * from hash table and LRU.
-         *
-         * This is done with hash table and LRU lists locked. As the only
-         * way to acquire first reference to previously unreferenced
-         * object is through hash-table lookup (lu_object_find()),
-         * or LRU scanning (lu_site_purge()), that are done under hash-table
-         * and LRU lock, no race with concurrent object lookup is possible
-         * and we can safely destroy object below.
-         */
+        * from hash table and LRU.
+        *
+        * This is done with hash table and LRU lists locked. As the only
+        * way to acquire first reference to previously unreferenced
+        * object is through hash-table lookup (lu_object_find()),
+        * or LRU scanning (lu_site_purge()), that are done under hash-table
+        * and LRU lock, no race with concurrent object lookup is possible
+        * and we can safely destroy object below.
+        */
        if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags))
                cfs_hash_bd_del_locked(site->ls_obj_hash, &bd, &top->loh_hash);
-        cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
-        /*
-         * Object was already removed from hash and lru above, can
-         * kill it.
-         */
-        lu_object_free(env, orig);
+       cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
+       /*
+        * Object was already removed from hash and lru above, can
+        * kill it.
+        */
+       lu_object_free(env, orig);
 }
 EXPORT_SYMBOL(lu_object_put);
 
@@ -590,53 +589,35 @@ int lu_object_invariant(const struct lu_object *o)
 static struct lu_object *htable_lookup(struct lu_site *s,
                                       struct cfs_hash_bd *bd,
                                       const struct lu_fid *f,
-                                      wait_queue_t *waiter,
                                       __u64 *version)
 {
        struct lu_site_bkt_data *bkt;
        struct lu_object_header *h;
-       struct hlist_node       *hnode;
-       __u64  ver = cfs_hash_bd_version_get(bd);
+       struct hlist_node *hnode;
+       __u64 ver = cfs_hash_bd_version_get(bd);
 
-        if (*version == ver)
+       if (*version == ver)
                return ERR_PTR(-ENOENT);
 
-        *version = ver;
-        bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, bd);
+       *version = ver;
+       bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, bd);
        /* cfs_hash_bd_peek_locked is a somehow "internal" function
         * of cfs_hash, it doesn't add refcount on object. */
        hnode = cfs_hash_bd_peek_locked(s->ls_obj_hash, bd, (void *)f);
-        if (hnode == NULL) {
-                lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
+       if (!hnode) {
+               lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
                return ERR_PTR(-ENOENT);
-        }
-
-        h = container_of0(hnode, struct lu_object_header, loh_hash);
-        if (likely(!lu_object_is_dying(h))) {
-               cfs_hash_get(s->ls_obj_hash, hnode);
-                lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
-               if (!list_empty(&h->loh_lru)) {
-                       list_del_init(&h->loh_lru);
-                       bkt->lsb_lru_len--;
-                       percpu_counter_dec(&s->ls_lru_len_counter);
-               }
-                return lu_object_top(h);
-        }
-
-        /*
-         * Lookup found an object being destroyed this object cannot be
-         * returned (to assure that references to dying objects are eventually
-         * drained), and moreover, lookup has to wait until object is freed.
-         */
-
-       if (likely(waiter != NULL)) {
-               init_waitqueue_entry(waiter, current);
-               add_wait_queue(&bkt->lsb_marche_funebre, waiter);
-               set_current_state(TASK_UNINTERRUPTIBLE);
-               lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_DEATH_RACE);
        }
 
-       return ERR_PTR(-EAGAIN);
+       h = container_of0(hnode, struct lu_object_header, loh_hash);
+       cfs_hash_get(s->ls_obj_hash, hnode);
+       lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
+       if (!list_empty(&h->loh_lru)) {
+               list_del_init(&h->loh_lru);
+               bkt->lsb_lru_len--;
+               percpu_counter_dec(&s->ls_lru_len_counter);
+       }
+       return lu_object_top(h);
 }
 
 /**
@@ -677,132 +658,104 @@ static void lu_object_limit(const struct lu_env *env,
 }
 
 static struct lu_object *lu_object_new(const struct lu_env *env,
-                                       struct lu_device *dev,
-                                       const struct lu_fid *f,
-                                       const struct lu_object_conf *conf)
+                                      struct lu_device *dev,
+                                      const struct lu_fid *f,
+                                      const struct lu_object_conf *conf)
 {
-        struct lu_object        *o;
-       struct cfs_hash              *hs;
-       struct cfs_hash_bd            bd;
+       struct lu_object *o;
+       struct cfs_hash *hs;
+       struct cfs_hash_bd bd;
 
-        o = lu_object_alloc(env, dev, f, conf);
-        if (unlikely(IS_ERR(o)))
-                return o;
+       o = lu_object_alloc(env, dev, f, conf);
+       if (unlikely(IS_ERR(o)))
+               return o;
 
-        hs = dev->ld_site->ls_obj_hash;
-        cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
-        cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
-        cfs_hash_bd_unlock(hs, &bd, 1);
+       hs = dev->ld_site->ls_obj_hash;
+       cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
+       cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
+       cfs_hash_bd_unlock(hs, &bd, 1);
 
        lu_object_limit(env, dev);
 
-        return o;
+       return o;
 }
 
 /**
  * Core logic of lu_object_find*() functions.
+ *
+ * Much like lu_object_find(), but top level device of object is specifically
+ * \a dev rather than top level device of the site. This interface allows
+ * objects of different "stacking" to be created within the same site.
  */
-static struct lu_object *lu_object_find_try(const struct lu_env *env,
-                                           struct lu_device *dev,
-                                           const struct lu_fid *f,
-                                           const struct lu_object_conf *conf,
-                                           wait_queue_t *waiter)
+struct lu_object *lu_object_find_at(const struct lu_env *env,
+                                   struct lu_device *dev,
+                                   const struct lu_fid *f,
+                                   const struct lu_object_conf *conf)
 {
-       struct lu_object      *o;
-       struct lu_object      *shadow;
-       struct lu_site        *s;
-       struct cfs_hash            *hs;
-       struct cfs_hash_bd          bd;
-       __u64                  version = 0;
+       struct lu_object *o;
+       struct lu_object *shadow;
+       struct lu_site *s;
+       struct cfs_hash *hs;
+       struct cfs_hash_bd bd;
+       __u64 version = 0;
 
-        /*
-         * This uses standard index maintenance protocol:
-         *
-         *     - search index under lock, and return object if found;
-         *     - otherwise, unlock index, allocate new object;
-         *     - lock index and search again;
-         *     - if nothing is found (usual case), insert newly created
-         *       object into index;
-         *     - otherwise (race: other thread inserted object), free
-         *       object just allocated.
-         *     - unlock index;
-         *     - return object.
-         *
-         * For "LOC_F_NEW" case, we are sure the object is new established.
-         * It is unnecessary to perform lookup-alloc-lookup-insert, instead,
-         * just alloc and insert directly.
-         *
-         * If dying object is found during index search, add @waiter to the
-         * site wait-queue and return ERR_PTR(-EAGAIN).
-         */
-        if (conf != NULL && conf->loc_flags & LOC_F_NEW)
-                return lu_object_new(env, dev, f, conf);
-
-        s  = dev->ld_site;
-        hs = s->ls_obj_hash;
-        cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
-        o = htable_lookup(s, &bd, f, waiter, &version);
-        cfs_hash_bd_unlock(hs, &bd, 1);
+       /*
+        * This uses standard index maintenance protocol:
+        *
+        *     - search index under lock, and return object if found;
+        *     - otherwise, unlock index, allocate new object;
+        *     - lock index and search again;
+        *     - if nothing is found (usual case), insert newly created
+        *       object into index;
+        *     - otherwise (race: other thread inserted object), free
+        *       object just allocated.
+        *     - unlock index;
+        *     - return object.
+        *
+        * For "LOC_F_NEW" case, we are sure the object is newly established.
+        * It is unnecessary to perform lookup-alloc-lookup-insert, instead,
+        * just alloc and insert directly.
+        *
+        * Note: the old dying-object wait path is gone; htable_lookup()
+        * now takes a reference and returns the object unconditionally.
+        */
+       if (conf && conf->loc_flags & LOC_F_NEW)
+               return lu_object_new(env, dev, f, conf);
+
+       s  = dev->ld_site;
+       hs = s->ls_obj_hash;
+       cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
+       o = htable_lookup(s, &bd, f, &version);
+       cfs_hash_bd_unlock(hs, &bd, 1);
        if (!IS_ERR(o) || PTR_ERR(o) != -ENOENT)
-                return o;
+               return o;
 
-        /*
-         * Allocate new object. This may result in rather complicated
-         * operations, including fld queries, inode loading, etc.
-         */
-        o = lu_object_alloc(env, dev, f, conf);
-        if (unlikely(IS_ERR(o)))
-                return o;
+       /*
+        * Allocate new object. This may result in rather complicated
+        * operations, including fld queries, inode loading, etc.
+        */
+       o = lu_object_alloc(env, dev, f, conf);
+       if (unlikely(IS_ERR(o)))
+               return o;
 
-        LASSERT(lu_fid_eq(lu_object_fid(o), f));
+       LASSERT(lu_fid_eq(lu_object_fid(o), f));
 
-        cfs_hash_bd_lock(hs, &bd, 1);
+       cfs_hash_bd_lock(hs, &bd, 1);
 
-        shadow = htable_lookup(s, &bd, f, waiter, &version);
+       shadow = htable_lookup(s, &bd, f, &version);
        if (likely(IS_ERR(shadow) && PTR_ERR(shadow) == -ENOENT)) {
-                cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
-                cfs_hash_bd_unlock(hs, &bd, 1);
+               cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
+               cfs_hash_bd_unlock(hs, &bd, 1);
 
                lu_object_limit(env, dev);
 
-                return o;
-        }
-
-        lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_RACE);
-        cfs_hash_bd_unlock(hs, &bd, 1);
-        lu_object_free(env, o);
-        return shadow;
-}
-
-/**
- * Much like lu_object_find(), but top level device of object is specifically
- * \a dev rather than top level device of the site. This interface allows
- * objects of different "stacking" to be created within the same site.
- */
-struct lu_object *lu_object_find_at(const struct lu_env *env,
-                                   struct lu_device *dev,
-                                   const struct lu_fid *f,
-                                   const struct lu_object_conf *conf)
-{
-       struct lu_site_bkt_data *bkt;
-       struct lu_object        *obj;
-       wait_queue_t           wait;
-
-       if (conf != NULL && conf->loc_flags & LOC_F_NOWAIT)
-               return lu_object_find_try(env, dev, f, conf, NULL);
-
-       while (1) {
-               obj = lu_object_find_try(env, dev, f, conf, &wait);
-               if (obj != ERR_PTR(-EAGAIN))
-                       return obj;
-               /*
-                * lu_object_find_try() already added waiter into the
-                * wait queue.
-                */
-               schedule();
-               bkt = lu_site_bkt_from_fid(dev->ld_site, (void *)f);
-               remove_wait_queue(&bkt->lsb_marche_funebre, &wait);
+               return o;
        }
+
+       lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_RACE);
+       cfs_hash_bd_unlock(hs, &bd, 1);
+       lu_object_free(env, o);
+       return shadow;
 }
 EXPORT_SYMBOL(lu_object_find_at);
 
@@ -1713,6 +1666,8 @@ refill:
                goto refill;
        }
 
+       ctx->lc_version = key_set_version;
+
        atomic_dec(&lu_key_initing_cnt);
        read_unlock(&lu_keys_guard);
        return 0;
@@ -2298,10 +2253,10 @@ void lu_object_assign_fid(const struct lu_env *env, struct lu_object *o,
        cfs_hash_bd_get_and_lock(hs, (void *)fid, &bd, 1);
 #ifdef CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK
        {
-               __u64                    version = 0;
-               wait_queue_t             waiter;
-               struct lu_object        *shadow;
-               shadow = htable_lookup(s, &bd, fid, &waiter, &version);
+               __u64 version = 0;
+               struct lu_object *shadow;
+
+               shadow = htable_lookup(s, &bd, fid, &version);
                /* supposed to be unique */
                LASSERT(IS_ERR(shadow) && PTR_ERR(shadow) == -ENOENT);
        }