Whamcloud - gitweb
LU-2059 llog: MGC to use OSD API for backup logs
[fs/lustre-release.git] / lustre / obdclass / lu_object.c
index dd46c1b..ba45a0d 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2012, Whamcloud, Inc.
+ * Copyright (c) 2011, 2013, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -57,9 +57,8 @@
 #include <lustre_disk.h>
 #include <lustre_fid.h>
 #include <lu_object.h>
+#include <lu_ref.h>
 #include <libcfs/list.h>
-/* lu_time_global_{init,fini}() */
-#include <lu_time.h>
 
 static void lu_object_free(const struct lu_env *env, struct lu_object *o);
 
@@ -145,7 +144,8 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
          * and LRU lock, no race with concurrent object lookup is possible
          * and we can safely destroy object below.
          */
-        cfs_hash_bd_del_locked(site->ls_obj_hash, &bd, &top->loh_hash);
+       if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags))
+               cfs_hash_bd_del_locked(site->ls_obj_hash, &bd, &top->loh_hash);
         cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
         /*
          * Object was already removed from hash and lru above, can
@@ -161,13 +161,34 @@ EXPORT_SYMBOL(lu_object_put);
  */
 void lu_object_put_nocache(const struct lu_env *env, struct lu_object *o)
 {
-       set_bit(LU_OBJECT_HEARD_BANSHEE,
-                   &o->lo_header->loh_flags);
+       set_bit(LU_OBJECT_HEARD_BANSHEE, &o->lo_header->loh_flags);
        return lu_object_put(env, o);
 }
 EXPORT_SYMBOL(lu_object_put_nocache);
 
 /**
+ * Kill the object and take it out of LRU cache.
+ * Currently used by client code for layout change.
+ */
+void lu_object_unhash(const struct lu_env *env, struct lu_object *o)
+{
+       struct lu_object_header *top;
+
+       top = o->lo_header;
+       set_bit(LU_OBJECT_HEARD_BANSHEE, &top->loh_flags);
+       if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags)) {
+               cfs_hash_t *obj_hash = o->lo_dev->ld_site->ls_obj_hash;
+               cfs_hash_bd_t bd;
+
+               cfs_hash_bd_get_and_lock(obj_hash, &top->loh_fid, &bd, 1);
+               cfs_list_del_init(&top->loh_lru);
+               cfs_hash_bd_del_locked(obj_hash, &bd, &top->loh_hash);
+               cfs_hash_bd_unlock(obj_hash, &bd, 1);
+       }
+}
+EXPORT_SYMBOL(lu_object_unhash);
+
+/**
  * Allocate new object.
  *
  * This follows object creation protocol, described in the comment within
@@ -185,18 +206,19 @@ static struct lu_object *lu_object_alloc(const struct lu_env *env,
         int result;
         ENTRY;
 
-        /*
-         * Create top-level object slice. This will also create
-         * lu_object_header.
-         */
-        top = dev->ld_ops->ldo_object_alloc(env, NULL, dev);
-        if (top == NULL)
-                RETURN(ERR_PTR(-ENOMEM));
+       /*
+        * Create top-level object slice. This will also create
+        * lu_object_header.
+        */
+       top = dev->ld_ops->ldo_object_alloc(env, NULL, dev);
+       if (top == NULL)
+               RETURN(ERR_PTR(-ENOMEM));
+       if (IS_ERR(top))
+               RETURN(top);
         /*
          * This is the only place where object fid is assigned. It's constant
          * after this point.
          */
-        LASSERT(fid_is_igif(f) || fid_ver(f) == 0);
         top->lo_header->loh_fid = *f;
         layers = &top->lo_header->loh_layers;
         do {
@@ -296,6 +318,9 @@ int lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr)
         int                      bnr;
         int                      i;
 
+       if (OBD_FAIL_CHECK(OBD_FAIL_OBD_NO_LRU))
+               RETURN(0);
+
         CFS_INIT_LIST_HEAD(&dispose);
         /*
          * Under LRU list lock, scan LRU list and move unreferenced objects to
@@ -400,10 +425,10 @@ LU_KEY_INIT_FINI(lu_global, struct lu_cdebug_data);
  * lu_global_init().
  */
 struct lu_context_key lu_global_key = {
-        .lct_tags = LCT_MD_THREAD | LCT_DT_THREAD |
-                    LCT_MG_THREAD | LCT_CL_THREAD,
-        .lct_init = lu_global_key_init,
-        .lct_fini = lu_global_key_fini
+       .lct_tags = LCT_MD_THREAD | LCT_DT_THREAD |
+                   LCT_MG_THREAD | LCT_CL_THREAD | LCT_LOCAL,
+       .lct_init = lu_global_key_init,
+       .lct_fini = lu_global_key_fini
 };
 
 /**
@@ -515,20 +540,21 @@ static struct lu_object *htable_lookup(struct lu_site *s,
         __u64  ver = cfs_hash_bd_version_get(bd);
 
         if (*version == ver)
-                return NULL;
+               return ERR_PTR(-ENOENT);
 
         *version = ver;
         bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, bd);
-        /* cfs_hash_bd_lookup_intent is a somehow "internal" function
-         * of cfs_hash, but we don't want refcount on object right now */
-        hnode = cfs_hash_bd_lookup_locked(s->ls_obj_hash, bd, (void *)f);
+       /* cfs_hash_bd_peek_locked is a somehow "internal" function
+        * of cfs_hash, it doesn't add refcount on object. */
+       hnode = cfs_hash_bd_peek_locked(s->ls_obj_hash, bd, (void *)f);
         if (hnode == NULL) {
                 lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
-                return NULL;
+               return ERR_PTR(-ENOENT);
         }
 
         h = container_of0(hnode, struct lu_object_header, loh_hash);
         if (likely(!lu_object_is_dying(h))) {
+               cfs_hash_get(s->ls_obj_hash, hnode);
                 lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
                 cfs_list_del_init(&h->loh_lru);
                 return lu_object_top(h);
@@ -539,7 +565,6 @@ static struct lu_object *htable_lookup(struct lu_site *s,
          * returned (to assure that references to dying objects are eventually
          * drained), and moreover, lookup has to wait until object is freed.
          */
-        cfs_atomic_dec(&h->loh_ref);
 
         cfs_waitlink_init(waiter);
         cfs_waitq_add(&bkt->lsb_marche_funebre, waiter);
@@ -548,6 +573,31 @@ static struct lu_object *htable_lookup(struct lu_site *s,
         return ERR_PTR(-EAGAIN);
 }
 
+static struct lu_object *htable_lookup_nowait(struct lu_site *s,
+                                             cfs_hash_bd_t *bd,
+                                             const struct lu_fid *f)
+{
+       cfs_hlist_node_t        *hnode;
+       struct lu_object_header *h;
+
+       /* cfs_hash_bd_peek_locked is a somehow "internal" function
+        * of cfs_hash, it doesn't add refcount on object. */
+       hnode = cfs_hash_bd_peek_locked(s->ls_obj_hash, bd, (void *)f);
+       if (hnode == NULL) {
+               lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
+               return ERR_PTR(-ENOENT);
+       }
+
+       h = container_of0(hnode, struct lu_object_header, loh_hash);
+       if (unlikely(lu_object_is_dying(h)))
+               return ERR_PTR(-ENOENT);
+
+       cfs_hash_get(s->ls_obj_hash, hnode);
+       lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
+       cfs_list_del_init(&h->loh_lru);
+       return lu_object_top(h);
+}
+
 /**
  * Search cache for an object with the fid \a f. If such object is found,
  * return it. Otherwise, create new object, insert it into cache and return
@@ -628,7 +678,7 @@ static struct lu_object *lu_object_find_try(const struct lu_env *env,
         cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
         o = htable_lookup(s, &bd, f, waiter, &version);
         cfs_hash_bd_unlock(hs, &bd, 1);
-        if (o != NULL)
+       if (!IS_ERR(o) || PTR_ERR(o) != -ENOENT)
                 return o;
 
         /*
@@ -644,7 +694,7 @@ static struct lu_object *lu_object_find_try(const struct lu_env *env,
         cfs_hash_bd_lock(hs, &bd, 1);
 
         shadow = htable_lookup(s, &bd, f, waiter, &version);
-        if (likely(shadow == NULL)) {
+       if (likely(IS_ERR(shadow) && PTR_ERR(shadow) == -ENOENT)) {
                 struct lu_site_bkt_data *bkt;
 
                 bkt = cfs_hash_bd_extra_get(hs, &bd);
@@ -690,6 +740,30 @@ struct lu_object *lu_object_find_at(const struct lu_env *env,
 EXPORT_SYMBOL(lu_object_find_at);
 
 /**
+ * Try to find the object in cache without waiting for the dead object
+ * to be released nor allocating object if no cached one was found.
+ *
+ * The found object will be set as LU_OBJECT_HEARD_BANSHEE for purging.
+ */
+void lu_object_purge(const struct lu_env *env, struct lu_device *dev,
+                    const struct lu_fid *f)
+{
+       struct lu_site          *s  = dev->ld_site;
+       cfs_hash_t              *hs = s->ls_obj_hash;
+       cfs_hash_bd_t            bd;
+       struct lu_object        *o;
+
+       cfs_hash_bd_get_and_lock(hs, f, &bd, 1);
+       o = htable_lookup_nowait(s, &bd, f);
+       cfs_hash_bd_unlock(hs, &bd, 1);
+       if (!IS_ERR(o)) {
+               set_bit(LU_OBJECT_HEARD_BANSHEE, &o->lo_header->loh_flags);
+               lu_object_put(env, o);
+       }
+}
+EXPORT_SYMBOL(lu_object_purge);
+
+/**
  * Find object with given fid, and return its slice belonging to given device.
  */
 struct lu_object *lu_object_find_slice(const struct lu_env *env,
@@ -718,20 +792,22 @@ static CFS_LIST_HEAD(lu_device_types);
 
 int lu_device_type_init(struct lu_device_type *ldt)
 {
-        int result;
+       int result = 0;
 
-        CFS_INIT_LIST_HEAD(&ldt->ldt_linkage);
-        result = ldt->ldt_ops->ldto_init(ldt);
-        if (result == 0)
-                cfs_list_add(&ldt->ldt_linkage, &lu_device_types);
-        return result;
+       CFS_INIT_LIST_HEAD(&ldt->ldt_linkage);
+       if (ldt->ldt_ops->ldto_init)
+               result = ldt->ldt_ops->ldto_init(ldt);
+       if (result == 0)
+               cfs_list_add(&ldt->ldt_linkage, &lu_device_types);
+       return result;
 }
 EXPORT_SYMBOL(lu_device_type_init);
 
 void lu_device_type_fini(struct lu_device_type *ldt)
 {
-        cfs_list_del_init(&ldt->ldt_linkage);
-        ldt->ldt_ops->ldto_fini(ldt);
+       cfs_list_del_init(&ldt->ldt_linkage);
+       if (ldt->ldt_ops->ldto_fini)
+               ldt->ldt_ops->ldto_fini(ldt);
 }
 EXPORT_SYMBOL(lu_device_type_fini);
 
@@ -739,10 +815,10 @@ void lu_types_stop(void)
 {
         struct lu_device_type *ldt;
 
-        cfs_list_for_each_entry(ldt, &lu_device_types, ldt_linkage) {
-                if (ldt->ldt_device_nr == 0)
-                        ldt->ldt_ops->ldto_stop(ldt);
-        }
+       cfs_list_for_each_entry(ldt, &lu_device_types, ldt_linkage) {
+               if (ldt->ldt_device_nr == 0 && ldt->ldt_ops->ldto_stop)
+                       ldt->ldt_ops->ldto_stop(ldt);
+       }
 }
 EXPORT_SYMBOL(lu_types_stop);
 
@@ -824,12 +900,12 @@ static int lu_htable_order(void)
          *
          * Size of lu_object is (arbitrary) taken as 1K (together with inode).
          */
-        cache_size = cfs_num_physpages;
+       cache_size = num_physpages;
 
 #if BITS_PER_LONG == 32
         /* limit hashtable size for lowmem systems to low RAM */
-        if (cache_size > 1 << (30 - CFS_PAGE_SHIFT))
-                cache_size = 1 << (30 - CFS_PAGE_SHIFT) * 3 / 4;
+       if (cache_size > 1 << (30 - PAGE_CACHE_SHIFT))
+               cache_size = 1 << (30 - PAGE_CACHE_SHIFT) * 3 / 4;
 #endif
 
         /* clear off unreasonable cache setting. */
@@ -842,7 +918,7 @@ static int lu_htable_order(void)
                 lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
         }
         cache_size = cache_size / 100 * lu_cache_percent *
-                (CFS_PAGE_SIZE / 1024);
+               (PAGE_CACHE_SIZE / 1024);
 
         for (bits = 1; (1 << bits) < cache_size; ++bits) {
                 ;
@@ -1122,16 +1198,17 @@ EXPORT_SYMBOL(lu_device_fini);
  * Initialize object \a o that is part of compound object \a h and was created
  * by device \a d.
  */
-int lu_object_init(struct lu_object *o,
-                   struct lu_object_header *h, struct lu_device *d)
+int lu_object_init(struct lu_object *o, struct lu_object_header *h,
+                  struct lu_device *d)
 {
-        memset(o, 0, sizeof *o);
-        o->lo_header = h;
-        o->lo_dev    = d;
-        lu_device_get(d);
-        o->lo_dev_ref = lu_ref_add(&d->ld_reference, "lu_object", o);
-        CFS_INIT_LIST_HEAD(&o->lo_linkage);
-        return 0;
+       memset(o, 0, sizeof(*o));
+       o->lo_header = h;
+       o->lo_dev = d;
+       lu_device_get(d);
+       lu_ref_add_at(&d->ld_reference, &o->lo_dev_ref, "lu_object", o);
+       CFS_INIT_LIST_HEAD(&o->lo_linkage);
+
+       return 0;
 }
 EXPORT_SYMBOL(lu_object_init);
 
@@ -1140,16 +1217,16 @@ EXPORT_SYMBOL(lu_object_init);
  */
 void lu_object_fini(struct lu_object *o)
 {
-        struct lu_device *dev = o->lo_dev;
+       struct lu_device *dev = o->lo_dev;
 
-        LASSERT(cfs_list_empty(&o->lo_linkage));
+       LASSERT(cfs_list_empty(&o->lo_linkage));
 
-        if (dev != NULL) {
-                lu_ref_del_at(&dev->ld_reference,
-                              o->lo_dev_ref , "lu_object", o);
-                lu_device_put(dev);
-                o->lo_dev = NULL;
-        }
+       if (dev != NULL) {
+               lu_ref_del_at(&dev->ld_reference, &o->lo_dev_ref,
+                             "lu_object", o);
+               lu_device_put(dev);
+               o->lo_dev = NULL;
+       }
 }
 EXPORT_SYMBOL(lu_object_fini);
 
@@ -1465,13 +1542,11 @@ static CFS_LIST_HEAD(lu_context_remembered);
 void lu_context_key_quiesce(struct lu_context_key *key)
 {
         struct lu_context *ctx;
-        extern unsigned cl_env_cache_purge(unsigned nr);
 
         if (!(key->lct_tags & LCT_QUIESCENT)) {
                 /*
                  * XXX layering violation.
                  */
-                cl_env_cache_purge(~0);
                 key->lct_tags |= LCT_QUIESCENT;
                 /*
                  * XXX memory barrier has to go here.
@@ -1758,7 +1833,7 @@ int lu_env_refill_by_tags(struct lu_env *env, __u32 ctags,
 }
 EXPORT_SYMBOL(lu_env_refill_by_tags);
 
-static struct cfs_shrinker *lu_site_shrinker = NULL;
+static struct shrinker *lu_site_shrinker;
 
 typedef struct lu_site_stats{
         unsigned        lss_populated;
@@ -1893,9 +1968,9 @@ int lu_printk_printer(const struct lu_env *env,
         return 0;
 }
 
-void lu_debugging_setup(void)
+int lu_debugging_setup(void)
 {
-        lu_env_init(&lu_debugging_env, ~0);
+       return lu_env_init(&lu_debugging_env, ~0);
 }
 
 void lu_context_keys_dump(void)
@@ -1925,63 +2000,6 @@ static int lu_cache_shrink(int nr, unsigned int gfp_mask)
 }
 #endif /* __KERNEL__ */
 
-int  cl_global_init(void);
-void cl_global_fini(void);
-int  lu_ref_global_init(void);
-void lu_ref_global_fini(void);
-
-int dt_global_init(void);
-void dt_global_fini(void);
-
-int llo_global_init(void);
-void llo_global_fini(void);
-
-/* context key constructor/destructor: lu_ucred_key_init, lu_ucred_key_fini */
-LU_KEY_INIT_FINI(lu_ucred, struct lu_ucred);
-
-static struct lu_context_key lu_ucred_key = {
-       .lct_tags = LCT_SESSION,
-       .lct_init = lu_ucred_key_init,
-       .lct_fini = lu_ucred_key_fini
-};
-
-/**
- * Get ucred key if session exists and ucred key is allocated on it.
- * Return NULL otherwise.
- */
-struct lu_ucred *lu_ucred(const struct lu_env *env)
-{
-       if (!env->le_ses)
-               return NULL;
-       return lu_context_key_get(env->le_ses, &lu_ucred_key);
-}
-EXPORT_SYMBOL(lu_ucred);
-
-/**
- * Get ucred key and check if it is properly initialized.
- * Return NULL otherwise.
- */
-struct lu_ucred *lu_ucred_check(const struct lu_env *env)
-{
-       struct lu_ucred *uc = lu_ucred(env);
-       if (uc && uc->uc_valid != UCRED_OLD && uc->uc_valid != UCRED_NEW)
-               return NULL;
-       return uc;
-}
-EXPORT_SYMBOL(lu_ucred_check);
-
-/**
- * Get ucred key, which must exist and must be properly initialized.
- * Assert otherwise.
- */
-struct lu_ucred *lu_ucred_assert(const struct lu_env *env)
-{
-       struct lu_ucred *uc = lu_ucred_check(env);
-       LASSERT(uc != NULL);
-       return uc;
-}
-EXPORT_SYMBOL(lu_ucred_assert);
-
 /**
  * Initialization of global lu_* data.
  */
@@ -2000,11 +2018,6 @@ int lu_global_init(void)
         if (result != 0)
                 return result;
 
-       LU_CONTEXT_KEY_INIT(&lu_ucred_key);
-       result = lu_context_key_register(&lu_ucred_key);
-       if (result != 0)
-               return result;
-
         /*
          * At this level, we don't know what tags are needed, so allocate them
          * conservatively. This should not be too bad, because this
@@ -2021,26 +2034,10 @@ int lu_global_init(void)
          * inode, one for ea. Unfortunately setting this high value results in
          * lu_object/inode cache consuming all the memory.
          */
-        lu_site_shrinker = cfs_set_shrinker(CFS_DEFAULT_SEEKS, lu_cache_shrink);
+       lu_site_shrinker = set_shrinker(DEFAULT_SEEKS, lu_cache_shrink);
         if (lu_site_shrinker == NULL)
                 return -ENOMEM;
 
-        result = lu_time_global_init();
-        if (result)
-                GOTO(out, result);
-
-#ifdef __KERNEL__
-        result = dt_global_init();
-        if (result)
-                GOTO(out, result);
-
-        result = llo_global_init();
-        if (result)
-                GOTO(out, result);
-#endif
-        result = cl_global_init();
-out:
-
         return result;
 }
 
@@ -2049,19 +2046,12 @@ out:
  */
 void lu_global_fini(void)
 {
-        cl_global_fini();
-#ifdef __KERNEL__
-        llo_global_fini();
-        dt_global_fini();
-#endif
-        lu_time_global_fini();
         if (lu_site_shrinker != NULL) {
-                cfs_remove_shrinker(lu_site_shrinker);
+               remove_shrinker(lu_site_shrinker);
                 lu_site_shrinker = NULL;
         }
 
-        lu_context_key_degister(&lu_global_key);
-       lu_context_key_degister(&lu_ucred_key);
+       lu_context_key_degister(&lu_global_key);
 
         /*
          * Tear shrinker environment down _after_ de-registering
@@ -2074,12 +2064,6 @@ void lu_global_fini(void)
         lu_ref_global_fini();
 }
 
-struct lu_buf LU_BUF_NULL = {
-        .lb_buf = NULL,
-        .lb_len = 0
-};
-EXPORT_SYMBOL(LU_BUF_NULL);
-
 static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx)
 {
 #ifdef LPROCFS
@@ -2118,13 +2102,6 @@ int lu_site_stats_print(const struct lu_site *s, char *page, int count)
 }
 EXPORT_SYMBOL(lu_site_stats_print);
 
-const char *lu_time_names[LU_TIME_NR] = {
-        [LU_TIME_FIND_LOOKUP] = "find_lookup",
-        [LU_TIME_FIND_ALLOC]  = "find_alloc",
-        [LU_TIME_FIND_INSERT] = "find_insert"
-};
-EXPORT_SYMBOL(lu_time_names);
-
 /**
  * Helper function to initialize a number of kmem slab caches at once.
  */
@@ -2134,9 +2111,9 @@ int lu_kmem_init(struct lu_kmem_descr *caches)
         struct lu_kmem_descr *iter = caches;
 
         for (result = 0; iter->ckd_cache != NULL; ++iter) {
-                *iter->ckd_cache = cfs_mem_cache_create(iter->ckd_name,
-                                                        iter->ckd_size,
-                                                        0, 0);
+               *iter->ckd_cache = kmem_cache_create(iter->ckd_name,
+                                                    iter->ckd_size,
+                                                    0, 0, NULL);
                 if (*iter->ckd_cache == NULL) {
                         result = -ENOMEM;
                         /* free all previously allocated caches */
@@ -2154,13 +2131,9 @@ EXPORT_SYMBOL(lu_kmem_init);
  */
 void lu_kmem_fini(struct lu_kmem_descr *caches)
 {
-        int rc;
-
         for (; caches->ckd_cache != NULL; ++caches) {
                 if (*caches->ckd_cache != NULL) {
-                        rc = cfs_mem_cache_destroy(*caches->ckd_cache);
-                        LASSERTF(rc == 0, "couldn't destroy %s slab\n",
-                                 caches->ckd_name);
+                       kmem_cache_destroy(*caches->ckd_cache);
                         *caches->ckd_cache = NULL;
                 }
         }
@@ -2189,7 +2162,7 @@ void lu_object_assign_fid(const struct lu_env *env, struct lu_object *o,
        cfs_hash_bd_get_and_lock(hs, (void *)fid, &bd, 1);
        shadow = htable_lookup(s, &bd, fid, &waiter, &version);
        /* supposed to be unique */
-       LASSERT(shadow == NULL);
+       LASSERT(IS_ERR(shadow) && PTR_ERR(shadow) == -ENOENT);
        *old = *fid;
        bkt = cfs_hash_bd_extra_get(hs, &bd);
        cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
@@ -2216,3 +2189,81 @@ struct lu_object *lu_object_anon(const struct lu_env *env,
        return o;
 }
 EXPORT_SYMBOL(lu_object_anon);
+
+struct lu_buf LU_BUF_NULL = {
+       .lb_buf = NULL,
+       .lb_len = 0
+};
+EXPORT_SYMBOL(LU_BUF_NULL);
+
+void lu_buf_free(struct lu_buf *buf)
+{
+       LASSERT(buf);
+       if (buf->lb_buf) {
+               LASSERT(buf->lb_len > 0);
+               OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
+               buf->lb_buf = NULL;
+               buf->lb_len = 0;
+       }
+}
+EXPORT_SYMBOL(lu_buf_free);
+
+void lu_buf_alloc(struct lu_buf *buf, int size)
+{
+       LASSERT(buf);
+       LASSERT(buf->lb_buf == NULL);
+       LASSERT(buf->lb_len == 0);
+       OBD_ALLOC_LARGE(buf->lb_buf, size);
+       if (likely(buf->lb_buf))
+               buf->lb_len = size;
+}
+EXPORT_SYMBOL(lu_buf_alloc);
+
+void lu_buf_realloc(struct lu_buf *buf, int size)
+{
+       lu_buf_free(buf);
+       lu_buf_alloc(buf, size);
+}
+EXPORT_SYMBOL(lu_buf_realloc);
+
+struct lu_buf *lu_buf_check_and_alloc(struct lu_buf *buf, int len)
+{
+       if (buf->lb_buf == NULL && buf->lb_len == 0)
+               lu_buf_alloc(buf, len);
+
+       if ((len > buf->lb_len) && (buf->lb_buf != NULL))
+               lu_buf_realloc(buf, len);
+
+       return buf;
+}
+EXPORT_SYMBOL(lu_buf_check_and_alloc);
+
+/**
+ * Increase the size of the \a buf.
+ * preserves old data in buffer
+ * old buffer remains unchanged on error
+ * \retval 0 or -ENOMEM
+ */
+int lu_buf_check_and_grow(struct lu_buf *buf, int len)
+{
+       char *ptr;
+
+       if (len <= buf->lb_len)
+               return 0;
+
+       OBD_ALLOC_LARGE(ptr, len);
+       if (ptr == NULL)
+               return -ENOMEM;
+
+       /* Free the old buf */
+       if (buf->lb_buf != NULL) {
+               memcpy(ptr, buf->lb_buf, buf->lb_len);
+               OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
+       }
+
+       buf->lb_buf = ptr;
+       buf->lb_len = len;
+       return 0;
+}
+EXPORT_SYMBOL(lu_buf_check_and_grow);
+