Whamcloud - gitweb
LU-5916 lod: inherit default pool setting properly
[fs/lustre-release.git] / lustre / lod / lod_object.c
index f082c91..176ea7d 100644 (file)
@@ -23,7 +23,7 @@
  * Copyright  2009 Sun Microsystems, Inc. All rights reserved
  * Use is subject to license terms.
  *
- * Copyright (c) 2012, Intel Corporation.
+ * Copyright (c) 2012, 2013, Intel Corporation.
  */
 /*
  * lustre/lod/lod_object.c
@@ -31,9 +31,6 @@
  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
  */
 
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
 #define DEBUG_SUBSYSTEM S_MDS
 
 #include <obd.h>
@@ -49,7 +46,7 @@
 
 #include "lod_internal.h"
 
-extern cfs_mem_cache_t *lod_object_kmem;
+extern struct kmem_cache *lod_object_kmem;
 static const struct dt_body_operations lod_body_lnk_ops;
 
 static int lod_index_lookup(const struct lu_env *env, struct dt_object *dt,
@@ -124,8 +121,6 @@ static struct dt_it *lod_it_init(const struct lu_env *env,
 
 #define LOD_CHECK_IT(env, it)                                  \
 {                                                              \
-       /* IT is supposed to be in thread info always */        \
-       LASSERT((it) == &lod_env_info(env)->lti_it);            \
        LASSERT((it)->lit_obj != NULL);                         \
        LASSERT((it)->lit_it != NULL);                          \
 } while(0)
@@ -293,6 +288,16 @@ static int lod_declare_attr_set(const struct lu_env *env,
        if (rc)
                RETURN(rc);
 
+       /* osp_declare_attr_set() ignores all attributes other than
+        * UID, GID, and size, and osp_attr_set() ignores all but UID
+        * and GID.  Declaration of size attr setting happens through
+        * lod_declare_init_size(), and not through this function.
+        * Therefore we need not load striping unless ownership is
+        * changing.  This should save memory and (we hope) speed up
+        * rename(). */
+       if (!(attr->la_valid & (LA_UID | LA_GID)))
+               RETURN(rc);
+
        /*
         * load striping information, notice we don't do this when object
         * is being initialized as we don't need this information till
@@ -315,6 +320,19 @@ static int lod_declare_attr_set(const struct lu_env *env,
                }
        }
 
+       if (OBD_FAIL_CHECK(OBD_FAIL_MAKE_LOVEA_HOLE) &&
+           dt_object_exists(next) &&
+           dt_object_remote(next) == 0 && S_ISREG(attr->la_mode)) {
+               struct lod_thread_info *info = lod_env_info(env);
+               struct lu_buf *buf = &info->lti_buf;
+
+               buf->lb_buf = info->lti_ea_store;
+               buf->lb_len = info->lti_ea_store_size;
+               dt_declare_xattr_set(env, next, buf, XATTR_NAME_LOV,
+                                    LU_XATTR_REPLACE, handle);
+       }
+
+
        RETURN(rc);
 }
 
@@ -336,6 +354,9 @@ static int lod_attr_set(const struct lu_env *env,
        if (rc)
                RETURN(rc);
 
+       if (!(attr->la_valid & (LA_UID | LA_GID)))
+               RETURN(rc);
+
        /*
         * if object is striped, apply changes to all the stripes
         */
@@ -349,6 +370,35 @@ static int lod_attr_set(const struct lu_env *env,
                }
        }
 
+       if (OBD_FAIL_CHECK(OBD_FAIL_MAKE_LOVEA_HOLE) &&
+           dt_object_exists(next) &&
+           dt_object_remote(next) == 0 && S_ISREG(attr->la_mode)) {
+               struct lod_thread_info *info = lod_env_info(env);
+               struct lu_buf *buf = &info->lti_buf;
+               struct lov_mds_md_v1 *lmm;
+               struct lov_ost_data_v1 *objs;
+               __u32 magic;
+               int rc1;
+
+               rc1 = lod_get_lov_ea(env, lo);
+               if (rc1  <= 0)
+                       RETURN(rc);
+
+               buf->lb_buf = info->lti_ea_store;
+               buf->lb_len = info->lti_ea_store_size;
+               lmm = info->lti_ea_store;
+               magic = le32_to_cpu(lmm->lmm_magic);
+               if (le16_to_cpu(lmm->lmm_stripe_count) >= 2) {
+                       if (magic == LOV_MAGIC_V1)
+                               objs = &(lmm->lmm_objects[1]);
+                       else
+                               objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[1];
+                       memset(objs, 0, sizeof(*objs));
+                       dt_xattr_set(env, next, buf, XATTR_NAME_LOV,
+                                    LU_XATTR_REPLACE, handle, BYPASS_CAPA);
+               }
+       }
+
        RETURN(rc);
 }
 
@@ -379,15 +429,19 @@ static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt,
                struct lov_desc    *desc = &dev->lod_desc;
 
                if (buf->lb_buf == NULL) {
-                       rc = sizeof(struct lov_user_md_v1);
-               } else if (buf->lb_len >= sizeof(struct lov_user_md_v1)) {
-                       lum->lmm_magic = LOV_USER_MAGIC_V1;
-                       lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
-                       lum->lmm_pattern = desc->ld_pattern;
-                       lum->lmm_stripe_size = desc->ld_default_stripe_size;
-                       lum->lmm_stripe_count = desc->ld_default_stripe_count;
-                       lum->lmm_stripe_offset = desc->ld_default_stripe_offset;
-                       rc = sizeof(struct lov_user_md_v1);
+                       rc = sizeof(*lum);
+               } else if (buf->lb_len >= sizeof(*lum)) {
+                       lum->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V1);
+                       lmm_oi_set_seq(&lum->lmm_oi, FID_SEQ_LOV_DEFAULT);
+                       lmm_oi_cpu_to_le(&lum->lmm_oi, &lum->lmm_oi);
+                       lum->lmm_pattern = cpu_to_le32(desc->ld_pattern);
+                       lum->lmm_stripe_size = cpu_to_le32(
+                                               desc->ld_default_stripe_size);
+                       lum->lmm_stripe_count = cpu_to_le16(
+                                               desc->ld_default_stripe_count);
+                       lum->lmm_stripe_offset = cpu_to_le16(
+                                               desc->ld_default_stripe_offset);
+                       rc = sizeof(*lum);
                } else {
                        rc = -ERANGE;
                }
@@ -435,7 +489,7 @@ static int lod_declare_xattr_set(const struct lu_env *env,
                        if (rc)
                                RETURN(rc);
                } else {
-                       memset(attr, 0, sizeof(attr));
+                       memset(attr, 0, sizeof(*attr));
                        attr->la_valid = LA_TYPE | LA_MODE;
                        attr->la_mode = S_IFREG;
                }
@@ -461,6 +515,7 @@ static int lod_xattr_set_lov_on_dir(const struct lu_env *env,
        struct lod_object       *l = lod_dt_obj(dt);
        struct lov_user_md_v1   *lum;
        struct lov_user_md_v3   *v3 = NULL;
+       const char              *pool_name = NULL;
        int                      rc;
        ENTRY;
 
@@ -471,16 +526,18 @@ static int lod_xattr_set_lov_on_dir(const struct lu_env *env,
        l->ldo_def_stripe_size = 0;
        l->ldo_def_stripenr = 0;
 
-       LASSERT(buf);
-       LASSERT(buf->lb_buf);
+       LASSERT(buf != NULL && buf->lb_buf != NULL);
        lum = buf->lb_buf;
 
-       rc = lod_verify_striping(d, buf, 0);
+       rc = lod_verify_striping(d, buf, false);
        if (rc)
                RETURN(rc);
 
-       if (lum->lmm_magic == LOV_USER_MAGIC_V3)
+       if (lum->lmm_magic == LOV_USER_MAGIC_V3) {
                v3 = buf->lb_buf;
+               if (v3->lmm_pool_name[0] != '\0')
+                       pool_name = v3->lmm_pool_name;
+       }
 
        /* if { size, offset, count } = { 0, -1, 0 } and no pool
         * (i.e. all default values specified) then delete default
@@ -492,10 +549,8 @@ static int lod_xattr_set_lov_on_dir(const struct lu_env *env,
                (int)lum->lmm_stripe_offset,
                v3 ? "from" : "", v3 ? v3->lmm_pool_name : "");
 
-       if (LOVEA_DELETE_VALUES((lum->lmm_stripe_size),
-                               (lum->lmm_stripe_count),
-                               (lum->lmm_stripe_offset)) &&
-                       lum->lmm_magic == LOV_USER_MAGIC_V1) {
+       if (LOVEA_DELETE_VALUES(lum->lmm_stripe_size, lum->lmm_stripe_count,
+                               lum->lmm_stripe_offset, pool_name)) {
                rc = dt_xattr_del(env, next, name, th, capa);
                if (rc == -ENODATA)
                        rc = 0;
@@ -530,10 +585,13 @@ static int lod_xattr_set(const struct lu_env *env,
                 * already have during req replay, declare_xattr_set()
                 * defines striping, then create() does the work
                */
-               if (fl & LU_XATTR_REPLACE)
+               if (fl & LU_XATTR_REPLACE) {
+                       /* free stripes, then update disk */
+                       lod_object_free_striping(env, lod_dt_obj(dt));
                        rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
-               else
+               } else {
                        rc = lod_striping_create(env, dt, NULL, NULL, th);
+               }
                RETURN(rc);
        } else {
                /*
@@ -556,6 +614,8 @@ static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt,
                         const char *name, struct thandle *th,
                         struct lustre_capa *capa)
 {
+       if (!strcmp(name, XATTR_NAME_LOV))
+               lod_object_free_striping(env, lod_dt_obj(dt));
        return dt_xattr_del(env, dt_object_child(dt), name, th, capa);
 }
 
@@ -661,7 +721,7 @@ static void lod_ah_init(const struct lu_env *env,
                        struct dt_allocation_hint *ah,
                        struct dt_object *parent,
                        struct dt_object *child,
-                       cfs_umode_t child_mode)
+                       umode_t child_mode)
 {
        struct lod_device *d = lu2lod_dev(child->do_lu.lo_dev);
        struct dt_object  *nextp = NULL;
@@ -689,8 +749,9 @@ static void lod_ah_init(const struct lu_env *env,
         * in case of late striping creation, ->ah_init()
         * can be called with local object existing
         */
-       if (!dt_object_exists(nextc))
-               nextc->do_ops->do_ah_init(env, ah, nextp, nextc, child_mode);
+       if (!dt_object_exists(nextc) || dt_object_remote(nextc))
+               nextc->do_ops->do_ah_init(env, ah, dt_object_remote(nextp) ?
+                                         NULL : nextp, nextc, child_mode);
 
        if (S_ISDIR(child_mode)) {
                if (lp->ldo_striping_cached == 0) {
@@ -876,7 +937,6 @@ static int lod_declare_object_create(const struct lu_env *env,
        LASSERT(dof);
        LASSERT(attr);
        LASSERT(th);
-       LASSERT(!dt_object_exists(next));
 
        /*
         * first of all, we declare creation of local object
@@ -908,7 +968,8 @@ static int lod_declare_object_create(const struct lu_env *env,
 
                if (LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
                                        lo->ldo_def_stripenr,
-                                       lo->ldo_def_stripe_offset))
+                                       lo->ldo_def_stripe_offset,
+                                       lo->ldo_pool))
                        RETURN(0);
 
                OBD_ALLOC_PTR(v3);
@@ -917,8 +978,8 @@ static int lod_declare_object_create(const struct lu_env *env,
 
                v3->lmm_magic = cpu_to_le32(LOV_MAGIC_V3);
                v3->lmm_pattern = cpu_to_le32(LOV_PATTERN_RAID0);
-               v3->lmm_object_id = fid_oid(lu_object_fid(&dt->do_lu));
-               v3->lmm_object_seq = fid_seq(lu_object_fid(&dt->do_lu));
+               fid_to_lmm_oi(lu_object_fid(&dt->do_lu), &v3->lmm_oi);
+               lmm_oi_cpu_to_le(&v3->lmm_oi, &v3->lmm_oi);
                v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size);
                v3->lmm_stripe_count = cpu_to_le32(lo->ldo_def_stripenr);
                v3->lmm_stripe_offset = cpu_to_le16(lo->ldo_def_stripe_offset);
@@ -947,8 +1008,6 @@ int lod_striping_create(const struct lu_env *env, struct dt_object *dt,
        int                rc = 0, i;
        ENTRY;
 
-       LASSERT(lo->ldo_stripe);
-       LASSERT(lo->ldo_stripe > 0);
        LASSERT(lo->ldo_striping_cached == 0);
 
        /* create all underlying objects */
@@ -1102,6 +1161,23 @@ static int lod_object_sync(const struct lu_env *env, struct dt_object *dt)
        return dt_object_sync(env, dt_object_child(dt));
 }
 
+static int lod_object_lock(const struct lu_env *env,
+                          struct dt_object *dt, struct lustre_handle *lh,
+                          struct ldlm_enqueue_info *einfo,
+                          void *policy)
+{
+       struct dt_object   *next = dt_object_child(dt);
+       int              rc;
+       ENTRY;
+
+       /*
+        * declare setattr on the local object
+        */
+       rc = dt_object_lock(env, next, lh, einfo, policy);
+
+       RETURN(rc);
+}
+
 struct dt_object_operations lod_obj_ops = {
        .do_read_lock           = lod_object_read_lock,
        .do_write_lock          = lod_object_write_lock,
@@ -1129,6 +1205,7 @@ struct dt_object_operations lod_obj_ops = {
        .do_ref_del             = lod_ref_del,
        .do_capa_get            = lod_capa_get,
        .do_object_sync         = lod_object_sync,
+       .do_object_lock         = lod_object_lock,
 };
 
 static ssize_t lod_read(const struct lu_env *env, struct dt_object *dt,
@@ -1202,6 +1279,7 @@ void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo)
                lo->ldo_stripes_allocated = 0;
        }
        lo->ldo_stripenr = 0;
+       lo->ldo_pattern = 0;
 }
 
 /*