* Copyright 2009 Sun Microsystems, Inc. All rights reserved
* Use is subject to license terms.
*
- * Copyright (c) 2012, Intel Corporation.
+ * Copyright (c) 2012, 2013, Intel Corporation.
*/
/*
* lustre/lod/lod_object.c
* Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
*/
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
#define DEBUG_SUBSYSTEM S_MDS
#include <obd.h>
#include "lod_internal.h"
-extern cfs_mem_cache_t *lod_object_kmem;
+extern struct kmem_cache *lod_object_kmem;
static const struct dt_body_operations lod_body_lnk_ops;
static int lod_index_lookup(const struct lu_env *env, struct dt_object *dt,
#define LOD_CHECK_IT(env, it) \
{ \
- /* IT is supposed to be in thread info always */ \
- LASSERT((it) == &lod_env_info(env)->lti_it); \
LASSERT((it)->lit_obj != NULL); \
LASSERT((it)->lit_it != NULL); \
} while(0)
if (rc)
RETURN(rc);
+ /* osp_declare_attr_set() ignores all attributes other than
+ * UID, GID, and size, and osp_attr_set() ignores all but UID
+ * and GID. Declaration of size attr setting happens through
+ * lod_declare_init_size(), and not through this function.
+ * Therefore we need not load striping unless ownership is
+ * changing. This should save memory and (we hope) speed up
+ * rename(). */
+ if (!(attr->la_valid & (LA_UID | LA_GID)))
+ RETURN(rc);
+
/*
* load striping information, notice we don't do this when object
* is being initialized as we don't need this information till
if (rc)
RETURN(rc);
+ if (!(attr->la_valid & (LA_UID | LA_GID)))
+ RETURN(rc);
+
/*
* if object is striped, apply changes to all the stripes
*/
struct lov_desc *desc = &dev->lod_desc;
if (buf->lb_buf == NULL) {
- rc = sizeof(struct lov_user_md_v1);
- } else if (buf->lb_len >= sizeof(struct lov_user_md_v1)) {
- lum->lmm_magic = LOV_USER_MAGIC_V1;
- lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
- lum->lmm_pattern = desc->ld_pattern;
- lum->lmm_stripe_size = desc->ld_default_stripe_size;
- lum->lmm_stripe_count = desc->ld_default_stripe_count;
- lum->lmm_stripe_offset = desc->ld_default_stripe_offset;
- rc = sizeof(struct lov_user_md_v1);
+ rc = sizeof(*lum);
+ } else if (buf->lb_len >= sizeof(*lum)) {
+ lum->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V1);
+ lmm_oi_set_seq(&lum->lmm_oi, FID_SEQ_LOV_DEFAULT);
+ lmm_oi_set_id(&lum->lmm_oi, 0);
+ lmm_oi_cpu_to_le(&lum->lmm_oi, &lum->lmm_oi);
+ lum->lmm_pattern = cpu_to_le32(desc->ld_pattern);
+ lum->lmm_stripe_size = cpu_to_le32(
+ desc->ld_default_stripe_size);
+ lum->lmm_stripe_count = cpu_to_le16(
+ desc->ld_default_stripe_count);
+ lum->lmm_stripe_offset = cpu_to_le16(
+ desc->ld_default_stripe_offset);
+ rc = sizeof(*lum);
} else {
rc = -ERANGE;
}
if (rc)
RETURN(rc);
} else {
- memset(attr, 0, sizeof(attr));
+ memset(attr, 0, sizeof(*attr));
attr->la_valid = LA_TYPE | LA_MODE;
attr->la_mode = S_IFREG;
}
l->ldo_def_stripe_size = 0;
l->ldo_def_stripenr = 0;
- LASSERT(buf);
- LASSERT(buf->lb_buf);
+ LASSERT(buf != NULL && buf->lb_buf != NULL);
lum = buf->lb_buf;
rc = lod_verify_striping(d, buf, 0);
* already have during req replay, declare_xattr_set()
* defines striping, then create() does the work
*/
- if (fl & LU_XATTR_REPLACE)
+ if (fl & LU_XATTR_REPLACE) {
+ /* free stripes, then update disk */
+ lod_object_free_striping(env, lod_dt_obj(dt));
rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
- else
+ } else {
rc = lod_striping_create(env, dt, NULL, NULL, th);
+ }
RETURN(rc);
} else {
/*
const char *name, struct thandle *th,
struct lustre_capa *capa)
{
+ if (!strcmp(name, XATTR_NAME_LOV))
+ lod_object_free_striping(env, lod_dt_obj(dt));
return dt_xattr_del(env, dt_object_child(dt), name, th, capa);
}
struct dt_allocation_hint *ah,
struct dt_object *parent,
struct dt_object *child,
- cfs_umode_t child_mode)
+ umode_t child_mode)
{
struct lod_device *d = lu2lod_dev(child->do_lu.lo_dev);
struct dt_object *nextp = NULL;
* in case of late striping creation, ->ah_init()
* can be called with local object existing
*/
- if (!dt_object_exists(nextc))
- nextc->do_ops->do_ah_init(env, ah, nextp, nextc, child_mode);
+ if (!dt_object_exists(nextc) || dt_object_remote(nextc))
+ nextc->do_ops->do_ah_init(env, ah, dt_object_remote(nextp) ?
+ NULL : nextp, nextc, child_mode);
if (S_ISDIR(child_mode)) {
if (lp->ldo_striping_cached == 0) {
lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
lc->ldo_striping_cached = 1;
lc->ldo_def_striping_set = 1;
- CDEBUG(D_OTHER, "inherite striping defaults\n");
+ CDEBUG(D_OTHER, "inherite EA sz:%d off:%d nr:%d\n",
+ (int)lc->ldo_def_stripenr,
+ (int)lc->ldo_def_stripe_size,
+ (int)lc->ldo_def_stripe_offset);
}
return;
}
LASSERT(dof);
LASSERT(attr);
LASSERT(th);
- LASSERT(!dt_object_exists(next));
/*
* first of all, we declare creation of local object
} else if (dof->dof_type == DFT_DIR && lo->ldo_striping_cached) {
struct lod_thread_info *info = lod_env_info(env);
- info->lti_buf.lb_buf = NULL;
- info->lti_buf.lb_len = sizeof(struct lov_user_md_v3);
+ struct lov_user_md_v3 *v3;
+
+ if (LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
+ lo->ldo_def_stripenr,
+ lo->ldo_def_stripe_offset))
+ RETURN(0);
+
+ OBD_ALLOC_PTR(v3);
+ if (v3 == NULL)
+ RETURN(-ENOMEM);
+
+ v3->lmm_magic = cpu_to_le32(LOV_MAGIC_V3);
+ v3->lmm_pattern = cpu_to_le32(LOV_PATTERN_RAID0);
+ fid_to_lmm_oi(lu_object_fid(&dt->do_lu), &v3->lmm_oi);
+ lmm_oi_cpu_to_le(&v3->lmm_oi, &v3->lmm_oi);
+ v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size);
+ v3->lmm_stripe_count = cpu_to_le32(lo->ldo_def_stripenr);
+ v3->lmm_stripe_offset = cpu_to_le16(lo->ldo_def_stripe_offset);
+ if (lo->ldo_pool)
+ strncpy(v3->lmm_pool_name, lo->ldo_pool,
+ LOV_MAXPOOLNAME);
+
+ info->lti_buf.lb_buf = v3;
+ info->lti_buf.lb_len = sizeof(*v3);
+
/* to transfer default striping from the parent */
rc = dt_declare_xattr_set(env, next, &info->lti_buf,
XATTR_NAME_LOV, 0, th);
+ OBD_FREE_PTR(v3);
}
out:
int rc = 0, i;
ENTRY;
- LASSERT(lo->ldo_stripe);
- LASSERT(lo->ldo_stripe > 0);
LASSERT(lo->ldo_striping_cached == 0);
/* create all underlying objects */
return dt_object_sync(env, dt_object_child(dt));
}
+static int lod_object_lock(const struct lu_env *env,
+ struct dt_object *dt, struct lustre_handle *lh,
+ struct ldlm_enqueue_info *einfo,
+ void *policy)
+{
+ struct dt_object *next = dt_object_child(dt);
+ int rc;
+ ENTRY;
+
+ /*
+ * declare setattr on the local object
+ */
+ rc = dt_object_lock(env, next, lh, einfo, policy);
+
+ RETURN(rc);
+}
+
struct dt_object_operations lod_obj_ops = {
.do_read_lock = lod_object_read_lock,
.do_write_lock = lod_object_write_lock,
.do_ref_del = lod_ref_del,
.do_capa_get = lod_capa_get,
.do_object_sync = lod_object_sync,
+ .do_object_lock = lod_object_lock,
};
static ssize_t lod_read(const struct lu_env *env, struct dt_object *dt,
lo->ldo_stripes_allocated = 0;
}
lo->ldo_stripenr = 0;
+ lo->ldo_pattern = 0;
}
/*
.loo_object_release = lod_object_release,
.loo_object_print = lod_object_print,
};
+
+/**
+ * Init remote lod object
+ */
+static int lod_robject_init(const struct lu_env *env, struct lu_object *lo,
+ const struct lu_object_conf *conf)
+{
+ struct lod_device *lod = lu2lod_dev(lo->lo_dev);
+ struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
+ struct lu_device *c_dev = NULL;
+ struct lu_object *c_obj;
+ int i;
+ ENTRY;
+
+ lod_getref(ltd);
+ if (ltd->ltd_tgts_size > 0) {
+ cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) {
+ struct lod_tgt_desc *tgt;
+ tgt = LTD_TGT(ltd, i);
+ LASSERT(tgt && tgt->ltd_tgt);
+ if (tgt->ltd_index ==
+ lu2lod_obj(lo)->ldo_mds_num) {
+ c_dev = &(tgt->ltd_tgt->dd_lu_dev);
+ break;
+ }
+ }
+ }
+ lod_putref(lod, ltd);
+
+ if (unlikely(c_dev == NULL))
+ RETURN(-ENOENT);
+
+ c_obj = c_dev->ld_ops->ldo_object_alloc(env, lo->lo_header, c_dev);
+ if (unlikely(c_obj == NULL))
+ RETURN(-ENOMEM);
+
+ lu_object_add(lo, c_obj);
+
+ RETURN(0);
+}
+
+struct lu_object_operations lod_lu_robj_ops = {
+ .loo_object_init = lod_robject_init,
+ .loo_object_start = lod_object_start,
+ .loo_object_free = lod_object_free,
+ .loo_object_release = lod_object_release,
+ .loo_object_print = lod_object_print,
+};