LU-3321 clio: revert LU-2622 for removing global env list
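
This restores the global cl_env cache removed by LU-2622: lu_context_key_quiesce() once again flushes cached environments through cl_env_cache_purge() before tagging a key LCT_QUIESCENT (the "XXX layering violation" hunk below). Along the way the patch replaces libcfs compatibility wrappers with native kernel primitives (wake_up_all(), waitqueue_active(), cond_resched(), wait_queue_t, init_waitqueue_head(), try_module_get()/module_put(), totalram_pages), tracks slice initialization in lu_object_alloc() with a local init_mask bitmask instead of the per-object LU_OBJECT_ALLOCATED flag, prints all layers at a fixed depth in lu_object_print(), and re-indents the touched blocks from spaces to tabs, which accounts for most of the textual churn.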
diff --git a/lustre/obdclass/lu_object.c b/lustre/obdclass/lu_object.c
index ba45a0d..28a18ef 100644
--- a/lustre/obdclass/lu_object.c
+++ b/lustre/obdclass/lu_object.c
@@ -103,17 +103,17 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
         cfs_hash_bd_get(site->ls_obj_hash, &top->loh_fid, &bd);
         bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);
 
-        if (!cfs_hash_bd_dec_and_lock(site->ls_obj_hash, &bd, &top->loh_ref)) {
-                if (lu_object_is_dying(top)) {
+       if (!cfs_hash_bd_dec_and_lock(site->ls_obj_hash, &bd, &top->loh_ref)) {
+               if (lu_object_is_dying(top)) {
 
-                        /*
-                         * somebody may be waiting for this, currently only
-                         * used for cl_object, see cl_object_put_last().
-                         */
-                        cfs_waitq_broadcast(&bkt->lsb_marche_funebre);
-                }
-                return;
-        }
+                       /*
+                        * somebody may be waiting for this, currently only
+                        * used for cl_object, see cl_object_put_last().
+                        */
+                       wake_up_all(&bkt->lsb_marche_funebre);
+               }
+               return;
+       }
 
         LASSERT(bkt->lsb_busy > 0);
         bkt->lsb_busy--;
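
The put path above is the standard dec-and-lock idiom: the final reference drop must take the bucket lock atomically with the counter hitting zero, so a concurrent lookup cannot revive an object mid-teardown, while non-final drops only need to notify anyone waiting on a dying object. A minimal sketch with stock kernel primitives follows; struct obj, obj_put() and the field names are illustrative, not Lustre's.

    #include <linux/atomic.h>
    #include <linux/spinlock.h>
    #include <linux/types.h>
    #include <linux/wait.h>

    struct obj {
            atomic_t           ref;
            bool               dying;        /* marked for death, not yet freed */
            spinlock_t        *bucket_lock;  /* lock of the hash bucket         */
            wait_queue_head_t *death_queue;  /* lookups waiting for the free    */
    };

    static void obj_put(struct obj *o)
    {
            /* Nonzero only when ref reached zero AND bucket_lock was taken,
             * making "last put" and unhashing one atomic step. */
            if (!atomic_dec_and_lock(&o->ref, o->bucket_lock)) {
                    if (o->dying)
                            wake_up_all(o->death_queue);
                    return;
            }
            /* ref == 0, lock held: unhash and free here, then unlock. */
            spin_unlock(o->bucket_lock);
    }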
@@ -195,16 +195,18 @@ EXPORT_SYMBOL(lu_object_unhash);
  * struct lu_device_operations definition.
  */
 static struct lu_object *lu_object_alloc(const struct lu_env *env,
-                                         struct lu_device *dev,
-                                         const struct lu_fid *f,
-                                         const struct lu_object_conf *conf)
-{
-        struct lu_object *scan;
-        struct lu_object *top;
-        cfs_list_t *layers;
-        int clean;
-        int result;
-        ENTRY;
+                                        struct lu_device *dev,
+                                        const struct lu_fid *f,
+                                        const struct lu_object_conf *conf)
+{
+       struct lu_object *scan;
+       struct lu_object *top;
+       cfs_list_t *layers;
+       unsigned int init_mask = 0;
+       unsigned int init_flag;
+       int clean;
+       int result;
+       ENTRY;
 
        /*
         * Create top-level object slice. This will also create
@@ -221,25 +223,29 @@ static struct lu_object *lu_object_alloc(const struct lu_env *env,
          */
         top->lo_header->loh_fid = *f;
         layers = &top->lo_header->loh_layers;
-        do {
-                /*
-                 * Call ->loo_object_init() repeatedly, until no more new
-                 * object slices are created.
-                 */
-                clean = 1;
-                cfs_list_for_each_entry(scan, layers, lo_linkage) {
-                        if (scan->lo_flags & LU_OBJECT_ALLOCATED)
-                                continue;
-                        clean = 0;
-                        scan->lo_header = top->lo_header;
-                        result = scan->lo_ops->loo_object_init(env, scan, conf);
-                        if (result != 0) {
-                                lu_object_free(env, top);
-                                RETURN(ERR_PTR(result));
-                        }
-                        scan->lo_flags |= LU_OBJECT_ALLOCATED;
-                }
-        } while (!clean);
+
+       do {
+               /*
+                * Call ->loo_object_init() repeatedly, until no more new
+                * object slices are created.
+                */
+               clean = 1;
+               init_flag = 1;
+               cfs_list_for_each_entry(scan, layers, lo_linkage) {
+                       if (init_mask & init_flag)
+                               goto next;
+                       clean = 0;
+                       scan->lo_header = top->lo_header;
+                       result = scan->lo_ops->loo_object_init(env, scan, conf);
+                       if (result != 0) {
+                               lu_object_free(env, top);
+                               RETURN(ERR_PTR(result));
+                       }
+                       init_mask |= init_flag;
+next:
+                       init_flag <<= 1;
+               }
+       } while (!clean);
 
         cfs_list_for_each_entry_reverse(scan, layers, lo_linkage) {
                 if (scan->lo_ops->loo_object_start != NULL) {
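
The rewritten loop above replaces the old per-slice LU_OBJECT_ALLOCATED flag with a local bitmask: bit n of init_mask records that the n-th layer has been initialized, so the repeated passes (needed because one layer's ->loo_object_init() may stack new layers) skip finished slices without writing to the objects. A self-contained toy model of the same bookkeeping, with hypothetical names; note the scheme assumes fewer layers than bits in an unsigned int:

    #include <stdio.h>

    static int nslots = 2;              /* the "stack" starts with two layers */

    static int init_one(int i)
    {
            if (i == 1 && nslots < 3)
                    nslots++;           /* initializing layer 1 adds a layer  */
            printf("init slot %d\n", i);
            return 0;
    }

    int main(void)
    {
            unsigned int init_mask = 0; /* bit i set => slot i initialized    */
            unsigned int init_flag;
            int clean, i;

            do {
                    clean = 1;          /* stays 1 if a pass found no work    */
                    init_flag = 1;
                    for (i = 0; i < nslots; i++, init_flag <<= 1) {
                            if (init_mask & init_flag)
                                    continue;
                            clean = 0;
                            if (init_one(i) != 0)
                                    return 1;
                            init_mask |= init_flag;
                    }
            } while (!clean);
            return 0;
    }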
@@ -285,20 +291,20 @@ static void lu_object_free(const struct lu_env *env, struct lu_object *o)
          */
         CFS_INIT_LIST_HEAD(&splice);
         cfs_list_splice_init(layers, &splice);
-        while (!cfs_list_empty(&splice)) {
-                /*
-                 * Free layers in bottom-to-top order, so that object header
-                 * lives as long as possible and ->loo_object_free() methods
-                 * can look at its contents.
-                 */
-                o = container_of0(splice.prev, struct lu_object, lo_linkage);
-                cfs_list_del_init(&o->lo_linkage);
-                LASSERT(o->lo_ops->loo_object_free != NULL);
-                o->lo_ops->loo_object_free(env, o);
-        }
+       while (!cfs_list_empty(&splice)) {
+               /*
+                * Free layers in bottom-to-top order, so that object header
+                * lives as long as possible and ->loo_object_free() methods
+                * can look at its contents.
+                */
+               o = container_of0(splice.prev, struct lu_object, lo_linkage);
+               cfs_list_del_init(&o->lo_linkage);
+               LASSERT(o->lo_ops->loo_object_free != NULL);
+               o->lo_ops->loo_object_free(env, o);
+       }
 
-        if (cfs_waitq_active(&bkt->lsb_marche_funebre))
-                cfs_waitq_broadcast(&bkt->lsb_marche_funebre);
+       if (waitqueue_active(&bkt->lsb_marche_funebre))
+               wake_up_all(&bkt->lsb_marche_funebre);
 }
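
lu_object_free() first splices the layer list onto a private head, then frees from the tail: the header is embedded in the top slice, so destroying bottom-to-top lets every ->loo_object_free() still dereference the header. The same idiom with stock list primitives (struct layer and layer_free() are hypothetical):

    #include <linux/list.h>

    struct layer {
            struct list_head linkage;   /* link in the object's layer list */
    };

    void layer_free(struct layer *l);   /* per-layer destructor, elsewhere */

    static void free_stack(struct list_head *layers)
    {
            LIST_HEAD(splice);

            list_splice_init(layers, &splice);   /* detach the whole list */
            while (!list_empty(&splice)) {
                    /* splice.prev is the last (bottom) layer => free it
                     * first, so the top layer and its header die last.  */
                    struct layer *l = list_entry(splice.prev,
                                                 struct layer, linkage);

                    list_del_init(&l->linkage);
                    layer_free(l);
            }
    }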
 
 /**
@@ -355,13 +361,13 @@ int lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr)
                         if (count > 0 && --count == 0)
                                 break;
 
-                }
-                cfs_hash_bd_unlock(s->ls_obj_hash, &bd, 1);
-                cfs_cond_resched();
-                /*
-                 * Free everything on the dispose list. This is safe against
-                 * races due to the reasons described in lu_object_put().
-                 */
+               }
+               cfs_hash_bd_unlock(s->ls_obj_hash, &bd, 1);
+               cond_resched();
+               /*
+                * Free everything on the dispose list. This is safe against
+                * races due to the reasons described in lu_object_put().
+                */
                 while (!cfs_list_empty(&dispose)) {
                         h = container_of0(dispose.next,
                                           struct lu_object_header, loh_lru);
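
The purge above follows the usual shrinker shape: candidates are moved onto a private dispose list while the bucket lock is held, the lock is dropped, cond_resched() keeps long purges preemptible, and the actual frees run lock-free on the private list. Schematically (struct obj, lru_link and obj_free() are hypothetical; the real code also checks each object's reference count before disposing of it):

    #include <linux/list.h>
    #include <linux/sched.h>
    #include <linux/spinlock.h>

    struct obj {
            struct list_head lru_link;
    };

    void obj_free(struct obj *o);       /* destructor, supplied elsewhere */

    static void purge_bucket(spinlock_t *lock, struct list_head *lru)
    {
            LIST_HEAD(dispose);
            struct obj *o, *tmp;

            spin_lock(lock);
            list_splice_init(lru, &dispose);  /* steal victims under lock */
            spin_unlock(lock);

            cond_resched();                   /* yield between buckets */

            /* nobody else can reach &dispose: free without the lock */
            list_for_each_entry_safe(o, tmp, &dispose, lru_link) {
                    list_del_init(&o->lru_link);
                    obj_free(o);
            }
    }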
@@ -486,28 +492,30 @@ EXPORT_SYMBOL(lu_object_header_print);
  * Print human readable representation of the \a o to the \a printer.
  */
 void lu_object_print(const struct lu_env *env, void *cookie,
-                     lu_printer_t printer, const struct lu_object *o)
+                    lu_printer_t printer, const struct lu_object *o)
 {
-        static const char ruler[] = "........................................";
-        struct lu_object_header *top;
-        int depth;
+       static const char ruler[] = "........................................";
+       struct lu_object_header *top;
+       int depth = 4;
 
-        top = o->lo_header;
-        lu_object_header_print(env, cookie, printer, top);
-        (*printer)(env, cookie, "{ \n");
-        cfs_list_for_each_entry(o, &top->loh_layers, lo_linkage) {
-                depth = o->lo_depth + 4;
+       top = o->lo_header;
+       lu_object_header_print(env, cookie, printer, top);
+       (*printer)(env, cookie, "{\n");
 
-                /*
-                 * print `.' \a depth times followed by type name and address
-                 */
-                (*printer)(env, cookie, "%*.*s%s@%p", depth, depth, ruler,
-                           o->lo_dev->ld_type->ldt_name, o);
-                if (o->lo_ops->loo_object_print != NULL)
-                        o->lo_ops->loo_object_print(env, cookie, printer, o);
-                (*printer)(env, cookie, "\n");
-        }
-        (*printer)(env, cookie, "} header@%p\n", top);
+       cfs_list_for_each_entry(o, &top->loh_layers, lo_linkage) {
+               /*
+                * print `.' \a depth times followed by type name and address
+                */
+               (*printer)(env, cookie, "%*.*s%s@%p", depth, depth, ruler,
+                          o->lo_dev->ld_type->ldt_name, o);
+
+               if (o->lo_ops->loo_object_print != NULL)
+                       (*o->lo_ops->loo_object_print)(env, cookie, printer, o);
+
+               (*printer)(env, cookie, "\n");
+       }
+
+       (*printer)(env, cookie, "} header@%p\n", top);
 }
 EXPORT_SYMBOL(lu_object_print);
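
The "%*.*s" conversion in the print loop is a compact indentation trick: with field width and precision both set to depth, it prints at most depth characters of the ruler string padded to depth, i.e. exactly depth dots, without ever building a prefix buffer. A short standalone demonstration (the "lovsub" name is just an example):

    #include <stdio.h>

    int main(void)
    {
            static const char ruler[] = "........................................";
            int depth = 4;

            /* width = depth, precision = depth: exactly 4 dots of padding */
            printf("%*.*s%s@%p\n", depth, depth, ruler,
                   "lovsub", (void *)&ruler);
            return 0;
    }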
 
@@ -529,10 +537,10 @@ int lu_object_invariant(const struct lu_object *o)
 EXPORT_SYMBOL(lu_object_invariant);
 
 static struct lu_object *htable_lookup(struct lu_site *s,
-                                       cfs_hash_bd_t *bd,
-                                       const struct lu_fid *f,
-                                       cfs_waitlink_t *waiter,
-                                       __u64 *version)
+                                      cfs_hash_bd_t *bd,
+                                      const struct lu_fid *f,
+                                      wait_queue_t *waiter,
+                                      __u64 *version)
 {
         struct lu_site_bkt_data *bkt;
         struct lu_object_header *h;
@@ -566,11 +574,11 @@ static struct lu_object *htable_lookup(struct lu_site *s,
          * drained), and moreover, lookup has to wait until object is freed.
          */
 
-        cfs_waitlink_init(waiter);
-        cfs_waitq_add(&bkt->lsb_marche_funebre, waiter);
-        cfs_set_current_state(CFS_TASK_UNINT);
-        lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_DEATH_RACE);
-        return ERR_PTR(-EAGAIN);
+       init_waitqueue_entry_current(waiter);
+       add_wait_queue(&bkt->lsb_marche_funebre, waiter);
+       set_current_state(TASK_UNINTERRUPTIBLE);
+       lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_DEATH_RACE);
+       return ERR_PTR(-EAGAIN);
 }
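
htable_lookup() implements the sleeping half of a lost-wakeup-safe wait, split across two functions: it queues the waiter and sets the task state before returning -EAGAIN, and the caller (lu_object_find_at(), further down) performs the actual schedule. Because the state is set before the lookup returns, a wake_up_all() from the freeing path cannot slip in unseen: it would flip the task back to TASK_RUNNING and the later schedule() would return immediately. The classic single-function form of the pattern, using stock primitives of this kernel era:

    #include <linux/sched.h>
    #include <linux/wait.h>

    /* Sleep until wake_up_all(wq) fires; condition re-check elided. */
    static void wait_for_event(wait_queue_head_t *wq)
    {
            wait_queue_t wait;

            init_waitqueue_entry(&wait, current);
            add_wait_queue(wq, &wait);
            set_current_state(TASK_UNINTERRUPTIBLE);
            /* a wakeup from here on resets us to TASK_RUNNING, so ... */
            schedule();                    /* ... this cannot sleep forever */
            remove_wait_queue(wq, &wait);
    }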
 
 static struct lu_object *htable_lookup_nowait(struct lu_site *s,
@@ -638,17 +646,17 @@ static struct lu_object *lu_object_new(const struct lu_env *env,
  * Core logic of lu_object_find*() functions.
  */
 static struct lu_object *lu_object_find_try(const struct lu_env *env,
-                                            struct lu_device *dev,
-                                            const struct lu_fid *f,
-                                            const struct lu_object_conf *conf,
-                                            cfs_waitlink_t *waiter)
-{
-        struct lu_object      *o;
-        struct lu_object      *shadow;
-        struct lu_site        *s;
-        cfs_hash_t            *hs;
-        cfs_hash_bd_t          bd;
-        __u64                  version = 0;
+                                           struct lu_device *dev,
+                                           const struct lu_fid *f,
+                                           const struct lu_object_conf *conf,
+                                           wait_queue_t *waiter)
+{
+       struct lu_object      *o;
+       struct lu_object      *shadow;
+       struct lu_site        *s;
+       cfs_hash_t            *hs;
+       cfs_hash_bd_t          bd;
+       __u64                  version = 0;
 
         /*
          * This uses standard index maintenance protocol:
@@ -716,26 +724,26 @@ static struct lu_object *lu_object_find_try(const struct lu_env *env,
  * objects of different "stacking" to be created within the same site.
  */
 struct lu_object *lu_object_find_at(const struct lu_env *env,
-                                    struct lu_device *dev,
-                                    const struct lu_fid *f,
-                                    const struct lu_object_conf *conf)
-{
-        struct lu_site_bkt_data *bkt;
-        struct lu_object        *obj;
-        cfs_waitlink_t           wait;
-
-        while (1) {
-                obj = lu_object_find_try(env, dev, f, conf, &wait);
-                if (obj != ERR_PTR(-EAGAIN))
-                        return obj;
-                /*
-                 * lu_object_find_try() already added waiter into the
-                 * wait queue.
-                 */
-                cfs_waitq_wait(&wait, CFS_TASK_UNINT);
-                bkt = lu_site_bkt_from_fid(dev->ld_site, (void *)f);
-                cfs_waitq_del(&bkt->lsb_marche_funebre, &wait);
-        }
+                                   struct lu_device *dev,
+                                   const struct lu_fid *f,
+                                   const struct lu_object_conf *conf)
+{
+       struct lu_site_bkt_data *bkt;
+       struct lu_object        *obj;
+       wait_queue_t           wait;
+
+       while (1) {
+               obj = lu_object_find_try(env, dev, f, conf, &wait);
+               if (obj != ERR_PTR(-EAGAIN))
+                       return obj;
+               /*
+                * lu_object_find_try() already added waiter into the
+                * wait queue.
+                */
+               waitq_wait(&wait, TASK_UNINTERRUPTIBLE);
+               bkt = lu_site_bkt_from_fid(dev->ld_site, (void *)f);
+               remove_wait_queue(&bkt->lsb_marche_funebre, &wait);
+       }
 }
 EXPORT_SYMBOL(lu_object_find_at);
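
lu_object_find_at() is the scheduling half of the protocol sketched above: on -EAGAIN the waiter is already queued with the task state set, so the loop only has to sleep, dequeue, and retry the lookup. Reduced to its shape (struct obj and find_try() are hypothetical stand-ins):

    #include <linux/err.h>
    #include <linux/sched.h>
    #include <linux/wait.h>

    struct obj;

    /* Queues "wait" on wq and sets the task state before returning
     * ERR_PTR(-EAGAIN); stand-in for lu_object_find_try(). */
    struct obj *find_try(wait_queue_head_t *wq, wait_queue_t *wait);

    static struct obj *find_retry(wait_queue_head_t *wq)
    {
            wait_queue_t wait;
            struct obj *o;

            for (;;) {
                    o = find_try(wq, &wait);
                    if (o != ERR_PTR(-EAGAIN))
                            return o;      /* found, created, or hard error */
                    schedule();            /* sleep until the dying object's
                                            * free path broadcasts the queue */
                    remove_wait_queue(wq, &wait);
            }
    }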
 
@@ -900,7 +908,7 @@ static int lu_htable_order(void)
          *
          * Size of lu_object is (arbitrary) taken as 1K (together with inode).
          */
-       cache_size = num_physpages;
+       cache_size = totalram_pages;
 
 #if BITS_PER_LONG == 32
         /* limit hashtable size for lowmem systems to low RAM */
@@ -1055,11 +1063,11 @@ int lu_site_init(struct lu_site *s, struct lu_device *top)
                 return -ENOMEM;
         }
 
-        cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
-                bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
-                CFS_INIT_LIST_HEAD(&bkt->lsb_lru);
-                cfs_waitq_init(&bkt->lsb_marche_funebre);
-        }
+       cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
+               bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
+               CFS_INIT_LIST_HEAD(&bkt->lsb_lru);
+               init_waitqueue_head(&bkt->lsb_marche_funebre);
+       }
 
         s->ls_stats = lprocfs_alloc_stats(LU_SS_LAST_STAT, 0);
         if (s->ls_stats == NULL) {
@@ -1403,8 +1411,8 @@ static void key_fini(struct lu_context *ctx, int index)
 
                LASSERT(key->lct_owner != NULL);
                if ((ctx->lc_tags & LCT_NOREF) == 0) {
-                       LINVRNT(cfs_module_refcount(key->lct_owner) > 0);
-                       cfs_module_put(key->lct_owner);
+                       LINVRNT(module_refcount(key->lct_owner) > 0);
+                       module_put(key->lct_owner);
                }
                ctx->lc_value[index] = NULL;
        }
@@ -1542,11 +1550,13 @@ static CFS_LIST_HEAD(lu_context_remembered);
 void lu_context_key_quiesce(struct lu_context_key *key)
 {
         struct lu_context *ctx;
+        extern unsigned cl_env_cache_purge(unsigned nr);
 
         if (!(key->lct_tags & LCT_QUIESCENT)) {
                 /*
                  * XXX layering violation.
                  */
+                cl_env_cache_purge(~0);
                 key->lct_tags |= LCT_QUIESCENT;
                 /*
                  * XXX memory barrier has to go here.
@@ -1607,11 +1617,11 @@ static int keys_fill(struct lu_context *ctx)
                         if (unlikely(IS_ERR(value)))
                                 return PTR_ERR(value);
 
-                        LASSERT(key->lct_owner != NULL);
-                        if (!(ctx->lc_tags & LCT_NOREF))
-                                cfs_try_module_get(key->lct_owner);
-                        lu_ref_add_atomic(&key->lct_reference, "ctx", ctx);
-                        cfs_atomic_inc(&key->lct_used);
+                       LASSERT(key->lct_owner != NULL);
+                       if (!(ctx->lc_tags & LCT_NOREF))
+                               try_module_get(key->lct_owner);
+                       lu_ref_add_atomic(&key->lct_reference, "ctx", ctx);
+                       cfs_atomic_inc(&key->lct_used);
                         /*
                          * This is the only place in the code, where an
                          * element of ctx->lc_value[] array is set to non-NULL
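
keys_fill() pins the module that owns each context key so the key's methods cannot be unloaded while a value is live; key_fini() (earlier hunk) drops the pin with module_put(), and LCT_NOREF opts a context out of pinning entirely. The stock pairing, which normally also checks try_module_get()'s result (owner is illustrative):

    #include <linux/module.h>

    /* Take a reference on the owning module; fails while it unloads. */
    static int pin_owner(struct module *owner)
    {
            if (!try_module_get(owner))
                    return -ENODEV;
            return 0;
    }

    /* Release the pin taken by pin_owner(). */
    static void unpin_owner(struct module *owner)
    {
            module_put(owner);
    }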
@@ -2151,7 +2161,7 @@ void lu_object_assign_fid(const struct lu_env *env, struct lu_object *o,
        struct lu_fid           *old = &o->lo_header->loh_fid;
        struct lu_site_bkt_data *bkt;
        struct lu_object        *shadow;
-       cfs_waitlink_t           waiter;
+       wait_queue_t             waiter;
        cfs_hash_t              *hs;
        cfs_hash_bd_t            bd;
        __u64                    version = 0;