#include <lustre_fid.h>
#include <lustre_lmv.h>
#include <obd_lov.h>
+#include <md_object.h>
#include "lod_internal.h"
if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
if (!(attr->la_valid & (LA_UID | LA_GID)))
RETURN(rc);
+
+ if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
+ RETURN(0);
} else {
if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
LA_ATIME | LA_MTIME | LA_CTIME)))
}
}
+ if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
+ dt_object_exists(next) != 0 &&
+ dt_object_remote(next) == 0)
+ dt_declare_xattr_del(env, next, XATTR_NAME_LOV, handle);
+
+ if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) &&
+ dt_object_exists(next) &&
+ dt_object_remote(next) == 0 && S_ISREG(attr->la_mode)) {
+ struct lod_thread_info *info = lod_env_info(env);
+ struct lu_buf *buf = &info->lti_buf;
+
+ buf->lb_buf = info->lti_ea_store;
+ buf->lb_len = info->lti_ea_store_size;
+ dt_declare_xattr_set(env, next, buf, XATTR_NAME_LOV,
+ LU_XATTR_REPLACE, handle);
+ }
+
RETURN(rc);
}
if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
if (!(attr->la_valid & (LA_UID | LA_GID)))
RETURN(rc);
+
+ if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
+ RETURN(0);
} else {
if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
LA_ATIME | LA_MTIME | LA_CTIME)))
if ((attr->la_valid & LA_ATIME &&
attr->la_atime < la->la_atime) ||
(attr->la_valid & LA_CTIME &&
- attr->la_atime < la->la_ctime) ||
+ attr->la_ctime < la->la_ctime) ||
(attr->la_valid & LA_MTIME &&
- attr->la_atime < la->la_mtime))
+ attr->la_mtime < la->la_mtime))
setattr_time = true;
if (!setattr_time)
}
}
+ if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
+ dt_object_exists(next) != 0 &&
+ dt_object_remote(next) == 0)
+ dt_xattr_del(env, next, XATTR_NAME_LOV, handle, BYPASS_CAPA);
+
+ if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) &&
+ dt_object_exists(next) &&
+ dt_object_remote(next) == 0 && S_ISREG(attr->la_mode)) {
+ struct lod_thread_info *info = lod_env_info(env);
+ struct lu_buf *buf = &info->lti_buf;
+ struct ost_id *oi = &info->lti_ostid;
+ struct lu_fid *fid = &info->lti_fid;
+ struct lov_mds_md_v1 *lmm;
+ struct lov_ost_data_v1 *objs;
+ __u32 magic;
+ int rc1;
+
+ rc1 = lod_get_lov_ea(env, lo);
+ if (rc1 <= 0)
+ RETURN(rc);
+
+ buf->lb_buf = info->lti_ea_store;
+ buf->lb_len = info->lti_ea_store_size;
+ lmm = info->lti_ea_store;
+ magic = le32_to_cpu(lmm->lmm_magic);
+ if (magic == LOV_MAGIC_V1)
+ objs = &(lmm->lmm_objects[0]);
+ else
+ objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
+ ostid_le_to_cpu(&objs->l_ost_oi, oi);
+ ostid_to_fid(fid, oi, le32_to_cpu(objs->l_ost_idx));
+ fid->f_oid--;
+ fid_to_ostid(fid, oi);
+ ostid_cpu_to_le(oi, &objs->l_ost_oi);
+ dt_xattr_set(env, next, buf, XATTR_NAME_LOV,
+ LU_XATTR_REPLACE, handle, BYPASS_CAPA);
+ }
+
RETURN(rc);
}
struct lmv_mds_md_v1 *lmm1;
int stripe_count;
int lmm_size;
+ int type = LU_SEQ_RANGE_ANY;
int i;
int rc;
__u32 mdtidx;
lmm1->lmv_stripe_count = cpu_to_le32(stripe_count);
lmm1->lmv_hash_type = cpu_to_le32(lo->ldo_dir_hash_type);
rc = lod_fld_lookup(env, lod, lu_object_fid(&dt->do_lu),
- &mdtidx, LU_SEQ_RANGE_MDT);
+ &mdtidx, &type);
if (rc != 0)
RETURN(rc);
int rc = 0;
ENTRY;
+ if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_MIGRATE)
+ RETURN(0);
+
if (le32_to_cpu(lmv1->lmv_magic) != LMV_MAGIC_V1)
RETURN(-EINVAL);
for (i = 1; i < le32_to_cpu(lmv1->lmv_stripe_count); i++) {
struct lod_tgt_desc *tgt;
int idx;
+ int type = LU_SEQ_RANGE_ANY;
struct dt_object *dto;
fid_le_to_cpu(fid, &lmv1->lmv_stripe_fids[i]);
- rc = lod_fld_lookup(env, lod, fid,
- &idx, LU_SEQ_RANGE_MDT);
+ rc = lod_fld_lookup(env, lod, fid, &idx, &type);
if (rc != 0)
GOTO(out, rc);
GOTO(out_put, rc);
}
- rc = dt_declare_xattr_set(env, dt, &lmv_buf, XATTR_NAME_LMV, 0, th);
+ rc = dt_declare_xattr_set(env, dt_object_child(dt), &lmv_buf,
+ XATTR_NAME_LMV, 0, th);
if (rc != 0)
GOTO(out_put, rc);
capa);
}
- rc = dt_xattr_set(env, dt, &lmv_buf, XATTR_NAME_LMV, fl, th, capa);
+ rc = dt_xattr_set(env, dt_object_child(dt), &lmv_buf, XATTR_NAME_LMV,
+ fl, th, capa);
RETURN(rc);
}
if (v1->lmm_pattern != LOV_PATTERN_RAID0 && v1->lmm_pattern != 0)
GOTO(unlock, rc = 0);
+ CDEBUG(D_INFO, DFID" stripe_count=%d stripe_size=%d stripe_offset=%d\n",
+ PFID(lu_object_fid(&lp->ldo_obj.do_lu)),
+ (int)v1->lmm_stripe_count,
+ (int)v1->lmm_stripe_size, (int)v1->lmm_stripe_offset);
+
lp->ldo_def_stripenr = v1->lmm_stripe_count;
lp->ldo_def_stripe_size = v1->lmm_stripe_size;
lp->ldo_def_stripe_offset = v1->lmm_stripe_offset;
GOTO(out, rc = -ENOMEM);
}
- /* choose OST and generate appropriate objects */
- rc = lod_qos_prep_create(env, lo, attr, lovea, th);
- if (rc) {
- /* failed to create striping, let's reset
- * config so that others don't get confused */
- lod_object_free_striping(env, lo);
- GOTO(out, rc);
- }
+ if (!dt_object_remote(next)) {
+ /* choose OST and generate appropriate objects */
+ rc = lod_qos_prep_create(env, lo, attr, lovea, th);
+ if (rc) {
+ /* failed to create striping, let's reset
+ * config so that others don't get confused */
+ lod_object_free_striping(env, lo);
+ GOTO(out, rc);
+ }
- /*
- * declare storage for striping data
- */
- info->lti_buf.lb_len = lov_mds_md_size(lo->ldo_stripenr,
+ /*
+ * declare storage for striping data
+ */
+ info->lti_buf.lb_len = lov_mds_md_size(lo->ldo_stripenr,
lo->ldo_pool ? LOV_MAGIC_V3 : LOV_MAGIC_V1);
- rc = dt_declare_xattr_set(env, next, &info->lti_buf, XATTR_NAME_LOV,
- 0, th);
+ } else {
+ /* LOD can not choose OST objects for remote objects, i.e.
+ * stripes must be ready before that. Right now, it can only
+ * happen during migrate, i.e. migrate process needs to create
+ * remote regular file (mdd_migrate_create), then the migrate
+ * process will provide stripeEA. */
+ LASSERT(lovea != NULL);
+ info->lti_buf = *lovea;
+ }
+
+ rc = dt_declare_xattr_set(env, next, &info->lti_buf,
+ XATTR_NAME_LOV, 0, th);
if (rc)
GOTO(out, rc);
rc = lod_declare_striped_object(env, dt, attr,
NULL, th);
} else if (dof->dof_type == DFT_DIR) {
- rc = lod_declare_dir_striping_create(env, dt, attr, dof, th);
+ /* Orphan object (like migrating object) does not have
+ * lod_dir_stripe, see lod_ah_init */
+ if (lo->ldo_dir_stripe != NULL)
+ rc = lod_declare_dir_striping_create(env, dt, attr,
+ dof, th);
}
out:
RETURN(rc);
rc = dt_create(env, next, attr, hint, dof, th);
if (rc == 0) {
- if (S_ISDIR(dt->do_lu.lo_header->loh_attr))
+ if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
+ lo->ldo_dir_stripe != NULL)
rc = lod_dir_striping_create(env, dt, attr, dof, th);
else if (lo->ldo_stripe && dof->u.dof_reg.striped != 0)
rc = lod_striping_create(env, dt, attr, dof, th);
if (rc)
RETURN(rc);
+ if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ))
+ RETURN(0);
+
/*
* load striping information, notice we don't do this when object
* is being initialized as we don't need this information till
if (rc)
RETURN(rc);
+ if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ))
+ RETURN(0);
+
/* destroy all underlying objects */
for (i = 0; i < lo->ldo_stripenr; i++) {
LASSERT(lo->ldo_stripe[i]);
- rc = dt_destroy(env, lo->ldo_stripe[i], th);
- if (rc)
- break;
+ /* for striped directory, next == ldo_stripe[0] */
+ if (next != lo->ldo_stripe[i]) {
+ rc = dt_destroy(env, lo->ldo_stripe[i], th);
+ if (rc)
+ break;
+ }
}
RETURN(rc);
return dt_object_sync(env, dt_object_child(dt));
}
+struct lod_slave_locks {
+ int lsl_lock_count;
+ struct lustre_handle lsl_handle[0];
+};
+
+static int lod_object_unlock_internal(const struct lu_env *env,
+ struct dt_object *dt,
+ struct ldlm_enqueue_info *einfo,
+ ldlm_policy_data_t *policy)
+{
+ struct lod_object *lo = lod_dt_obj(dt);
+ struct lod_slave_locks *slave_locks = einfo->ei_cbdata;
+ int rc = 0;
+ int i;
+ ENTRY;
+
+ if (slave_locks == NULL)
+ RETURN(0);
+
+ for (i = 0; i < slave_locks->lsl_lock_count; i++) {
+ if (lustre_handle_is_used(&slave_locks->lsl_handle[i])) {
+ int rc1;
+
+ einfo->ei_cbdata = &slave_locks->lsl_handle[i];
+ rc1 = dt_object_unlock(env, lo->ldo_stripe[i], einfo,
+ policy);
+ if (rc1 < 0)
+ rc = rc == 0 ? rc1 : rc;
+ }
+ }
+
+ RETURN(rc);
+}
+
+static int lod_object_unlock(const struct lu_env *env, struct dt_object *dt,
+ struct ldlm_enqueue_info *einfo,
+ union ldlm_policy_data *policy)
+{
+ struct lod_object *lo = lod_dt_obj(dt);
+ struct lod_slave_locks *slave_locks = einfo->ei_cbdata;
+ int slave_locks_size;
+ int rc;
+ ENTRY;
+
+ if (slave_locks == NULL)
+ RETURN(0);
+
+ rc = lod_load_striping(env, lo);
+ if (rc != 0)
+ RETURN(rc);
+
+ /* Note: for remote lock for single stripe dir, MDT will cancel
+ * the lock by lockh directly */
+ if (lo->ldo_stripenr == 0 && dt_object_remote(dt_object_child(dt)))
+ RETURN(0);
+
+ if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
+ RETURN(-ENOTDIR);
+
+ /* Only cancel slave lock for striped dir */
+ rc = lod_object_unlock_internal(env, dt, einfo, policy);
+
+ slave_locks_size = sizeof(*slave_locks) + slave_locks->lsl_lock_count *
+ sizeof(slave_locks->lsl_handle[0]);
+ OBD_FREE(slave_locks, slave_locks_size);
+ einfo->ei_cbdata = NULL;
+
+ RETURN(rc);
+}
+
static int lod_object_lock(const struct lu_env *env,
- struct dt_object *dt, struct lustre_handle *lh,
+ struct dt_object *dt,
+ struct lustre_handle *lh,
struct ldlm_enqueue_info *einfo,
- void *policy)
+ union ldlm_policy_data *policy)
{
- struct dt_object *next = dt_object_child(dt);
- int rc;
+ struct lod_object *lo = lod_dt_obj(dt);
+ int rc = 0;
+ int i;
+ int slave_locks_size;
+ struct lod_slave_locks *slave_locks = NULL;
ENTRY;
- /*
- * declare setattr on the local object
- */
- rc = dt_object_lock(env, next, lh, einfo, policy);
+ /* remote object lock */
+ if (!einfo->ei_enq_slave) {
+ LASSERT(dt_object_remote(dt));
+ return dt_object_lock(env, dt_object_child(dt), lh, einfo,
+ policy);
+ }
+
+ if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
+ RETURN(-ENOTDIR);
+
+ rc = lod_load_striping(env, lo);
+ if (rc != 0)
+ RETURN(rc);
+
+ /* No stripes */
+ if (lo->ldo_stripenr == 0)
+ RETURN(0);
+
+ slave_locks_size = sizeof(*slave_locks) + lo->ldo_stripenr *
+ sizeof(slave_locks->lsl_handle[0]);
+ /* Freed in lod_object_unlock */
+ OBD_ALLOC(slave_locks, slave_locks_size);
+ if (slave_locks == NULL)
+ RETURN(-ENOMEM);
+ slave_locks->lsl_lock_count = lo->ldo_stripenr;
+
+ /* striped directory lock */
+ for (i = 0; i < lo->ldo_stripenr; i++) {
+ struct lustre_handle lockh;
+ struct ldlm_res_id *res_id;
+
+ res_id = &lod_env_info(env)->lti_res_id;
+ fid_build_reg_res_name(lu_object_fid(&lo->ldo_stripe[i]->do_lu),
+ res_id);
+ einfo->ei_res_id = res_id;
+
+ LASSERT(lo->ldo_stripe[i]);
+ rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh, einfo,
+ policy);
+ if (rc != 0)
+ GOTO(out, rc);
+
+ slave_locks->lsl_handle[i] = lockh;
+ }
+
+ einfo->ei_cbdata = slave_locks;
+
+out:
+ if (rc != 0 && slave_locks != NULL) {
+ einfo->ei_cbdata = slave_locks;
+ lod_object_unlock_internal(env, dt, einfo, policy);
+ OBD_FREE(slave_locks, slave_locks_size);
+ einfo->ei_cbdata = NULL;
+ }
RETURN(rc);
}
.do_capa_get = lod_capa_get,
.do_object_sync = lod_object_sync,
.do_object_lock = lod_object_lock,
+ .do_object_unlock = lod_object_unlock,
};
static ssize_t lod_read(const struct lu_env *env, struct dt_object *dt,
static ssize_t lod_declare_write(const struct lu_env *env,
struct dt_object *dt,
- const loff_t size, loff_t pos,
+ const struct lu_buf *buf, loff_t pos,
struct thandle *th)
{
return dt_declare_record_write(env, dt_object_child(dt),
- size, pos, th);
+ buf, pos, th);
}
static ssize_t lod_write(const struct lu_env *env, struct dt_object *dt,
.dbo_write = lod_write
};
-static int lod_object_init(const struct lu_env *env, struct lu_object *o,
+static int lod_object_init(const struct lu_env *env, struct lu_object *lo,
const struct lu_object_conf *conf)
{
- struct lod_device *d = lu2lod_dev(o->lo_dev);
- struct lu_object *below;
- struct lu_device *under;
+ struct lod_device *lod = lu2lod_dev(lo->lo_dev);
+ struct lu_device *cdev = NULL;
+ struct lu_object *cobj;
+ struct lod_tgt_descs *ltd = NULL;
+ struct lod_tgt_desc *tgt;
+ mdsno_t idx = 0;
+ int type = LU_SEQ_RANGE_ANY;
+ int rc;
ENTRY;
- /*
- * create local object
- */
- under = &d->lod_child->dd_lu_dev;
- below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
- if (below == NULL)
+ rc = lod_fld_lookup(env, lod, lu_object_fid(lo), &idx, &type);
+ if (rc != 0)
+ RETURN(rc);
+
+ if (type == LU_SEQ_RANGE_MDT &&
+ idx == lu_site2seq(lo->lo_dev->ld_site)->ss_node_id) {
+ cdev = &lod->lod_child->dd_lu_dev;
+ } else if (type == LU_SEQ_RANGE_MDT) {
+ ltd = &lod->lod_mdt_descs;
+ lod_getref(ltd);
+ } else if (type == LU_SEQ_RANGE_OST) {
+ ltd = &lod->lod_ost_descs;
+ lod_getref(ltd);
+ } else {
+ LBUG();
+ }
+
+ if (ltd != NULL) {
+ if (ltd->ltd_tgts_size > idx &&
+ cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx)) {
+ tgt = LTD_TGT(ltd, idx);
+
+ LASSERT(tgt != NULL);
+ LASSERT(tgt->ltd_tgt != NULL);
+
+ cdev = &(tgt->ltd_tgt->dd_lu_dev);
+ }
+ lod_putref(lod, ltd);
+ }
+
+ if (unlikely(cdev == NULL))
+ RETURN(-ENOENT);
+
+ cobj = cdev->ld_ops->ldo_object_alloc(env, lo->lo_header, cdev);
+ if (unlikely(cobj == NULL))
RETURN(-ENOMEM);
- lu_object_add(o, below);
+ lu_object_add(lo, cobj);
RETURN(0);
}
.loo_object_release = lod_object_release,
.loo_object_print = lod_object_print,
};
-
-/**
- * Init remote lod object
- */
-static int lod_robject_init(const struct lu_env *env, struct lu_object *lo,
- const struct lu_object_conf *conf)
-{
- struct lod_device *lod = lu2lod_dev(lo->lo_dev);
- struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
- struct lu_device *c_dev = NULL;
- struct lu_object *c_obj;
- int i;
- ENTRY;
-
- lod_getref(ltd);
- if (ltd->ltd_tgts_size > 0) {
- cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) {
- struct lod_tgt_desc *tgt;
- tgt = LTD_TGT(ltd, i);
- LASSERT(tgt && tgt->ltd_tgt);
- if (tgt->ltd_index ==
- lu2lod_obj(lo)->ldo_mds_num) {
- c_dev = &(tgt->ltd_tgt->dd_lu_dev);
- break;
- }
- }
- }
- lod_putref(lod, ltd);
-
- if (unlikely(c_dev == NULL))
- RETURN(-ENOENT);
-
- c_obj = c_dev->ld_ops->ldo_object_alloc(env, lo->lo_header, c_dev);
- if (unlikely(c_obj == NULL))
- RETURN(-ENOMEM);
-
- lu_object_add(lo, c_obj);
-
- RETURN(0);
-}
-
-struct lu_object_operations lod_lu_robj_ops = {
- .loo_object_init = lod_robject_init,
- .loo_object_start = lod_object_start,
- .loo_object_free = lod_object_free,
- .loo_object_release = lod_object_release,
- .loo_object_print = lod_object_print,
-};