* Copyright 2009 Sun Microsystems, Inc. All rights reserved
* Use is subject to license terms.
*
- * Copyright (c) 2012, 2014, Intel Corporation.
+ * Copyright (c) 2012, 2015, Intel Corporation.
*/
/*
* lustre/lod/lod_object.c
* local OSD object interface to the MDD layer, and abstracts the
* addressing of local (OSD) and remote (OSP) objects. The API is
* described in the file lustre/include/dt_object.h and in
- * lustre/doc/osd-api.txt.
+ * Documentation/osd-api.txt.
*
* Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
*/
#include <obd.h>
#include <obd_class.h>
-#include <lustre_ver.h>
#include <obd_support.h>
-#include <lprocfs_status.h>
#include <lustre_fid.h>
-#include <lustre_param.h>
-#include <lustre_fid.h>
+#include <lustre_linkea.h>
#include <lustre_lmv.h>
+#include <lustre_param.h>
+#include <lustre_swab.h>
+#include <lustre_ver.h>
+#include <lprocfs_status.h>
#include <md_object.h>
-#include <lustre_linkea.h>
#include "lod_internal.h"
static const char dotdot[] = "..";
static const struct dt_body_operations lod_body_lnk_ops;
+static const struct dt_body_operations lod_body_ops;
/**
* Implementation of dt_index_operations::dio_lookup
next = lo->ldo_stripe[it->lit_stripe_index];
LASSERT(next != NULL);
- LASSERT(next->do_index_ops != NULL);
-
rc = next->do_ops->do_index_try(env, next, &dt_directory_features);
if (rc != 0)
RETURN(rc);
+ LASSERT(next->do_index_ops != NULL);
+
it_next = next->do_index_ops->dio_it.init(env, next, it->lit_attr);
if (!IS_ERR(it_next)) {
it->lit_it = it_next;
}
/**
- * Create a striped directory.
+ * Declare the creation of a striped directory.
*
- * Create a striped directory with a given stripe pattern on the specified MDTs.
- * A striped directory is represented as a regular directory - an index listing
- * all the stripes. The stripes point back to the master object with ".." and
- * LinkEA. The master object gets LMV EA which identifies it as a striped
- * directory. The function allocates FIDs for all the stripes.
+ * Declare creating a striped directory with a given stripe pattern on the
+ * specified MDTs. A striped directory is represented as a regular directory
+ * - an index listing all the stripes. The stripes point back to the master
+ * object with ".." and LinkEA. The master object gets LMV EA which
+ * identifies it as a striped directory. The function allocates FIDs
+ * for all stripes.
*
* \param[in] env execution environment
* \param[in] dt object
* \param[in] attr attributes to initialize the objects with
- * \param[in] lum a pattern specifying the number of stripes and
- * MDT to start from
* \param[in] dof type of objects to be created
* \param[in] th transaction handle
*
struct dt_object **stripe;
__u32 stripe_count;
int *idx_array;
+ __u32 master_index;
int rc = 0;
__u32 i;
__u32 j;
stripe_count = le32_to_cpu(lum->lum_stripe_count);
- /* shrink the stripe_count to the avaible MDT count */
- if (stripe_count > lod->lod_remote_mdt_count + 1)
- stripe_count = lod->lod_remote_mdt_count + 1;
-
OBD_ALLOC(stripe, sizeof(stripe[0]) * stripe_count);
if (stripe == NULL)
RETURN(-ENOMEM);
if (idx_array == NULL)
GOTO(out_free, rc = -ENOMEM);
+ /* Start index will be the master MDT */
+ master_index = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id;
+ idx_array[0] = master_index;
for (i = 0; i < stripe_count; i++) {
struct lod_tgt_desc *tgt = NULL;
struct dt_object *dto;
struct lu_object_conf conf = { 0 };
struct dt_device *tgt_dt = NULL;
- if (i == 0) {
- /* Right now, master stripe and master object are
- * on the same MDT */
- idx = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id;
- rc = obd_fid_alloc(env, lod->lod_child_exp, &fid,
- NULL);
- if (rc < 0)
- GOTO(out_put, rc);
- tgt_dt = lod->lod_child;
- goto next;
- }
-
- idx = (idx_array[i - 1] + 1) % (lod->lod_remote_mdt_count + 1);
-
+ /* Try to find the next available target */
+ idx = idx_array[i];
for (j = 0; j < lod->lod_remote_mdt_count;
j++, idx = (idx + 1) % (lod->lod_remote_mdt_count + 1)) {
bool already_allocated = false;
__u32 k;
- CDEBUG(D_INFO, "try idx %d, mdt cnt %u,"
- " allocated %u, last allocated %d\n", idx,
- lod->lod_remote_mdt_count, i, idx_array[i - 1]);
+ CDEBUG(D_INFO, "try idx %d, mdt cnt %u, allocated %u\n",
+ idx, lod->lod_remote_mdt_count + 1, i);
+ if (idx == master_index) {
+ /* Allocate the FID locally */
+ rc = obd_fid_alloc(env, lod->lod_child_exp,
+ &fid, NULL);
+ if (rc < 0)
+ GOTO(out_put, rc);
+ tgt_dt = lod->lod_child;
+ break;
+ }
/* Find next available target */
if (!cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx))
continue;
- /* check whether the idx already exists
- * in current allocated array */
- for (k = 0; k < i; k++) {
- if (idx_array[k] == idx) {
- already_allocated = true;
- break;
+ if (likely(!OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))) {
+ /* check whether the idx already exists
+ * in current allocated array */
+ for (k = 0; k < i; k++) {
+ if (idx_array[k] == idx) {
+ already_allocated = true;
+ break;
+ }
}
- }
- if (already_allocated)
- continue;
+ if (already_allocated)
+ continue;
+ }
/* check the status of the OSP */
tgt = LTD_TGT(ltd, idx);
break;
}
- CDEBUG(D_INFO, "idx %d, mdt cnt %u,"
- " allocated %u, last allocated %d\n", idx,
- lod->lod_remote_mdt_count, i, idx_array[i - 1]);
-
-next:
+ CDEBUG(D_INFO, "Get idx %d, for stripe %d "DFID"\n",
+ idx, i, PFID(&fid));
+ idx_array[i] = idx;
+ /* Set the start index for next stripe allocation */
+ if (i < stripe_count - 1)
+ idx_array[i + 1] = (idx + 1) %
+ (lod->lod_remote_mdt_count + 1);
/* tgt_dt and fid must be ready after search avaible OSP
* in the above loop */
LASSERT(tgt_dt != NULL);
if (IS_ERR(dto))
GOTO(out_put, rc = PTR_ERR(dto));
stripe[i] = dto;
- idx_array[i] = idx;
}
lo->ldo_dir_striped = 1;
RETURN(rc);
}
-
/**
* Implementation of dt_object_operations::do_declare_xattr_set.
*
if (rc != 0)
RETURN(rc);
+ /* Note: Do not set LinkEA on sub-stripes, otherwise
+ * it will confuse the fid2path process(see mdt_path_current()).
+ * The linkEA between master and sub-stripes is set in
+ * lod_xattr_set_lmv(). */
+ if (strcmp(name, XATTR_NAME_LINK) == 0)
+ RETURN(0);
+
/* set xattr to each stripes, if needed */
rc = lod_load_striping(env, lo);
if (rc != 0)
RETURN(rc);
- /* Note: Do not set LinkEA on sub-stripes, otherwise
- * it will confuse the fid2path process(see mdt_path_current()).
- * The linkEA between master and sub-stripes is set in
- * lod_xattr_set_lmv(). */
- if (lo->ldo_stripenr == 0 || strcmp(name, XATTR_NAME_LINK) == 0)
+ if (lo->ldo_stripenr == 0)
RETURN(0);
for (i = 0; i < lo->ldo_stripenr; i++) {
}
/**
+ * Reset parent FID on OST object
+ *
+ * Replace the parent FID stored in the XATTR_NAME_FID xattr of each stripe
+ * with the FID of the @dt object. This is only called during migration, to
+ * reset the parent FID after the MDT object is migrated to the new MDT, i.e.
+ * after its FID is changed.
+ *
+ * Only the f_seq and f_oid fields of the stored parent FID are overwritten;
+ * any other field of the stored FID is written back as it was read. Empty
+ * stripe slots, and stripes whose XATTR_NAME_FID cannot be read, are skipped
+ * without reporting an error. The walk stops at the first stripe for which
+ * the (declared) xattr update fails.
+ *
+ * \param[in] env execution environment
+ * \param[in] dt dt_object whose stripes' parent FID will be reset
+ * \param[in] th thandle
+ * \param[in] declare true to only declare the xattr update in \a th,
+ * false to execute it
+ *
+ * \retval 0 if reset succeeds
+ * \retval negative errno if reset fails
+ */
+static int lod_object_replace_parent_fid(const struct lu_env *env,
+ struct dt_object *dt,
+ struct thandle *th, bool declare)
+{
+ struct lod_object *lo = lod_dt_obj(dt);
+ struct lod_thread_info *info = lod_env_info(env);
+ struct lu_buf *buf = &info->lti_buf;
+ struct filter_fid *ff;
+ int i, rc;
+ ENTRY;
+
+ LASSERT(S_ISREG(dt->do_lu.lo_header->loh_attr));
+
+ /* load the striping first; the stripes are walked below */
+ rc = lod_load_striping(env, lo);
+ if (rc != 0)
+ RETURN(rc);
+
+ if (lo->ldo_stripenr == 0)
+ RETURN(0);
+
+ if (info->lti_ea_store_size < sizeof(*ff)) {
+ rc = lod_ea_store_resize(info, sizeof(*ff));
+ if (rc != 0)
+ RETURN(rc);
+ }
+
+ buf->lb_buf = info->lti_ea_store;
+ buf->lb_len = info->lti_ea_store_size;
+
+ for (i = 0; i < lo->ldo_stripenr; i++) {
+ if (lo->ldo_stripe[i] == NULL)
+ continue;
+
+ rc = dt_xattr_get(env, lo->ldo_stripe[i], buf,
+ XATTR_NAME_FID);
+ if (rc < 0) {
+ rc = 0;
+ continue;
+ }
+
+ ff = buf->lb_buf;
+ fid_le_to_cpu(&ff->ff_parent, &ff->ff_parent);
+ ff->ff_parent.f_seq = lu_object_fid(&dt->do_lu)->f_seq;
+ ff->ff_parent.f_oid = lu_object_fid(&dt->do_lu)->f_oid;
+ fid_cpu_to_le(&ff->ff_parent, &ff->ff_parent);
+
+ if (declare) {
+ rc = lod_sub_object_declare_xattr_set(env,
+ lo->ldo_stripe[i], buf,
+ XATTR_NAME_FID,
+ LU_XATTR_REPLACE, th);
+ } else {
+ rc = lod_sub_object_xattr_set(env, lo->ldo_stripe[i],
+ buf, XATTR_NAME_FID,
+ LU_XATTR_REPLACE, th);
+ }
+ if (rc < 0)
+ break;
+ }
+
+ RETURN(rc);
+}
+
+/**
* Implementation of dt_object_operations::do_declare_xattr_set.
*
* \see dt_object_operations::do_declare_xattr_set() in the API description
rc = lod_declare_striped_object(env, dt, attr, buf, th);
} else if (S_ISDIR(mode)) {
rc = lod_dir_declare_xattr_set(env, dt, buf, name, fl, th);
+ } else if (strcmp(name, XATTR_NAME_FID) == 0) {
+ rc = lod_object_replace_parent_fid(env, dt, th, true);
} else {
rc = lod_sub_object_declare_xattr_set(env, next, buf, name,
fl, th);
static int lod_xattr_set_internal(const struct lu_env *env,
struct dt_object *dt,
const struct lu_buf *buf,
- const char *name, int fl, struct thandle *th)
+ const char *name, int fl,
+ struct thandle *th)
{
struct dt_object *next = dt_object_child(dt);
struct lod_object *lo = lod_dt_obj(dt);
rc = lod_striping_create(env, dt, NULL, NULL, th);
}
RETURN(rc);
+ } else if (strcmp(name, XATTR_NAME_FID) == 0) {
+ rc = lod_object_replace_parent_fid(env, dt, th, false);
+
+ RETURN(rc);
}
/* then all other xattr */
LASSERT(lc->ldo_stripenr == 0);
LASSERT(lc->ldo_stripe == NULL);
- /*
- * local object may want some hints
- * in case of late striping creation, ->ah_init()
- * can be called with local object existing
- */
- if (!dt_object_exists(nextc) || dt_object_remote(nextc)) {
- struct dt_object *obj;
-
- obj = (nextp != NULL && dt_object_remote(nextp)) ? NULL : nextp;
- nextc->do_ops->do_ah_init(env, ah, obj, nextc, child_mode);
- }
+ if (!dt_object_exists(nextc))
+ nextc->do_ops->do_ah_init(env, ah, nextp, nextc, child_mode);
if (S_ISDIR(child_mode)) {
if (lc->ldo_dir_stripe == NULL) {
rc = lod_verify_md_striping(d, lum1);
if (rc == 0 &&
le32_to_cpu(lum1->lum_stripe_count) > 1) {
- /* Directory will be striped only if
- * stripe_count > 1 */
lc->ldo_stripenr =
le32_to_cpu(lum1->lum_stripe_count);
lc->ldo_dir_stripe_offset =
lc->ldo_dir_stripe_offset = -1;
}
+ /* shrink the stripe_count to the available MDT count */
+ if (lc->ldo_stripenr > d->lod_remote_mdt_count + 1 &&
+ !OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))
+ lc->ldo_stripenr = d->lod_remote_mdt_count + 1;
+
+ /* Directory will be striped only if stripe_count > 1. If
+ * stripe_count == 1, reset stripenr to 0 to avoid creating
+ * a single master stripe; this also helps to unify the
+ * stripe handling of directories and files */
+ if (lc->ldo_stripenr == 1)
+ lc->ldo_stripenr = 0;
+
CDEBUG(D_INFO, "final striping count:%hu, offset:%d\n",
lc->ldo_stripenr, (int)lc->ldo_dir_stripe_offset);
LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0);
LASSERT(lo->ldo_stripe_size > 0);
+ if (lo->ldo_stripenr == 0)
+ RETURN(0);
+
rc = dt_attr_get(env, next, attr);
LASSERT(attr->la_valid & LA_SIZE);
if (rc)
if (dof->dof_type == DFT_SYM)
dt->do_body_ops = &lod_body_lnk_ops;
+ else if (dof->dof_type == DFT_REGULAR)
+ dt->do_body_ops = &lod_body_ops;
/*
* it's lod_ah_init() that has decided the object will be striped
* Note: if dah_eadata != NULL, it means creating the
* striped directory with specified stripeEA, then it
* should ignore the default stripeEA */
- if ((hint == NULL || hint->dah_eadata == NULL) &&
- lo->ldo_dir_stripe_offset != -1 &&
- lo->ldo_dir_stripe_offset != ss->ss_node_id)
- GOTO(out, rc = -EREMOTE);
+ if (hint != NULL && hint->dah_eadata == NULL) {
+ if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STALE_DIR_LAYOUT))
+ GOTO(out, rc = -EREMOTE);
+
+ if (lo->ldo_dir_stripe_offset == -1) {
+ /* child and parent should be in the same MDT */
+ if (hint->dah_parent != NULL &&
+ dt_object_remote(hint->dah_parent))
+ GOTO(out, rc = -EREMOTE);
+ } else if (lo->ldo_dir_stripe_offset !=
+ ss->ss_node_id) {
+ struct lod_device *lod;
+ struct lod_tgt_descs *ltd;
+ struct lod_tgt_desc *tgt = NULL;
+ bool found_mdt = false;
+ int i;
+
+ lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
+ ltd = &lod->lod_mdt_descs;
+ cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) {
+ tgt = LTD_TGT(ltd, i);
+ if (tgt->ltd_index ==
+ lo->ldo_dir_stripe_offset) {
+ found_mdt = true;
+ break;
+ }
+ }
+
+ /* If the MDT indicated by stripe_offset can be
+ * found, then tell client to resend the create
+ * request to the correct MDT, otherwise return
+ * error to client */
+ if (found_mdt)
+ GOTO(out, rc = -EREMOTE);
+ else
+ GOTO(out, rc = -EINVAL);
+ }
+ }
/* Orphan object (like migrating object) does not have
* lod_dir_stripe, see lod_ah_init */
return dt_object_sync(env, dt_object_child(dt), start, end);
}
-struct lod_slave_locks {
- int lsl_lock_count;
- struct lustre_handle lsl_handle[0];
-};
-
/**
* Release LDLM locks on the stripes of a striped directory.
*
static int lod_object_unlock_internal(const struct lu_env *env,
struct dt_object *dt,
struct ldlm_enqueue_info *einfo,
- ldlm_policy_data_t *policy)
+ union ldlm_policy_data *policy)
{
- struct lod_object *lo = lod_dt_obj(dt);
- struct lod_slave_locks *slave_locks = einfo->ei_cbdata;
+ struct lustre_handle_array *slave_locks = einfo->ei_cbdata;
int rc = 0;
int i;
ENTRY;
if (slave_locks == NULL)
RETURN(0);
- for (i = 1; i < slave_locks->lsl_lock_count; i++) {
- if (lustre_handle_is_used(&slave_locks->lsl_handle[i])) {
- int rc1;
-
- einfo->ei_cbdata = &slave_locks->lsl_handle[i];
- rc1 = dt_object_unlock(env, lo->ldo_stripe[i], einfo,
- policy);
- if (rc1 < 0)
- rc = rc == 0 ? rc1 : rc;
- }
+ for (i = 1; i < slave_locks->count; i++) {
+ if (lustre_handle_is_used(&slave_locks->handles[i]))
+ ldlm_lock_decref(&slave_locks->handles[i],
+ einfo->ei_mode);
}
RETURN(rc);
struct ldlm_enqueue_info *einfo,
union ldlm_policy_data *policy)
{
- struct lod_object *lo = lod_dt_obj(dt);
- struct lod_slave_locks *slave_locks = einfo->ei_cbdata;
- int slave_locks_size;
- int rc;
+ struct lod_object *lo = lod_dt_obj(dt);
+ struct lustre_handle_array *slave_locks = einfo->ei_cbdata;
+ int slave_locks_size;
+ int i;
ENTRY;
if (slave_locks == NULL)
RETURN(0);
- if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
- RETURN(-ENOTDIR);
-
- rc = lod_load_striping(env, lo);
- if (rc != 0)
- RETURN(rc);
-
+ LASSERT(S_ISDIR(dt->do_lu.lo_header->loh_attr));
+ LASSERT(lo->ldo_stripenr > 1);
/* Note: for remote lock for single stripe dir, MDT will cancel
* the lock by lockh directly */
- if (lo->ldo_stripenr <= 1 && dt_object_remote(dt_object_child(dt)))
- RETURN(0);
+ LASSERT(!dt_object_remote(dt_object_child(dt)));
- /* Only cancel slave lock for striped dir */
- rc = lod_object_unlock_internal(env, dt, einfo, policy);
+ /* locks were unlocked in MDT layer */
+ for (i = 1; i < slave_locks->count; i++)
+ LASSERT(!lustre_handle_is_used(&slave_locks->handles[i]));
- slave_locks_size = sizeof(*slave_locks) + slave_locks->lsl_lock_count *
- sizeof(slave_locks->lsl_handle[0]);
+ slave_locks_size = sizeof(*slave_locks) + slave_locks->count *
+ sizeof(slave_locks->handles[0]);
OBD_FREE(slave_locks, slave_locks_size);
einfo->ei_cbdata = NULL;
- RETURN(rc);
+ RETURN(0);
}
/**
int rc = 0;
int i;
int slave_locks_size;
- struct lod_slave_locks *slave_locks = NULL;
+ struct lustre_handle_array *slave_locks = NULL;
ENTRY;
/* remote object lock */
RETURN(0);
slave_locks_size = sizeof(*slave_locks) + lo->ldo_stripenr *
- sizeof(slave_locks->lsl_handle[0]);
+ sizeof(slave_locks->handles[0]);
/* Freed in lod_object_unlock */
OBD_ALLOC(slave_locks, slave_locks_size);
if (slave_locks == NULL)
RETURN(-ENOMEM);
- slave_locks->lsl_lock_count = lo->ldo_stripenr;
+ slave_locks->count = lo->ldo_stripenr;
/* striped directory lock */
for (i = 1; i < lo->ldo_stripenr; i++) {
res_id);
einfo->ei_res_id = res_id;
- LASSERT(lo->ldo_stripe[i]);
- rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh, einfo,
- policy);
+ LASSERT(lo->ldo_stripe[i] != NULL);
+ if (likely(dt_object_remote(lo->ldo_stripe[i]))) {
+ rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh,
+ einfo, policy);
+ } else {
+ struct ldlm_namespace *ns = einfo->ei_namespace;
+ ldlm_blocking_callback blocking = einfo->ei_cb_local_bl;
+ ldlm_completion_callback completion = einfo->ei_cb_cp;
+ __u64 dlmflags = LDLM_FL_ATOMIC_CB;
+
+ if (einfo->ei_mode == LCK_PW ||
+ einfo->ei_mode == LCK_EX)
+ dlmflags |= LDLM_FL_COS_INCOMPAT;
+
+ /* This only happens if there are multiple stripes
+ * on the master MDT, i.e. besides stripe0 there are
+ * other stripes on the master MDT as well. It only
+ * happens in the test case right now. */
+ LASSERT(ns != NULL);
+ rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS,
+ policy, einfo->ei_mode,
+ &dlmflags, blocking,
+ completion, NULL,
+ NULL, 0, LVB_T_NONE,
+ NULL, &lockh);
+ }
if (rc != 0)
GOTO(out, rc);
- slave_locks->lsl_handle[i] = lockh;
+ slave_locks->handles[i] = lockh;
}
einfo->ei_cbdata = slave_locks;
return lod_sub_object_write(env, dt_object_child(dt), buf, pos, th, iq);
}
+static int lod_declare_punch(const struct lu_env *env, struct dt_object *dt,
+ __u64 start, __u64 end, struct thandle *th)
+{
+ if (dt_object_remote(dt))
+ return -ENOTSUPP;
+
+ return lod_sub_object_declare_punch(env, dt_object_child(dt), start,
+ end, th);
+}
+
+static int lod_punch(const struct lu_env *env, struct dt_object *dt,
+ __u64 start, __u64 end, struct thandle *th)
+{
+ if (dt_object_remote(dt))
+ return -ENOTSUPP;
+
+ return lod_sub_object_punch(env, dt_object_child(dt), start, end, th);
+}
+
static const struct dt_body_operations lod_body_lnk_ops = {
.dbo_read = lod_read,
.dbo_declare_write = lod_declare_write,
.dbo_write = lod_write
};
+static const struct dt_body_operations lod_body_ops = {
+ .dbo_read = lod_read,
+ .dbo_declare_write = lod_declare_write,
+ .dbo_write = lod_write,
+ .dbo_declare_punch = lod_declare_punch,
+ .dbo_punch = lod_punch,
+};
+
/**
* Implementation of lu_object_operations::loo_object_init.
*
*/
static int lod_object_start(const struct lu_env *env, struct lu_object *o)
{
- if (S_ISLNK(o->lo_header->loh_attr & S_IFMT))
+ if (S_ISLNK(o->lo_header->loh_attr & S_IFMT)) {
lu2lod_obj(o)->ldo_obj.do_body_ops = &lod_body_lnk_ops;
+ } else if (S_ISREG(o->lo_header->loh_attr & S_IFMT) ||
+ fid_is_local_file(lu_object_fid(o))) {
+ /* Note: some local files (like last rcvd) are created
+ * through the bottom layer (OSD), so when the object
+ * initialization comes to lod, loh_attr is not set yet;
+ * set do_body_ops for local files anyway */
+ lu2lod_obj(o)->ldo_obj.do_body_ops = &lod_body_ops;
+ }
return 0;
}