* Copyright 2009 Sun Microsystems, Inc. All rights reserved
* Use is subject to license terms.
*
- * Copyright (c) 2012, 2014, Intel Corporation.
+ * Copyright (c) 2012, 2015, Intel Corporation.
*/
/*
* lustre/lod/lod_object.c
* local OSD object interface to the MDD layer, and abstracts the
* addressing of local (OSD) and remote (OSP) objects. The API is
* described in the file lustre/include/dt_object.h and in
- * lustre/doc/osd-api.txt.
+ * Documentation/osd-api.txt.
*
* Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
*/
#include <obd.h>
#include <obd_class.h>
-#include <lustre_ver.h>
#include <obd_support.h>
-#include <lprocfs_status.h>
#include <lustre_fid.h>
-#include <lustre_param.h>
-#include <lustre_fid.h>
+#include <lustre_linkea.h>
#include <lustre_lmv.h>
+#include <lustre_param.h>
+#include <lustre_swab.h>
+#include <lustre_ver.h>
+#include <lprocfs_status.h>
#include <md_object.h>
-#include <lustre_linkea.h>
#include "lod_internal.h"
next = lo->ldo_stripe[it->lit_stripe_index];
LASSERT(next != NULL);
- LASSERT(next->do_index_ops != NULL);
-
rc = next->do_ops->do_index_try(env, next, &dt_directory_features);
if (rc != 0)
RETURN(rc);
+ LASSERT(next->do_index_ops != NULL);
+
it_next = next->do_index_ops->dio_it.init(env, next, it->lit_attr);
if (!IS_ERR(it_next)) {
it->lit_it = it_next;
stripe_count = le32_to_cpu(lum->lum_stripe_count);
- /* shrink the stripe_count to the avaible MDT count */
- if (stripe_count > lod->lod_remote_mdt_count + 1 &&
- !OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))
- stripe_count = lod->lod_remote_mdt_count + 1;
-
OBD_ALLOC(stripe, sizeof(stripe[0]) * stripe_count);
if (stripe == NULL)
RETURN(-ENOMEM);
idx, i, PFID(&fid));
idx_array[i] = idx;
/* Set the start index for next stripe allocation */
- if (i < stripe_count)
+ if (i < stripe_count - 1)
idx_array[i + 1] = (idx + 1) %
(lod->lod_remote_mdt_count + 1);
/* tgt_dt and fid must be ready after search avaible OSP
RETURN(rc);
}
-
/**
* Implementation of dt_object_operations::do_declare_xattr_set.
*
if (rc != 0)
RETURN(rc);
+ /* Note: Do not set LinkEA on sub-stripes, otherwise
+ * it will confuse the fid2path process (see mdt_path_current()).
+ * The linkEA between master and sub-stripes is set in
+ * lod_xattr_set_lmv(). */
+ if (strcmp(name, XATTR_NAME_LINK) == 0)
+ RETURN(0);
+
/* set xattr to each stripes, if needed */
rc = lod_load_striping(env, lo);
if (rc != 0)
RETURN(rc);
- /* Note: Do not set LinkEA on sub-stripes, otherwise
- * it will confuse the fid2path process(see mdt_path_current()).
- * The linkEA between master and sub-stripes is set in
- * lod_xattr_set_lmv(). */
- if (lo->ldo_stripenr == 0 || strcmp(name, XATTR_NAME_LINK) == 0)
+ if (lo->ldo_stripenr == 0)
RETURN(0);
for (i = 0; i < lo->ldo_stripenr; i++) {
}
/**
+ * Reset parent FID on OST object
+ *
+ * Replace parent FID with @dt object FID, which is only called during migration
+ * to reset the parent FID after the MDT object is migrated to the new MDT, i.e.
+ * the FID is changed.
+ *
+ * For every non-NULL stripe the XATTR_NAME_FID xattr is read, the f_seq and
+ * f_oid of its ff_parent are overwritten with those of \a dt's FID (the other
+ * fields are written back as read), and the result is either declared or
+ * written with LU_XATTR_REPLACE. A stripe whose xattr cannot be read is
+ * skipped. The first failing declare/set stops the loop.
+ *
+ * \param[in] env execution environment
+ * \param[in] dt dt_object whose stripes' parent FID will be reset
+ * \param[in] th thandle
+ * \param[in] declare true to only declare the xattr update in \a th,
+ * false to execute it
+ *
+ * \retval 0 if reset succeeds (or there is no stripe)
+ * \retval negative errno if reset fails
+ */
+static int lod_object_replace_parent_fid(const struct lu_env *env,
+ struct dt_object *dt,
+ struct thandle *th, bool declare)
+{
+ struct lod_object *lo = lod_dt_obj(dt);
+ struct lod_thread_info *info = lod_env_info(env);
+ struct lu_buf *buf = &info->lti_buf;
+ struct filter_fid *ff;
+ int i, rc;
+ ENTRY;
+
+ LASSERT(S_ISREG(dt->do_lu.lo_header->loh_attr));
+
+ /* make sure the striping is loaded (lod_load_striping()) before
+ * ldo_stripenr and ldo_stripe[] are used below */
+ rc = lod_load_striping(env, lo);
+ if (rc != 0)
+ RETURN(rc);
+
+ if (lo->ldo_stripenr == 0)
+ RETURN(0);
+
+ if (info->lti_ea_store_size < sizeof(*ff)) {
+ rc = lod_ea_store_resize(info, sizeof(*ff));
+ if (rc != 0)
+ RETURN(rc);
+ }
+
+ buf->lb_buf = info->lti_ea_store;
+ buf->lb_len = info->lti_ea_store_size;
+
+ for (i = 0; i < lo->ldo_stripenr; i++) {
+ if (lo->ldo_stripe[i] == NULL)
+ continue;
+
+ rc = dt_xattr_get(env, lo->ldo_stripe[i], buf,
+ XATTR_NAME_FID);
+ if (rc < 0) {
+ rc = 0; /* unreadable FID xattr: skip this stripe, not an error */
+ continue;
+ }
+
+ ff = buf->lb_buf;
+ fid_le_to_cpu(&ff->ff_parent, &ff->ff_parent);
+ ff->ff_parent.f_seq = lu_object_fid(&dt->do_lu)->f_seq;
+ ff->ff_parent.f_oid = lu_object_fid(&dt->do_lu)->f_oid;
+ fid_cpu_to_le(&ff->ff_parent, &ff->ff_parent);
+
+ if (declare) {
+ rc = lod_sub_object_declare_xattr_set(env,
+ lo->ldo_stripe[i], buf,
+ XATTR_NAME_FID,
+ LU_XATTR_REPLACE, th);
+ } else {
+ rc = lod_sub_object_xattr_set(env, lo->ldo_stripe[i],
+ buf, XATTR_NAME_FID,
+ LU_XATTR_REPLACE, th);
+ }
+ if (rc < 0)
+ break;
+ }
+
+ RETURN(rc);
+}
+
+/**
* Implementation of dt_object_operations::do_declare_xattr_set.
*
* \see dt_object_operations::do_declare_xattr_set() in the API description
rc = lod_declare_striped_object(env, dt, attr, buf, th);
} else if (S_ISDIR(mode)) {
rc = lod_dir_declare_xattr_set(env, dt, buf, name, fl, th);
+ } else if (strcmp(name, XATTR_NAME_FID) == 0) {
+ rc = lod_object_replace_parent_fid(env, dt, th, true);
} else {
rc = lod_sub_object_declare_xattr_set(env, next, buf, name,
fl, th);
rc = lod_striping_create(env, dt, NULL, NULL, th);
}
RETURN(rc);
+ } else if (strcmp(name, XATTR_NAME_FID) == 0) {
+ rc = lod_object_replace_parent_fid(env, dt, th, false);
+
+ RETURN(rc);
}
/* then all other xattr */
LASSERT(lc->ldo_stripenr == 0);
LASSERT(lc->ldo_stripe == NULL);
- /*
- * local object may want some hints
- * in case of late striping creation, ->ah_init()
- * can be called with local object existing
- */
- if (!dt_object_exists(nextc) || dt_object_remote(nextc)) {
- struct dt_object *obj;
-
- obj = (nextp != NULL && dt_object_remote(nextp)) ? NULL : nextp;
- nextc->do_ops->do_ah_init(env, ah, obj, nextc, child_mode);
- }
+ if (!dt_object_exists(nextc))
+ nextc->do_ops->do_ah_init(env, ah, nextp, nextc, child_mode);
if (S_ISDIR(child_mode)) {
if (lc->ldo_dir_stripe == NULL) {
rc = lod_verify_md_striping(d, lum1);
if (rc == 0 &&
le32_to_cpu(lum1->lum_stripe_count) > 1) {
- /* Directory will be striped only if
- * stripe_count > 1 */
lc->ldo_stripenr =
le32_to_cpu(lum1->lum_stripe_count);
lc->ldo_dir_stripe_offset =
lc->ldo_dir_stripe_offset = -1;
}
+ /* shrink the stripe_count to the available MDT count */
+ if (lc->ldo_stripenr > d->lod_remote_mdt_count + 1 &&
+ !OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))
+ lc->ldo_stripenr = d->lod_remote_mdt_count + 1;
+
+ /* Directory will be striped only if stripe_count > 1; if
+ * stripe_count == 1, reset stripenr to 0 to avoid creating
+ * a single master stripe, which also helps to unify the
+ * stripe handling of directories and files */
+ if (lc->ldo_stripenr == 1)
+ lc->ldo_stripenr = 0;
+
CDEBUG(D_INFO, "final striping count:%hu, offset:%d\n",
lc->ldo_stripenr, (int)lc->ldo_dir_stripe_offset);
LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0);
LASSERT(lo->ldo_stripe_size > 0);
+ if (lo->ldo_stripenr == 0)
+ RETURN(0);
+
rc = dt_attr_get(env, next, attr);
LASSERT(attr->la_valid & LA_SIZE);
if (rc)
* Note: if dah_eadata != NULL, it means creating the
* striped directory with specified stripeEA, then it
* should ignore the default stripeEA */
- if ((hint == NULL || hint->dah_eadata == NULL) &&
- lo->ldo_dir_stripe_offset != -1 &&
- lo->ldo_dir_stripe_offset != ss->ss_node_id)
- GOTO(out, rc = -EREMOTE);
+ if (hint != NULL && hint->dah_eadata == NULL) {
+ if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STALE_DIR_LAYOUT))
+ GOTO(out, rc = -EREMOTE);
+
+ if (lo->ldo_dir_stripe_offset == -1) {
+ /* child and parent should be in the same MDT */
+ if (hint->dah_parent != NULL &&
+ dt_object_remote(hint->dah_parent))
+ GOTO(out, rc = -EREMOTE);
+ } else if (lo->ldo_dir_stripe_offset !=
+ ss->ss_node_id) {
+ struct lod_device *lod;
+ struct lod_tgt_descs *ltd;
+ struct lod_tgt_desc *tgt = NULL;
+ bool found_mdt = false;
+ int i;
+
+ lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
+ ltd = &lod->lod_mdt_descs;
+ cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) {
+ tgt = LTD_TGT(ltd, i);
+ if (tgt->ltd_index ==
+ lo->ldo_dir_stripe_offset) {
+ found_mdt = true;
+ break;
+ }
+ }
+
+ /* If the MDT indicated by stripe_offset can be
+ * found, then tell client to resend the create
+ * request to the correct MDT, otherwise return
+ * error to client */
+ if (found_mdt)
+ GOTO(out, rc = -EREMOTE);
+ else
+ GOTO(out, rc = -EINVAL);
+ }
+ }
/* Orphan object (like migrating object) does not have
* lod_dir_stripe, see lod_ah_init */
return dt_object_sync(env, dt_object_child(dt), start, end);
}
-struct lod_slave_locks {
- int lsl_lock_count;
- struct lustre_handle lsl_handle[0];
-};
-
/**
* Release LDLM locks on the stripes of a striped directory.
*
static int lod_object_unlock_internal(const struct lu_env *env,
struct dt_object *dt,
struct ldlm_enqueue_info *einfo,
- ldlm_policy_data_t *policy)
+ union ldlm_policy_data *policy)
{
- struct lod_slave_locks *slave_locks = einfo->ei_cbdata;
+ struct lustre_handle_array *slave_locks = einfo->ei_cbdata;
int rc = 0;
int i;
ENTRY;
if (slave_locks == NULL)
RETURN(0);
- for (i = 1; i < slave_locks->lsl_lock_count; i++) {
- if (lustre_handle_is_used(&slave_locks->lsl_handle[i]))
- ldlm_lock_decref(&slave_locks->lsl_handle[i],
+ for (i = 1; i < slave_locks->count; i++) {
+ if (lustre_handle_is_used(&slave_locks->handles[i]))
+ ldlm_lock_decref(&slave_locks->handles[i],
einfo->ei_mode);
}
struct ldlm_enqueue_info *einfo,
union ldlm_policy_data *policy)
{
- struct lod_object *lo = lod_dt_obj(dt);
- struct lod_slave_locks *slave_locks = einfo->ei_cbdata;
- int slave_locks_size;
- int rc;
+ struct lod_object *lo = lod_dt_obj(dt);
+ struct lustre_handle_array *slave_locks = einfo->ei_cbdata;
+ int slave_locks_size;
+ int i;
ENTRY;
if (slave_locks == NULL)
RETURN(0);
- if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
- RETURN(-ENOTDIR);
-
+ LASSERT(S_ISDIR(dt->do_lu.lo_header->loh_attr));
+ LASSERT(lo->ldo_stripenr > 1);
/* Note: for remote lock for single stripe dir, MDT will cancel
* the lock by lockh directly */
- if (lo->ldo_stripenr <= 1 && dt_object_remote(dt_object_child(dt)))
- RETURN(0);
+ LASSERT(!dt_object_remote(dt_object_child(dt)));
- /* Only cancel slave lock for striped dir */
- rc = lod_object_unlock_internal(env, dt, einfo, policy);
+ /* locks were unlocked in MDT layer */
+ for (i = 1; i < slave_locks->count; i++)
+ LASSERT(!lustre_handle_is_used(&slave_locks->handles[i]));
- slave_locks_size = sizeof(*slave_locks) + slave_locks->lsl_lock_count *
- sizeof(slave_locks->lsl_handle[0]);
+ slave_locks_size = sizeof(*slave_locks) + slave_locks->count *
+ sizeof(slave_locks->handles[0]);
OBD_FREE(slave_locks, slave_locks_size);
einfo->ei_cbdata = NULL;
- RETURN(rc);
+ RETURN(0);
}
/**
int rc = 0;
int i;
int slave_locks_size;
- struct lod_slave_locks *slave_locks = NULL;
+ struct lustre_handle_array *slave_locks = NULL;
ENTRY;
/* remote object lock */
RETURN(0);
slave_locks_size = sizeof(*slave_locks) + lo->ldo_stripenr *
- sizeof(slave_locks->lsl_handle[0]);
+ sizeof(slave_locks->handles[0]);
/* Freed in lod_object_unlock */
OBD_ALLOC(slave_locks, slave_locks_size);
if (slave_locks == NULL)
RETURN(-ENOMEM);
- slave_locks->lsl_lock_count = lo->ldo_stripenr;
+ slave_locks->count = lo->ldo_stripenr;
/* striped directory lock */
for (i = 1; i < lo->ldo_stripenr; i++) {
ldlm_completion_callback completion = einfo->ei_cb_cp;
__u64 dlmflags = LDLM_FL_ATOMIC_CB;
+ if (einfo->ei_mode == LCK_PW ||
+ einfo->ei_mode == LCK_EX)
+ dlmflags |= LDLM_FL_COS_INCOMPAT;
+
/* This only happens if there are mulitple stripes
* on the master MDT, i.e. except stripe0, there are
* other stripes on the Master MDT as well, Only
}
if (rc != 0)
GOTO(out, rc);
- slave_locks->lsl_handle[i] = lockh;
+ slave_locks->handles[i] = lockh;
}
einfo->ei_cbdata = slave_locks;