*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
*
* GPL HEADER END
*/
* Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2011, 2015, Intel Corporation.
+ * Copyright (c) 2011, 2017, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
#include <obd_support.h>
#include <lustre_mds.h>
#include <lustre_fid.h>
+#include <lustre_lmv.h>
+#include <lustre_idmap.h>
#include "mdd_internal.h"
static const char dotdot[] = "..";
static struct lu_name lname_dotdot = {
- (char *) dotdot,
- sizeof(dotdot) - 1
+ .ln_name = (char *) dotdot, /* cast drops the const of the static dotdot[] array */
+ .ln_namelen = sizeof(dotdot) - 1, /* array size minus the trailing NUL */
};
static inline int
const struct lu_attr *pattr, const struct lu_name *lname,
struct lu_fid* fid, int mask)
{
- const char *name = lname->ln_name;
- const struct dt_key *key = (const struct dt_key *)name;
- struct mdd_object *mdd_obj = md2mdd_obj(pobj);
- struct mdd_device *m = mdo2mdd(pobj);
- struct dt_object *dir = mdd_object_child(mdd_obj);
- int rc;
+ const char *name = lname->ln_name;
+ const struct dt_key *key = (const struct dt_key *)name;
+ struct mdd_object *mdd_obj = md2mdd_obj(pobj);
+ struct dt_object *dir = mdd_object_child(mdd_obj);
+ int rc;
+
ENTRY;
if (unlikely(mdd_is_dead_obj(mdd_obj)))
RETURN(-ESTALE);
- if (mdd_object_remote(mdd_obj)) {
- CDEBUG(D_INFO, "%s: Object "DFID" locates on remote server\n",
- mdd2obd_dev(m)->obd_name, PFID(mdo2fid(mdd_obj)));
- } else if (!mdd_object_exists(mdd_obj)) {
+ if (!mdd_object_exists(mdd_obj))
RETURN(-ESTALE);
+
+ if (mdd_object_remote(mdd_obj)) {
+ CDEBUG(D_INFO, "%s: Object "DFID" located on remote server\n",
+ mdd_obj_dev_name(mdd_obj),
+ PFID(mdd_object_fid(mdd_obj)));
}
rc = mdd_permission_internal_locked(env, mdd_obj, pattr, mask,
- MOR_TGT_PARENT);
+ DT_TGT_PARENT);
if (rc)
RETURN(rc);
RETURN(rc);
}
+/**
+ * Read the raw link EA of \a mdd_obj into a per-thread temporary buffer.
+ *
+ * Uses mdd_thread_info::mti_link_buf, first sized at PAGE_SIZE. If the xattr
+ * read reports -ERANGE the buffer is released, the xattr is queried again to
+ * learn the needed length, and the buffer is re-allocated at that length.
+ * A pointer to the buffer is stored in \a ldata::ld_buf.
+ *
+ * This helper only fetches the EA. Validating the header is the job of the
+ * callers, mdd_links_read() and mdd_links_read_with_rec(), which run the
+ * matching linkea_init*() variant once the read has succeeded.
+ *
+ * \param[in]  env	execution environment, must not be NULL
+ * \param[in]  mdd_obj	object whose XATTR_NAME_LINK is read
+ * \param[out] ldata	ld_buf is pointed at the buffer holding the EA
+ *
+ * \retval 0		the EA was read into \a ldata::ld_buf
+ * \retval -ENODATA	\a mdd_obj does not exist
+ * \retval -ENOMEM	the buffer could not be allocated
+ * \retval negative	any other error returned by mdo_xattr_get()
+ */
+static int __mdd_links_read(const struct lu_env *env,
+			    struct mdd_object *mdd_obj,
+			    struct linkea_data *ldata)
+{
+	int rc;
+
+	if (!mdd_object_exists(mdd_obj))
+		return -ENODATA;
+
+	/* First try a small buf */
+	LASSERT(env != NULL);
+	ldata->ld_buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_link_buf,
+					       PAGE_SIZE);
+	if (ldata->ld_buf->lb_buf == NULL)
+		return -ENOMEM;
+
+	rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf, XATTR_NAME_LINK);
+	if (rc == -ERANGE) {
+		/* Buf was too small, figure out what we need. */
+		lu_buf_free(ldata->ld_buf);
+		rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
+				   XATTR_NAME_LINK);
+		if (rc < 0)
+			return rc;
+		ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
+		if (ldata->ld_buf->lb_buf == NULL)
+			return -ENOMEM;
+		rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
+				   XATTR_NAME_LINK);
+	}
+	if (rc < 0) {
+		lu_buf_free(ldata->ld_buf);
+		ldata->ld_buf = NULL;
+		return rc;
+	}
+
+	/*
+	 * rc is a non-negative value from mdo_xattr_get() here; report plain
+	 * success. The header is not initialised in this helper: each caller
+	 * runs its own linkea_init*() variant, so doing it here as well would
+	 * only validate the same header twice.
+	 */
+	return 0;
+}
+
+/**
+ * Read the link EA of \a mdd_obj and run linkea_init() on the result.
+ *
+ * \retval 0 on success; otherwise the error from __mdd_links_read() or,
+ *	   when the read succeeded, from linkea_init()
+ */
+static int mdd_links_read(const struct lu_env *env,
+			  struct mdd_object *mdd_obj,
+			  struct linkea_data *ldata)
+{
+	int err = __mdd_links_read(env, mdd_obj, ldata);
+
+	/* a failed read leaves nothing to initialise */
+	if (err != 0)
+		return err;
+
+	return linkea_init(ldata);
+}
+
+/**
+ * Read the link EA of \a mdd_obj and run linkea_init_with_rec() on the result.
+ *
+ * \retval 0 on success; otherwise the error from __mdd_links_read() or,
+ *	   when the read succeeded, from linkea_init_with_rec()
+ */
+static int mdd_links_read_with_rec(const struct lu_env *env,
+				   struct mdd_object *mdd_obj,
+				   struct linkea_data *ldata)
+{
+	int err = __mdd_links_read(env, mdd_obj, ldata);
+
+	/* a failed read leaves nothing to initialise */
+	if (err != 0)
+		return err;
+
+	return linkea_init_with_rec(ldata);
+}
+
/**
* Get parent FID of the directory
*
ENTRY;
- LASSERT(S_ISDIR(mdd_object_type(obj)));
+ LASSERTF(S_ISDIR(mdd_object_type(obj)),
+ "%s: FID "DFID" is not a directory type = %o\n",
+ mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)),
+ mdd_object_type(obj));
buf = lu_buf_check_and_alloc(buf, PATH_MAX);
if (buf->lb_buf == NULL)
GOTO(lookup, rc = 0);
ldata.ld_buf = buf;
- rc = mdd_links_read(env, obj, &ldata);
+ rc = mdd_links_read_with_rec(env, obj, &ldata);
if (rc != 0)
GOTO(lookup, rc);
+ /* the obj is not locked, don't cache attributes */
+ mdd_invalidate(env, &obj->mod_obj);
+
LASSERT(ldata.ld_leh != NULL);
/* Directory should only have 1 parent */
if (ldata.ld_leh->leh_reccount > 1)
}
/*
- * return 1: if lf is the fid of the ancestor of p1;
+ * return 1: if \a tfid is the fid of the ancestor of \a mo;
* return 0: if not;
- *
- * return -EREMOTE: if remote object is found, in this
- * case fid of remote object is saved to @pf;
- *
* otherwise: values < 0, errors.
*/
static int mdd_is_parent(const struct lu_env *env,
struct mdd_device *mdd,
- struct mdd_object *p1,
+ struct mdd_object *mo,
const struct lu_attr *attr,
- const struct lu_fid *lf,
- struct lu_fid *pf)
+ const struct lu_fid *tfid)
{
- struct mdd_object *parent = NULL;
- struct lu_fid *pfid;
- int rc;
- ENTRY;
+ struct mdd_object *mp;
+ struct lu_fid *pfid;
+ int rc;
+
+ LASSERT(!lu_fid_eq(mdd_object_fid(mo), tfid)); /* callers must not pass mo's own fid as tfid */
+ pfid = &mdd_env_info(env)->mti_fid; /* per-env scratch fid, overwritten on every step up */
+
+ if (mdd_is_root(mdd, mdd_object_fid(mo))) /* mo is the root per mdd_is_root(): report "not an ancestor" */
+ return 0;
+
+ if (mdd_is_root(mdd, tfid)) /* tfid is the root and mo is not: report "ancestor" without walking */
+ return 1;
+
+ rc = mdd_parent_fid(env, mo, attr, pfid); /* first step: pfid receives mo's parent fid */
+ if (rc)
+ return rc;
+
+ while (1) { /* walk upwards; every exit from this loop is a return */
+ if (lu_fid_eq(pfid, tfid))
+ return 1;
+
+ if (mdd_is_root(mdd, pfid)) /* reached the root without meeting tfid */
+ return 0;
- LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
- pfid = &mdd_env_info(env)->mti_fid;
+ mp = mdd_object_find(env, mdd, pfid);
+ if (IS_ERR(mp))
+ return PTR_ERR(mp);
- /* Check for root first. */
- if (mdd_is_root(mdd, mdo2fid(p1)))
- RETURN(0);
+ if (!mdd_object_exists(mp)) {
+ mdd_object_put(env, mp); /* release mp before bailing out */
+ return -ENOENT;
+ }
- for(;;) {
- /* this is done recursively */
- rc = mdd_parent_fid(env, p1, attr, pfid);
+ rc = mdd_parent_fid(env, mp, attr, pfid); /* NOTE(review): attr is still the one passed in for mo - confirm intended */
+ mdd_object_put(env, mp); /* mp is not used past this point */
if (rc)
- GOTO(out, rc);
- if (mdd_is_root(mdd, pfid))
- GOTO(out, rc = 0);
- if (lu_fid_eq(pfid, &mdd->mdd_local_root_fid))
- GOTO(out, rc = 0);
- if (lu_fid_eq(pfid, lf))
- GOTO(out, rc = 1);
- if (parent != NULL)
- mdd_object_put(env, parent);
-
- parent = mdd_object_find(env, mdd, pfid);
- if (IS_ERR(parent))
- GOTO(out, rc = PTR_ERR(parent));
- p1 = parent;
- }
- EXIT;
-out:
- if (parent && !IS_ERR(parent))
- mdd_object_put(env, parent);
- return rc;
+ return rc;
+ }
+
+ return 0; /* not reached: the loop above only leaves via return */
}
/*
*
* returns 1: if fid is ancestor of @mo;
* returns 0: if fid is not an ancestor of @mo;
- *
- * returns EREMOTE if remote object is found, fid of remote object is saved to
- * @fid;
- *
* returns < 0: if error
*/
int mdd_is_subdir(const struct lu_env *env, struct md_object *mo,
- const struct lu_fid *fid, struct lu_fid *sfid)
+ const struct lu_fid *fid)
{
struct mdd_device *mdd = mdo2mdd(mo);
struct lu_attr *attr = MDD_ENV_VAR(env, cattr);
int rc;
ENTRY;
+ if (!mdd_object_exists(md2mdd_obj(mo))) /* a non-existent object is rejected before its type is read */
+ RETURN(-ENOENT);
+
if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
- RETURN(0);
+ RETURN(-ENOTDIR); /* a non-directory is now an error; the removed line returned 0 */
rc = mdd_la_get(env, md2mdd_obj(mo), attr);
if (rc != 0)
RETURN(rc);
- rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), attr, fid, sfid);
- if (rc == 0) {
- /* found root */
- fid_zero(sfid);
- } else if (rc == 1) {
- /* found @fid is parent */
- *sfid = *fid;
- rc = 0;
- }
+ rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), attr, fid); /* 1, 0 or a negative error, handed to the caller unchanged */
RETURN(rc);
}
iops->put(env, it);
iops->fini(env, it);
- } else
+ } else {
result = PTR_ERR(it);
+ /* -ENODEV means no valid stripe */
+ if (result == -ENODEV)
+ RETURN(0);
+ }
RETURN(result);
}
const struct lu_attr *pattr, struct mdd_object *cobj,
bool check_perm)
{
- struct mdd_thread_info *info = mdd_env_info(env);
- struct lu_buf *xbuf;
int rc = 0;
ENTRY;
if (mdd_is_dead_obj(pobj))
RETURN(-ENOENT);
- /* If the parent is a sub-stripe, check whether it is dead */
- xbuf = mdd_buf_get(env, info->mti_key, sizeof(info->mti_key));
- rc = mdo_xattr_get(env, pobj, xbuf, XATTR_NAME_LMV);
- if (unlikely(rc > 0)) {
- struct lmv_mds_md_v1 *lmv1 = xbuf->lb_buf;
-
- if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_STRIPE &&
- le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_DEAD)
- RETURN(-ESTALE);
- }
- rc = 0;
-
if (check_perm)
rc = mdd_permission_internal_locked(env, pobj, pattr,
MAY_WRITE | MAY_EXEC,
- MOR_TGT_PARENT);
+ DT_TGT_PARENT);
RETURN(rc);
}
rc = mdd_permission_internal_locked(env, pobj, pattr,
MAY_WRITE | MAY_EXEC,
- MOR_TGT_PARENT);
+ DT_TGT_PARENT);
if (rc != 0)
RETURN(rc);
int rc;
rc = mdd_permission_internal_locked(env, pobj, pattr,
MAY_WRITE | MAY_EXEC,
- MOR_TGT_PARENT);
+ DT_TGT_PARENT);
if (rc)
RETURN(rc);
}
/* additional check the rename case */
if (cattr) {
if (S_ISDIR(cattr->la_mode)) {
- struct mdd_device *mdd = mdo2mdd(&tobj->mod_obj);
-
if (!S_ISDIR(tattr->la_mode))
RETURN(-ENOTDIR);
- if (lu_fid_eq(mdo2fid(tobj), &mdd->mdd_root_fid))
+ if (mdd_is_root(mdo2mdd(&tobj->mod_obj),
+ mdd_object_fid(tobj)))
RETURN(-EBUSY);
} else if (S_ISDIR(tattr->la_mode))
RETURN(-EISDIR);
if (dt_try_as_dir(env, next)) {
struct dt_insert_rec *rec = &mdd_env_info(env)->mti_dt_rec;
- struct lu_ucred *uc = lu_ucred_check(env);
- int ignore_quota;
rec->rec_fid = lf;
rec->rec_type = type;
- ignore_quota = uc ? uc->uc_cap & CFS_CAP_SYS_RESOURCE_MASK : 1;
rc = dt_insert(env, next, (const struct dt_rec *)rec,
- (const struct dt_key *)name, handle,
- ignore_quota);
+ (const struct dt_key *)name, handle);
} else {
rc = -ENOTDIR;
}
rc = __mdd_index_insert_only(env, pobj, lf, type, name, handle);
if (rc == 0 && S_ISDIR(type)) {
- mdd_write_lock(env, pobj, MOR_TGT_PARENT);
+ mdd_write_lock(env, pobj, DT_TGT_PARENT);
mdo_ref_add(env, pobj, handle);
mdd_write_unlock(env, pobj);
}
const char *name, int is_dir,
struct thandle *handle)
{
- int rc;
- ENTRY;
+ int rc;
+ ENTRY;
rc = __mdd_index_delete_only(env, pobj, name, handle);
- if (rc == 0 && is_dir) {
- mdd_write_lock(env, pobj, MOR_TGT_PARENT);
- mdo_ref_del(env, pobj, handle);
- mdd_write_unlock(env, pobj);
- }
+ if (rc == 0 && is_dir) {
+ mdd_write_lock(env, pobj, DT_TGT_PARENT);
+ mdo_ref_del(env, pobj, handle);
+ mdd_write_unlock(env, pobj);
+ }
- RETURN(rc);
+ RETURN(rc);
}
static int mdd_llog_record_calc_size(const struct lu_env *env,
const struct lu_name *sname)
{
const struct lu_ucred *uc = lu_ucred(env);
- enum changelog_rec_flags crf = 0;
- size_t hdr_size = sizeof(struct llog_changelog_rec) -
- sizeof(struct changelog_rec);
+ enum changelog_rec_flags clf_flags = CLF_EXTRA_FLAGS;
+ enum changelog_rec_extra_flags crfe = CLFE_UIDGID | CLFE_NID;
if (sname != NULL)
- crf |= CLF_RENAME;
+ clf_flags |= CLF_RENAME;
if (uc != NULL && uc->uc_jobid[0] != '\0')
- crf |= CLF_JOBID;
+ clf_flags |= CLF_JOBID;
- return llog_data_len(hdr_size + changelog_rec_offset(crf) +
+ return llog_data_len(LLOG_CHANGELOG_HDR_SZ +
+ changelog_rec_offset(clf_flags, crfe) +
(tname != NULL ? tname->ln_namelen : 0) +
(sname != NULL ? 1 + sname->ln_namelen : 0));
}
int mdd_declare_changelog_store(const struct lu_env *env,
struct mdd_device *mdd,
+ enum changelog_rec_type type,
const struct lu_name *tname,
const struct lu_name *sname,
struct thandle *handle)
int reclen;
int rc;
- /* Not recording */
- if (!(mdd->mdd_cl.mc_flags & CLM_ON))
+ if (!mdd_changelog_enabled(env, mdd, type))
return 0;
reclen = mdd_llog_record_calc_size(env, tname, sname);
return rc;
}
+/**
+ * Write one llog record through llog_osd_ops, numbering changelog records.
+ *
+ * Records whose type is not CHANGELOG_REC are handed to lop_write_rec()
+ * untouched. A CHANGELOG_REC is stamped with mc_index + 1 before the write,
+ * and mc_index is advanced afterwards unless the write returned -ENOSPC on a
+ * full llog.
+ *
+ * \retval the value returned by llog_osd_ops.lop_write_rec()
+ */
+int mdd_changelog_write_rec(const struct lu_env *env,
+			    struct llog_handle *loghandle,
+			    struct llog_rec_hdr *r,
+			    struct llog_cookie *cookie,
+			    int idx, struct thandle *th)
+{
+	struct llog_changelog_rec *clrec;
+	struct mdd_device *mdd;
+	int rc;
+
+	/* anything that is not a changelog record goes straight through */
+	if (r->lrh_type != CHANGELOG_REC)
+		return llog_osd_ops.lop_write_rec(env, loghandle, r,
+						  cookie, idx, th);
+
+	mdd = lu2mdd_dev(loghandle->lgh_ctxt->loc_obd->obd_lu_dev);
+	clrec = container_of0(r, struct llog_changelog_rec, cr_hdr);
+
+	/* stamp the record with the index it will own if the write counts */
+	spin_lock(&mdd->mdd_cl.mc_lock);
+	clrec->cr.cr_index = mdd->mdd_cl.mc_index + 1;
+	spin_unlock(&mdd->mdd_cl.mc_lock);
+
+	rc = llog_osd_ops.lop_write_rec(env, loghandle, r, cookie, idx, th);
+
+	/*
+	 * if current llog is full, we will generate a new
+	 * llog, and since it's actually not an error, let's
+	 * avoid increasing index so that userspace apps
+	 * should not see a gap in the changelog sequence
+	 */
+	if (rc == -ENOSPC && llog_is_full(loghandle))
+		return rc;
+
+	spin_lock(&mdd->mdd_cl.mc_lock);
+	++mdd->mdd_cl.mc_index;
+	spin_unlock(&mdd->mdd_cl.mc_lock);
+
+	return rc;
+}
+
/** Add a changelog entry \a rec to the changelog llog
* \param mdd
* \param rec
rec->cr_hdr.lrh_type = CHANGELOG_REC;
rec->cr.cr_time = cl_time();
- spin_lock(&mdd->mdd_cl.mc_lock);
- /* NB: I suppose it's possible llog_add adds out of order wrt cr_index,
- * but as long as the MDD transactions are ordered correctly for e.g.
- * rename conflicts, I don't think this should matter. */
- rec->cr.cr_index = ++mdd->mdd_cl.mc_index;
- spin_unlock(&mdd->mdd_cl.mc_lock);
-
ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
if (ctxt == NULL)
return -ENXIO;
if (IS_ERR(llog_th))
GOTO(out_put, rc = PTR_ERR(llog_th));
+ OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_CHANGELOG_REORDER, cfs_fail_val);
/* nested journal transaction */
rc = llog_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL, llog_th);
+ /* time to recover some space ?? */
+ if (likely(!mdd->mdd_changelog_gc ||
+ mdd->mdd_cl.mc_gc_task != MDD_CHLG_GC_NONE ||
+ mdd->mdd_changelog_min_gc_interval >=
+ ktime_get_real_seconds() - mdd->mdd_cl.mc_gc_time))
+ /* save a spin_lock trip */
+ goto out_put;
+ spin_lock(&mdd->mdd_cl.mc_lock);
+ if (likely(mdd->mdd_changelog_gc &&
+ mdd->mdd_cl.mc_gc_task == MDD_CHLG_GC_NONE &&
+ ktime_get_real_seconds() - mdd->mdd_cl.mc_gc_time >
+ mdd->mdd_changelog_min_gc_interval)) {
+ if (unlikely(llog_cat_free_space(ctxt->loc_handle) <=
+ mdd->mdd_changelog_min_free_cat_entries ||
+ OBD_FAIL_CHECK(OBD_FAIL_FORCE_GC_THREAD))) {
+ CWARN("%s:%s low on changelog_catalog free entries, "
+ "starting ChangeLog garbage collection thread\n",
+ obd->obd_name,
+ OBD_FAIL_CHECK(OBD_FAIL_FORCE_GC_THREAD) ?
+ " simulate" : "");
+
+ /* indicate further kthread run will occur outside
+ * right after current journal transaction filling has
+ * completed
+ */
+ mdd->mdd_cl.mc_gc_task = MDD_CHLG_GC_NEED;
+ }
+ /* next check in mdd_changelog_min_gc_interval anyway
+ */
+ mdd->mdd_cl.mc_gc_time = ktime_get_real_seconds();
+ }
+ spin_unlock(&mdd->mdd_cl.mc_lock);
out_put:
llog_ctxt_put(ctxt);
if (rc > 0)
const struct lu_fid *spfid,
const struct lu_name *sname)
{
- struct changelog_ext_rename *rnm = changelog_rec_rename(rec);
- size_t extsize = sname->ln_namelen + 1;
+ struct changelog_ext_rename *rnm = changelog_rec_rename(rec);
+ size_t extsize;
LASSERT(sfid != NULL);
LASSERT(spfid != NULL);
LASSERT(sname != NULL);
+ extsize = sname->ln_namelen + 1;
+
rnm->cr_sfid = *sfid;
rnm->cr_spfid = *spfid;
void mdd_changelog_rec_ext_jobid(struct changelog_rec *rec, const char *jobid)
{
- struct changelog_ext_jobid *jid = changelog_rec_jobid(rec);
+ struct changelog_ext_jobid *jid = changelog_rec_jobid(rec); /* destination of the bounded copy below; left untouched for a NULL/empty jobid */
if (jobid == NULL || jobid[0] == '\0')
return;
strlcpy(jid->cr_jobid, jobid, sizeof(jid->cr_jobid));
}
+/** Store \a eflags in the cr_extra_flags field returned for \a rec. */
+void mdd_changelog_rec_ext_extra_flags(struct changelog_rec *rec, __u64 eflags)
+{
+	changelog_rec_extra_flags(rec)->cr_extra_flags = eflags;
+}
+
+/** Store \a uid and \a gid in the uid/gid extension returned for \a rec. */
+void mdd_changelog_rec_extra_uidgid(struct changelog_rec *rec,
+				    __u64 uid, __u64 gid)
+{
+	struct changelog_ext_uidgid *ids = changelog_rec_uidgid(rec);
+
+	ids->cr_gid = gid;
+	ids->cr_uid = uid;
+}
+
+/** Store \a nid in the cr_nid field of the NID extension returned for \a rec. */
+void mdd_changelog_rec_extra_nid(struct changelog_rec *rec,
+				 lnet_nid_t nid)
+{
+	changelog_rec_nid(rec)->cr_nid = nid;
+}
+
+/** Store \a flags in the cr_openflags field of the extension returned for \a rec. */
+void mdd_changelog_rec_extra_omode(struct changelog_rec *rec, u32 flags)
+{
+	changelog_rec_openmode(rec)->cr_openflags = flags;
+}
+
+/**
+ * Copy \a xattr_name into the cr_xattr field of the extension returned for
+ * \a rec, bounded by the size of that field.
+ */
+void mdd_changelog_rec_extra_xattr(struct changelog_rec *rec,
+				   const char *xattr_name)
+{
+	struct changelog_ext_xattr *ext = changelog_rec_xattr(rec);
+
+	strlcpy(ext->cr_xattr, xattr_name, sizeof(ext->cr_xattr));
+}
+
/** Store a namespace change changelog record
* If this fails, we must fail the whole transaction; we don't
* want the change to commit without the log entry.
int mdd_changelog_ns_store(const struct lu_env *env,
struct mdd_device *mdd,
enum changelog_rec_type type,
- enum changelog_rec_flags crf,
+ enum changelog_rec_flags clf_flags,
struct mdd_object *target,
const struct lu_fid *tpfid,
const struct lu_fid *sfid,
struct llog_changelog_rec *rec;
struct lu_buf *buf;
int reclen;
+ __u64 xflags = CLFE_INVALID;
int rc;
ENTRY;
- /* Not recording */
- if (!(mdd->mdd_cl.mc_flags & CLM_ON))
- RETURN(0);
-
- if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
+ if (!mdd_changelog_enabled(env, mdd, type))
RETURN(0);
LASSERT(tpfid != NULL);
RETURN(-ENOMEM);
rec = buf->lb_buf;
- crf &= CLF_FLAGMASK;
+ clf_flags &= CLF_FLAGMASK;
+ clf_flags |= CLF_EXTRA_FLAGS;
- if (uc != NULL && uc->uc_jobid[0] != '\0')
- crf |= CLF_JOBID;
+ if (uc) {
+ if (uc->uc_jobid[0] != '\0')
+ clf_flags |= CLF_JOBID;
+ xflags |= CLFE_UIDGID;
+ xflags |= CLFE_NID;
+ }
if (sname != NULL)
- crf |= CLF_RENAME;
+ clf_flags |= CLF_RENAME;
else
- crf |= CLF_VERSION;
+ clf_flags |= CLF_VERSION;
+
+ rec->cr.cr_flags = clf_flags;
+
+ if (clf_flags & CLF_EXTRA_FLAGS) {
+ mdd_changelog_rec_ext_extra_flags(&rec->cr, xflags);
+ if (xflags & CLFE_UIDGID)
+ mdd_changelog_rec_extra_uidgid(&rec->cr,
+ uc->uc_uid, uc->uc_gid);
+ if (xflags & CLFE_NID)
+ mdd_changelog_rec_extra_nid(&rec->cr, uc->uc_nid);
+ }
- rec->cr.cr_flags = crf;
rec->cr.cr_type = (__u32)type;
rec->cr.cr_pfid = *tpfid;
rec->cr.cr_namelen = tname->ln_namelen;
memcpy(changelog_rec_name(&rec->cr), tname->ln_name, tname->ln_namelen);
- if (crf & CLF_RENAME)
+ if (clf_flags & CLF_RENAME)
mdd_changelog_rec_ext_rename(&rec->cr, sfid, spfid, sname);
- if (crf & CLF_JOBID)
+ if (clf_flags & CLF_JOBID)
mdd_changelog_rec_ext_jobid(&rec->cr, uc->uc_jobid);
if (likely(target != NULL)) {
- rec->cr.cr_tfid = *mdo2fid(target);
- target->mod_cltime = cfs_time_current_64();
+ rec->cr.cr_tfid = *mdd_object_fid(target);
+ target->mod_cltime = ktime_get();
} else {
fid_zero(&rec->cr.cr_tfid);
}
struct linkea_data *ldata)
{
int rc = 0;
- int rc2 = 0;
ENTRY;
if (OBD_FAIL_CHECK(OBD_FAIL_FID_IGIF))
- return 0;
+ RETURN(0);
LASSERT(oldpfid != NULL || newpfid != NULL);
- if (mdd_obj->mod_flags & DEAD_OBJ) {
- /* Prevent linkea to be updated which is NOT necessary. */
- ldata->ld_reclen = 0;
- /* No more links, don't bother */
+ if (mdd_obj->mod_flags & DEAD_OBJ)
+ /* Unnecessary to update linkEA for dead object. */
RETURN(0);
- }
if (oldpfid != NULL) {
rc = __mdd_links_del(env, mdd_obj, ldata, oldlname, oldpfid);
if (rc) {
- if ((check == 1) ||
- (rc != -ENODATA && rc != -ENOENT))
+ if ((check == 1) || (rc != -ENODATA && rc != -ENOENT))
RETURN(rc);
+
/* No changes done. */
rc = 0;
}
}
/* If renaming, add the new record */
- if (newpfid != NULL) {
- /* even if the add fails, we still delete the out-of-date
- * old link */
- rc2 = __mdd_links_add(env, mdd_obj, ldata, newlname, newpfid,
- first, check);
- }
-
- rc = rc != 0 ? rc : rc2;
+ if (newpfid != NULL)
+ rc = __mdd_links_add(env, mdd_obj, ldata, newlname, newpfid,
+ first, check);
RETURN(rc);
}
ldata = &mdd_env_info(env)->mti_link_data;
memset(ldata, 0, sizeof(*ldata));
rc = mdd_linkea_prepare(env, mdd_obj, oldpfid, oldlname,
- newpfid, newlname, first, check,
- ldata);
- if (rc != 0)
+ newpfid, newlname, first, check, ldata);
+ if (rc)
GOTO(out, rc);
}
- if (ldata->ld_reclen != 0)
+ if (!(mdd_obj->mod_flags & DEAD_OBJ))
rc = mdd_links_write(env, mdd_obj, ldata, handle);
- EXIT;
+
+ GOTO(out, rc);
+
out:
if (rc != 0) {
- int error = 1;
- if (rc == -EOVERFLOW || rc == -ENOSPC)
- error = 0;
if (newlname == NULL)
- CDEBUG(error ? D_ERROR : D_OTHER,
- "link_ea add failed %d "DFID"\n",
+ CERROR("link_ea add failed %d "DFID"\n",
rc, PFID(mdd_object_fid(mdd_obj)));
else if (oldpfid == NULL)
- CDEBUG(error ? D_ERROR : D_OTHER,
- "link_ea add '%.*s' failed %d "DFID"\n",
- newlname->ln_namelen, newlname->ln_name,
- rc, PFID(mdd_object_fid(mdd_obj)));
+ CERROR("link_ea add '%.*s' failed %d "DFID"\n",
+ newlname->ln_namelen, newlname->ln_name, rc,
+ PFID(mdd_object_fid(mdd_obj)));
else if (newpfid == NULL)
- CDEBUG(error ? D_ERROR : D_OTHER,
- "link_ea del '%.*s' failed %d "DFID"\n",
- oldlname->ln_namelen, oldlname->ln_name,
- rc, PFID(mdd_object_fid(mdd_obj)));
+ CERROR("link_ea del '%.*s' failed %d "DFID"\n",
+ oldlname->ln_namelen, oldlname->ln_name, rc,
+ PFID(mdd_object_fid(mdd_obj)));
else
- CDEBUG(error ? D_ERROR : D_OTHER,
- "link_ea rename '%.*s'->'%.*s' failed %d "
- DFID"\n",
- oldlname->ln_namelen, oldlname->ln_name,
- newlname->ln_namelen, newlname->ln_name,
- rc, PFID(mdd_object_fid(mdd_obj)));
+ CERROR("link_ea rename '%.*s'->'%.*s' failed %d "DFID
+ "\n", oldlname->ln_namelen, oldlname->ln_name,
+ newlname->ln_namelen, newlname->ln_name, rc,
+ PFID(mdd_object_fid(mdd_obj)));
}
if (is_vmalloc_addr(ldata->ld_buf))
}
/** Read the link EA into a temp buffer.
- * Uses the mdd_thread_info::mti_big_buf since it is generally large.
- * A pointer to the buffer is stored in \a ldata::ld_buf.
- *
- * \retval 0 or error
- */
-int mdd_links_read(const struct lu_env *env, struct mdd_object *mdd_obj,
- struct linkea_data *ldata)
-{
- int rc;
-
- if (!mdd_object_exists(mdd_obj))
- return -ENODATA;
-
- /* First try a small buf */
- LASSERT(env != NULL);
- ldata->ld_buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_link_buf,
- PAGE_CACHE_SIZE);
- if (ldata->ld_buf->lb_buf == NULL)
- return -ENOMEM;
-
- rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf, XATTR_NAME_LINK);
- if (rc == -ERANGE) {
- /* Buf was too small, figure out what we need. */
- lu_buf_free(ldata->ld_buf);
- rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
- XATTR_NAME_LINK);
- if (rc < 0)
- return rc;
- ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
- if (ldata->ld_buf->lb_buf == NULL)
- return -ENOMEM;
- rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
- XATTR_NAME_LINK);
- }
- if (rc < 0) {
- lu_buf_free(ldata->ld_buf);
- ldata->ld_buf = NULL;
- return rc;
- }
-
- return linkea_init(ldata);
-}
-
-/** Read the link EA into a temp buffer.
* Uses the name_buf since it is generally large.
* \retval IS_ERR err
* \retval ptr to \a lu_buf (always \a mti_big_buf)
int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj,
struct linkea_data *ldata, struct thandle *handle)
{
- const struct lu_buf *buf = mdd_buf_get_const(env, ldata->ld_buf->lb_buf,
- ldata->ld_leh->leh_len);
+ const struct lu_buf *buf;
int rc;
+ if (ldata == NULL || ldata->ld_buf == NULL ||
+ ldata->ld_leh == NULL)
+ return 0;
+
if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_LINKEA))
return 0;
+again:
+ buf = mdd_buf_get_const(env, ldata->ld_buf->lb_buf,
+ ldata->ld_leh->leh_len);
rc = mdo_xattr_set(env, mdd_obj, buf, XATTR_NAME_LINK, 0, handle);
- if (unlikely(rc == -ENOSPC) && S_ISREG(mdd_object_type(mdd_obj)) &&
- mdd_object_remote(mdd_obj) == 0) {
- struct lfsck_request *lr = &mdd_env_info(env)->mti_lr;
- struct thandle *sub_th;
-
- /* XXX: If the linkEA is overflow, then we need to notify the
- * namespace LFSCK to skip "nlink" attribute verification
- * on this object to avoid the "nlink" to be shrinked by
- * wrong. It may be not good an interaction with LFSCK
- * like this. We will consider to replace it with other
- * mechanism in future. LU-5802. */
- lfsck_pack_rfa(lr, mdo2fid(mdd_obj), LE_SKIP_NLINK,
- LFSCK_TYPE_NAMESPACE);
-
- sub_th = thandle_get_sub_by_dt(env, handle,
- mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom);
- lfsck_in_notify(env, mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom,
- lr, sub_th);
+ if (unlikely(rc == -ENOSPC)) {
+ rc = linkea_overflow_shrink(ldata);
+ if (likely(rc > 0))
+ goto again;
}
return rc;
}
-int mdd_declare_links_add(const struct lu_env *env, struct mdd_object *mdd_obj,
- struct thandle *handle, struct linkea_data *ldata,
- enum mdd_links_add_overflow overflow)
+static int mdd_declare_links_add(const struct lu_env *env,
+ struct mdd_object *mdd_obj,
+ struct thandle *handle,
+ struct linkea_data *ldata)
{
int rc;
int ea_len;
ea_len = ldata->ld_leh->leh_len;
linkea = ldata->ld_buf->lb_buf;
} else {
- ea_len = DEFAULT_LINKEA_SIZE;
+ ea_len = MAX_LINKEA_SIZE;
linkea = NULL;
}
- /* XXX: max size? */
rc = mdo_declare_xattr_set(env, mdd_obj,
mdd_buf_get_const(env, linkea, ea_len),
XATTR_NAME_LINK, 0, handle);
- if (rc != 0)
- return rc;
-
- if (mdd_object_remote(mdd_obj) == 0 && overflow == MLAO_CHECK) {
- struct lfsck_request *lr = &mdd_env_info(env)->mti_lr;
- struct thandle *sub_th;
-
- /* XXX: If the linkEA is overflow, then we need to notify the
- * namespace LFSCK to skip "nlink" attribute verification
- * on this object to avoid the "nlink" to be shrinked by
- * wrong. It may be not good an interaction with LFSCK
- * like this. We will consider to replace it with other
- * mechanism in future. LU-5802. */
- lfsck_pack_rfa(lr, mdo2fid(mdd_obj), LE_SKIP_NLINK_DECLARE,
- LFSCK_TYPE_NAMESPACE);
-
- sub_th = thandle_get_sub_by_dt(env, handle,
- mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom);
- rc = lfsck_in_notify(env,
- mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom,
- lr, sub_th);
- }
return rc;
}
/* For directory, the linkEA will be removed together
* with the object. */
if (!S_ISDIR(mdd_object_type(c)))
- rc = mdd_declare_links_add(env, c, handle, NULL, MLAO_IGNORE);
+ rc = mdd_declare_links_add(env, c, handle, NULL);
return rc;
}
struct lu_attr *la,
struct linkea_data *data)
{
+ struct lu_fid tfid = *mdd_object_fid(c);
int rc;
- rc = mdo_declare_index_insert(env, p, mdo2fid(c), mdd_object_type(c),
+ if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING3))
+ tfid.f_oid = cfs_fail_val;
+
+ rc = mdo_declare_index_insert(env, p, &tfid, mdd_object_type(c),
name->ln_name, handle);
if (rc != 0)
return rc;
if (rc != 0)
return rc;
- if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MORE_NLINK)) {
- rc = mdo_declare_ref_add(env, c, handle);
- if (rc != 0)
- return rc;
- }
-
la->la_valid = LA_CTIME | LA_MTIME;
rc = mdo_declare_attr_set(env, p, la, handle);
if (rc != 0)
if (rc != 0)
return rc;
- rc = mdd_declare_links_add(env, c, handle, data,
- S_ISREG(mdd_object_type(c)) ? MLAO_CHECK : MLAO_IGNORE);
+ rc = mdd_declare_links_add(env, c, handle, data);
if (rc != 0)
return rc;
- rc = mdd_declare_changelog_store(env, mdd, name, NULL, handle);
+ rc = mdd_declare_changelog_store(env, mdd, CL_HARDLINK, name, NULL,
+ handle);
return rc;
}
struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
struct lu_attr *cattr = MDD_ENV_VAR(env, cattr);
struct lu_attr *tattr = MDD_ENV_VAR(env, tattr);
- struct mdd_device *mdd = mdo2mdd(src_obj);
- struct thandle *handle;
+ struct mdd_device *mdd = mdo2mdd(src_obj);
+ struct thandle *handle;
+ struct lu_fid *tfid = &mdd_env_info(env)->mti_fid2;
struct linkea_data *ldata = &mdd_env_info(env)->mti_link_data;
int rc;
ENTRY;
if (rc != 0)
RETURN(rc);
+ /*
+ * If we are using project inheritance, we only allow hard link
+ * creation in our tree when the project IDs are the same;
+ * otherwise the tree quota mechanism could be circumvented.
+ */
+ if ((tattr->la_flags & LUSTRE_PROJINHERIT_FL) &&
+ (tattr->la_projid != cattr->la_projid))
+ RETURN(-EXDEV);
+
handle = mdd_trans_create(env, mdd);
if (IS_ERR(handle))
GOTO(out_pending, rc = PTR_ERR(handle));
LASSERT(ma->ma_attr.la_valid & LA_CTIME);
la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
+ /* Note: even this function will change ldata, but it comes from
+ * thread_info, which is completely temporary and only seen in
+ * this function, so we do not need reset ldata once it fails.*/
+ rc = mdd_linkea_prepare(env, mdd_sobj, NULL, NULL,
+ mdd_object_fid(mdd_tobj), lname, 0, 0, ldata);
+ if (rc != 0)
+ GOTO(stop, rc);
+
rc = mdd_declare_link(env, mdd, mdd_tobj, mdd_sobj, lname, handle,
la, ldata);
if (rc)
if (rc)
GOTO(stop, rc);
- mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
+ mdd_write_lock(env, mdd_sobj, DT_TGT_CHILD);
rc = mdd_link_sanity_check(env, mdd_tobj, tattr, lname, mdd_sobj,
cattr);
if (rc)
GOTO(out_unlock, rc);
}
- if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MORE_NLINK)) {
- rc = mdo_ref_add(env, mdd_sobj, handle);
- if (rc != 0)
- GOTO(out_unlock, rc);
- }
-
- if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING3)) {
- struct lu_fid tfid = *mdo2fid(mdd_sobj);
-
- tfid.f_oid++;
- rc = __mdd_index_insert_only(env, mdd_tobj, &tfid,
- mdd_object_type(mdd_sobj),
- name, handle);
- } else {
- rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
- mdd_object_type(mdd_sobj),
- name, handle);
- }
+ *tfid = *mdd_object_fid(mdd_sobj);
+ if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING3))
+ tfid->f_oid = cfs_fail_val;
+ rc = __mdd_index_insert_only(env, mdd_tobj, tfid,
+ mdd_object_type(mdd_sobj), name, handle);
if (rc != 0) {
mdo_ref_del(env, mdd_sobj, handle);
GOTO(out_unlock, rc);
la->la_valid = LA_CTIME;
rc = mdd_update_time(env, mdd_sobj, cattr, la, handle);
- if (rc == 0) {
- rc = mdd_linkea_prepare(env, mdd_sobj, NULL, NULL,
- mdo2fid(mdd_tobj), lname, 0, 0,
- ldata);
- if (rc == 0)
- mdd_links_add(env, mdd_sobj, mdo2fid(mdd_tobj),
- lname, handle, ldata, 0);
- /* The failure of links_add should not cause the link
- * failure, reset rc here */
- rc = 0;
- }
- EXIT;
+ if (rc == 0)
+ /* Note: The failure of links_add should not cause the
+ * link failure, so do not check return value. */
+ mdd_links_add(env, mdd_sobj, mdd_object_fid(mdd_tobj),
+ lname, handle, ldata, 0);
+
+ EXIT;
out_unlock:
- mdd_write_unlock(env, mdd_sobj);
- if (rc == 0)
+ mdd_write_unlock(env, mdd_sobj);
+ if (rc == 0)
rc = mdd_changelog_ns_store(env, mdd, CL_HARDLINK, 0, mdd_sobj,
- mdo2fid(mdd_tobj), NULL, NULL,
- lname, NULL, handle);
+ mdd_object_fid(mdd_tobj), NULL,
+ NULL, lname, NULL, handle);
stop:
- mdd_trans_stop(env, mdd, rc, handle);
-
+ rc = mdd_trans_stop(env, mdd, rc, handle);
if (is_vmalloc_addr(ldata->ld_buf))
/* if we vmalloced a large buffer drop it */
lu_buf_free(ldata->ld_buf);
out_pending:
- return rc;
+ return rc;
}
-static int mdd_mark_dead_object(const struct lu_env *env,
+static int mdd_mark_orphan_object(const struct lu_env *env,
struct mdd_object *obj, struct thandle *handle,
bool declare)
{
struct lu_attr *attr = MDD_ENV_VAR(env, la_for_start);
int rc;
- if (!declare)
- obj->mod_flags |= DEAD_OBJ;
-
- if (!S_ISDIR(mdd_object_type(obj)))
- return 0;
-
attr->la_valid = LA_FLAGS;
- attr->la_flags = LUSTRE_SLAVE_DEAD_FL;
+ attr->la_flags = LUSTRE_ORPHAN_FL;
if (declare)
rc = mdo_declare_attr_set(env, obj, attr, handle);
struct mdd_object *obj,
struct thandle *handle)
{
- int rc;
+ int rc;
- rc = mdd_mark_dead_object(env, obj, handle, true);
+ /* Sigh, we do not know if the unlink object will become orphan in
+ * declare phase, but fortunately the flags here does not matter
+ * in current declare implementation */
+ rc = mdd_mark_orphan_object(env, obj, handle, true);
if (rc != 0)
return rc;
- rc = orph_declare_index_insert(env, obj, mdd_object_type(obj), handle);
+ rc = mdo_declare_destroy(env, obj, handle);
if (rc != 0)
return rc;
- rc = mdo_declare_destroy(env, obj, handle);
+ rc = mdd_orphan_declare_insert(env, obj, mdd_object_type(obj), handle);
if (rc != 0)
return rc;
/* caller should take a lock before calling */
int mdd_finish_unlink(const struct lu_env *env,
struct mdd_object *obj, struct md_attr *ma,
- const struct mdd_object *pobj,
+ struct mdd_object *pobj,
const struct lu_name *lname,
struct thandle *th)
{
int rc = 0;
- int is_dir = S_ISDIR(ma->ma_attr.la_mode);
- ENTRY;
+ int is_dir = S_ISDIR(ma->ma_attr.la_mode);
+ ENTRY;
- LASSERT(mdd_write_locked(env, obj) != 0);
+ LASSERT(mdd_write_locked(env, obj) != 0);
if (ma->ma_attr.la_nlink == 0 || is_dir) {
- rc = mdd_mark_dead_object(env, obj, th, false);
- if (rc != 0)
- RETURN(rc);
-
- /* add new orphan and the object
- * will be deleted during mdd_close() */
- if (obj->mod_count) {
- rc = __mdd_orphan_add(env, obj, th);
- if (rc == 0)
- CDEBUG(D_HA, "Object "DFID" is inserted into "
- "orphan list, open count = %d\n",
- PFID(mdd_object_fid(obj)),
- obj->mod_count);
- else
- CERROR("Object "DFID" fail to be an orphan, "
- "open count = %d, maybe cause failed "
- "open replay\n",
- PFID(mdd_object_fid(obj)),
- obj->mod_count);
- } else {
+ /* add new orphan and the object
+ * will be deleted during mdd_close() */
+ obj->mod_flags |= DEAD_OBJ;
+ if (obj->mod_count) {
+ rc = mdd_orphan_insert(env, obj, th);
+ if (rc == 0)
+ CDEBUG(D_HA, "Object "DFID" is inserted into "
+ "orphan list, open count = %d\n",
+ PFID(mdd_object_fid(obj)),
+ obj->mod_count);
+ else
+ CERROR("Object "DFID" fail to be an orphan, "
+ "open count = %d, maybe cause failed "
+ "open replay\n",
+ PFID(mdd_object_fid(obj)),
+ obj->mod_count);
+
+ /* mark object as an orphan here, not
+ * before mdd_orphan_insert() as racing
+ * mdd_la_get() may propagate ORPHAN_OBJ
+ * causing the assertion */
+ rc = mdd_mark_orphan_object(env, obj, th, false);
+ } else {
rc = mdo_destroy(env, obj, th);
- }
+ }
} else if (!is_dir) {
/* old files may not have link ea; ignore errors */
- mdd_links_del(env, obj, mdo2fid(pobj), lname, th);
+ mdd_links_del(env, obj, mdd_object_fid(pobj), lname, th);
}
RETURN(rc);
return rc;
/* FIXME: need changelog for remove entry */
- rc = mdd_declare_changelog_store(env, mdd, name, NULL, handle);
+ rc = mdd_declare_changelog_store(env, mdd, CL_UNLINK, name,
+ NULL, handle);
}
return rc;
if (rc < 0)
RETURN(false);
- ma->ma_valid = MA_HSM;
+ ma->ma_valid |= MA_HSM;
}
if (ma->ma_hsm.mh_flags & HS_EXISTS)
RETURN(true);
struct mdd_object *mdd_cobj = NULL;
struct mdd_device *mdd = mdo2mdd(pobj);
struct thandle *handle;
- int rc, is_dir = 0;
+ int rc, is_dir = 0, cl_flags = 0;
ENTRY;
+ /* let shutdown start */
+ CFS_FAIL_TIMEOUT(OBD_FAIL_TGT_REPLY_DATA_RACE, 1);
+
/* cobj == NULL means only delete name entry */
if (likely(cobj != NULL)) {
mdd_cobj = md2mdd_obj(cobj);
RETURN(rc);
is_dir = S_ISDIR(cattr->la_mode);
+ /* search for an existing archive.
+ * we should check ahead as the object
+ * can be destroyed in this transaction */
+ if (mdd_hsm_archive_exists(env, mdd_cobj, ma))
+ cl_flags |= CLF_UNLINK_HSM_EXISTS;
}
rc = mdd_unlink_sanity_check(env, mdd_pobj, pattr, mdd_cobj, cattr);
GOTO(stop, rc);
if (likely(mdd_cobj != NULL))
- mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD);
+ mdd_write_lock(env, mdd_cobj, DT_TGT_CHILD);
if (likely(no_name == 0) && !OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING2)) {
rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle);
rc = mdo_ref_del(env, mdd_cobj, handle);
if (rc != 0) {
__mdd_index_insert_only(env, mdd_pobj,
- mdo2fid(mdd_cobj),
+ mdd_object_fid(mdd_cobj),
mdd_object_type(mdd_cobj),
name, handle);
GOTO(cleanup, rc);
ma->ma_attr = *cattr;
ma->ma_valid |= MA_INODE;
rc = mdd_finish_unlink(env, mdd_cobj, ma, mdd_pobj, lname, handle);
+ if (rc != 0)
+ GOTO(cleanup, rc);
/* fetch updated nlink */
- if (rc == 0)
- rc = mdd_la_get(env, mdd_cobj, cattr);
+ rc = mdd_la_get(env, mdd_cobj, cattr);
+ /* if object is removed then we can't get its attrs,
+ * use last get */
+ if (rc == -ENOENT) {
+ cattr->la_nlink = 0;
+ rc = 0;
+ }
- /* if object is removed then we can't get its attrs, use last get */
if (cattr->la_nlink == 0) {
ma->ma_attr = *cattr;
ma->ma_valid |= MA_INODE;
}
+
EXIT;
cleanup:
if (likely(mdd_cobj != NULL))
mdd_write_unlock(env, mdd_cobj);
if (rc == 0) {
- int cl_flags = 0;
-
- if (cattr->la_nlink == 0) {
+ if (cattr->la_nlink == 0)
cl_flags |= CLF_UNLINK_LAST;
- /* search for an existing archive */
- if (mdd_hsm_archive_exists(env, mdd_cobj, ma))
- cl_flags |= CLF_UNLINK_HSM_EXISTS;
- }
+ else
+ cl_flags &= ~CLF_UNLINK_HSM_EXISTS;
rc = mdd_changelog_ns_store(env, mdd,
is_dir ? CL_RMDIR : CL_UNLINK, cl_flags,
- mdd_cobj, mdo2fid(mdd_pobj), NULL, NULL, lname, NULL,
- handle);
+ mdd_cobj, mdd_object_fid(mdd_pobj), NULL, NULL,
+ lname, NULL, handle);
}
stop:
- mdd_trans_stop(env, mdd, rc, handle);
+ rc = mdd_trans_stop(env, mdd, rc, handle);
return rc;
}
RETURN(0);
}
-static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
- struct md_object *cobj, const struct md_op_spec *spec,
- struct md_attr *ma)
+static int mdd_create_data(const struct lu_env *env,
+ struct md_object *pobj,
+ struct md_object *cobj,
+ const struct md_op_spec *spec,
+ struct md_attr *ma)
{
struct mdd_device *mdd = mdo2mdd(cobj);
struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
/* calling ->ah_make_hint() is used to transfer information from parent */
mdd_object_make_hint(env, mdd_pobj, son, attr, spec, hint);
- handle = mdd_trans_create(env, mdd);
- if (IS_ERR(handle))
- GOTO(out_free, rc = PTR_ERR(handle));
+ handle = mdd_trans_create(env, mdd);
+ if (IS_ERR(handle))
+ GOTO(out_free, rc = PTR_ERR(handle));
- /*
- * XXX: Setting the lov ea is not locked but setting the attr is locked?
- * Should this be fixed?
- */
- CDEBUG(D_OTHER, "ea %p/%u, cr_flags "LPO64", no_create %u\n",
+ /*
+ * XXX: Setting the lov ea is not locked but setting the attr is locked?
+ * Should this be fixed?
+ */
+ CDEBUG(D_OTHER, "ea %p/%u, cr_flags %#llo, no_create %u\n",
spec->u.sp_ea.eadata, spec->u.sp_ea.eadatalen,
spec->sp_cr_flags, spec->no_create);
if (rc)
GOTO(stop, rc);
- rc = mdd_declare_changelog_store(env, mdd, NULL, NULL, handle);
+ rc = mdd_declare_changelog_store(env, mdd, CL_LAYOUT, NULL, NULL,
+ handle);
if (rc)
GOTO(stop, rc);
rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, son, handle);
stop:
- mdd_trans_stop(env, mdd, rc, handle);
+ rc = mdd_trans_stop(env, mdd, rc, handle);
+
out_free:
RETURN(rc);
}
static int mdd_declare_object_initialize(const struct lu_env *env,
struct mdd_object *parent,
struct mdd_object *child,
- struct lu_attr *attr,
+ const struct lu_attr *attr,
struct thandle *handle)
{
int rc;
ENTRY;
- /*
- * inode mode has been set in creation time, and it's based on umask,
- * la_mode and acl, don't set here again! (which will go wrong
- * because below function doesn't consider umask).
- * I'd suggest set all object attributes in creation time, see above.
- */
LASSERT(attr->la_valid & (LA_MODE | LA_TYPE));
- attr->la_valid &= ~(LA_MODE | LA_TYPE);
- rc = mdo_declare_attr_set(env, child, attr, handle);
- attr->la_valid |= LA_MODE | LA_TYPE;
- if (rc != 0 || !S_ISDIR(attr->la_mode))
- RETURN(rc);
+ if (!S_ISDIR(attr->la_mode))
+ RETURN(0);
- rc = mdo_declare_index_insert(env, child, mdo2fid(child), S_IFDIR,
- dot, handle);
+ rc = mdo_declare_index_insert(env, child, mdd_object_fid(child),
+ S_IFDIR, dot, handle);
if (rc != 0)
RETURN(rc);
if (rc != 0)
RETURN(rc);
- rc = mdo_declare_index_insert(env, child, mdo2fid(parent), S_IFDIR,
- dotdot, handle);
+ rc = mdo_declare_index_insert(env, child, mdd_object_fid(parent),
+ S_IFDIR, dotdot, handle);
RETURN(rc);
}
static int mdd_object_initialize(const struct lu_env *env,
const struct lu_fid *pfid,
struct mdd_object *child,
- struct lu_attr *attr, struct thandle *handle,
- const struct md_op_spec *spec)
+ struct lu_attr *attr,
+ struct thandle *handle)
{
int rc = 0;
ENTRY;
if (S_ISDIR(attr->la_mode)) {
- /* Add "." and ".." for newly created dir */
- mdo_ref_add(env, child, handle);
- rc = __mdd_index_insert_only(env, child, mdo2fid(child),
+ /* Add "." and ".." for newly created dir */
+ mdo_ref_add(env, child, handle);
+ rc = __mdd_index_insert_only(env, child, mdd_object_fid(child),
S_IFDIR, dot, handle);
if (rc == 0)
rc = __mdd_index_insert_only(env, child, pfid, S_IFDIR,
int rc;
ENTRY;
- /* EEXIST check */
- if (mdd_is_dead_obj(obj))
- RETURN(-ENOENT);
+ /* EEXIST check */
+ if (mdd_is_dead_obj(obj))
+ RETURN(-ENOENT);
/*
* In some cases this lookup is not needed - we know before if name
check_perm = false;
}
+ if (S_ISDIR(cattr->la_mode) &&
+ unlikely(spec != NULL && spec->sp_cr_flags & MDS_OPEN_HAS_EA) &&
+ spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen > 0) {
+ const struct lmv_user_md *lum = spec->u.sp_ea.eadata;
+
+ if (!lmv_magic_supported(le32_to_cpu(lum->lum_magic)) &&
+ le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC_V0) {
+ rc = -EINVAL;
+ CERROR("%s: invalid lmv_user_md: magic = %x, "
+ "stripe_offset = %d, stripe_count = %u: "
+ "rc = %d\n", mdd2obd_dev(m)->obd_name,
+ le32_to_cpu(lum->lum_magic),
+ (int)le32_to_cpu(lum->lum_stripe_offset),
+ le32_to_cpu(lum->lum_stripe_count), rc);
+ return rc;
+ }
+ }
+
rc = mdd_may_create(env, obj, pattr, NULL, check_perm);
if (rc != 0)
RETURN(rc);
- /* sgid check */
+ /* sgid check */
if (pattr->la_mode & S_ISGID) {
+ struct lu_ucred *uc = lu_ucred(env);
+
cattr->la_gid = pattr->la_gid;
+
+ /* Directories are special, and always inherit S_ISGID */
if (S_ISDIR(cattr->la_mode)) {
cattr->la_mode |= S_ISGID;
cattr->la_valid |= LA_MODE;
+ } else if ((cattr->la_mode & (S_ISGID | S_IXGRP))
+ == (S_ISGID | S_IXGRP) &&
+ !lustre_in_group_p(uc,
+ (cattr->la_valid & LA_GID) ?
+ cattr->la_gid : pattr->la_gid) &&
+ !md_capable(uc, CFS_CAP_FSETID)) {
+ cattr->la_mode &= ~S_ISGID;
+ cattr->la_valid |= LA_MODE;
+ }
+ }
+
+ /* Inherit project ID from parent directory */
+ if (pattr->la_flags & LUSTRE_PROJINHERIT_FL) {
+ cattr->la_projid = pattr->la_projid;
+ if (S_ISDIR(cattr->la_mode)) {
+ cattr->la_flags |= LUSTRE_PROJINHERIT_FL;
+ cattr->la_valid |= LA_FLAGS;
}
+ cattr->la_valid |= LA_PROJID;
}
rc = mdd_name_check(m, lname);
RETURN(rc);
switch (cattr->la_mode & S_IFMT) {
- case S_IFLNK: {
- unsigned int symlen = strlen(spec->u.sp_symname) + 1;
+ case S_IFLNK: {
+ unsigned int symlen = strlen(spec->u.sp_symname) + 1;
- if (symlen > (1 << m->mdd_dt_conf.ddp_block_shift))
- RETURN(-ENAMETOOLONG);
- else
- RETURN(0);
- }
+ if (symlen > m->mdd_dt_conf.ddp_symlink_max)
+ RETURN(-ENAMETOOLONG);
+ else
+ RETURN(0);
+ }
case S_IFDIR:
case S_IFREG:
case S_IFCHR:
RETURN(rc);
}
-static int mdd_declare_object_create(const struct lu_env *env,
+static int mdd_declare_create_object(const struct lu_env *env,
struct mdd_device *mdd,
struct mdd_object *p, struct mdd_object *c,
struct lu_attr *attr,
const struct md_op_spec *spec,
struct lu_buf *def_acl_buf,
struct lu_buf *acl_buf,
+ struct lu_buf *hsm_buf,
struct dt_allocation_hint *hint)
{
+ const struct lu_buf *buf;
int rc;
- rc = mdd_declare_object_create_internal(env, p, c, attr, handle, spec,
+ rc = mdd_declare_create_object_internal(env, p, c, attr, handle, spec,
hint);
- if (rc)
- GOTO(out, rc);
+ if (rc)
+ GOTO(out, rc);
-#ifdef CONFIG_FS_POSIX_ACL
- if (def_acl_buf->lb_len > 0 && S_ISDIR(attr->la_mode)) {
+#ifdef CONFIG_LUSTRE_FS_POSIX_ACL
+ if (def_acl_buf && def_acl_buf->lb_len > 0 && S_ISDIR(attr->la_mode)) {
/* if dir, then can inherit default ACl */
rc = mdo_declare_xattr_set(env, c, def_acl_buf,
XATTR_NAME_ACL_DEFAULT,
GOTO(out, rc);
}
- if (acl_buf->lb_len > 0) {
+ if (acl_buf && acl_buf->lb_len > 0) {
rc = mdo_declare_attr_set(env, c, attr, handle);
if (rc)
GOTO(out, rc);
GOTO(out, rc);
/* replay case, create LOV EA from client data */
- if (spec->no_create ||
+ if ((!(spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE) && spec->no_create) ||
(spec->sp_cr_flags & MDS_OPEN_HAS_EA && S_ISREG(attr->la_mode))) {
- const struct lu_buf *buf;
-
buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
spec->u.sp_ea.eadatalen);
- rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_LOV, 0,
- handle);
+ rc = mdo_declare_xattr_set(env, c, buf,
+ S_ISDIR(attr->la_mode) ?
+ XATTR_NAME_LMV : XATTR_NAME_LOV,
+ 0, handle);
if (rc)
GOTO(out, rc);
+
+ if (spec->sp_cr_flags & MDS_OPEN_PCC) {
+ rc = mdo_declare_xattr_set(env, c, hsm_buf,
+ XATTR_NAME_HSM,
+ 0, handle);
+ if (rc)
+ GOTO(out, rc);
+ }
}
if (S_ISLNK(attr->la_mode)) {
if (rc)
GOTO(out, rc);
}
+
+ if (spec->sp_cr_file_secctx_name != NULL) {
+ buf = mdd_buf_get_const(env, spec->sp_cr_file_secctx,
+ spec->sp_cr_file_secctx_size);
+ rc = mdo_declare_xattr_set(env, c, buf,
+ spec->sp_cr_file_secctx_name, 0,
+ handle);
+ if (rc < 0)
+ GOTO(out, rc);
+ }
out:
return rc;
}
struct linkea_data *ldata,
struct lu_buf *def_acl_buf,
struct lu_buf *acl_buf,
+ struct lu_buf *hsm_buf,
struct dt_allocation_hint *hint)
{
int rc;
- rc = mdd_declare_object_create(env, mdd, p, c, attr, handle, spec,
- def_acl_buf, acl_buf, hint);
+ rc = mdd_declare_create_object(env, mdd, p, c, attr, handle, spec,
+ def_acl_buf, acl_buf, hsm_buf, hint);
if (rc)
GOTO(out, rc);
}
if (unlikely(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
- rc = orph_declare_index_insert(env, c, attr->la_mode, handle);
+ rc = mdd_orphan_declare_insert(env, c, attr->la_mode, handle);
if (rc)
GOTO(out, rc);
} else {
- struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
+ struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
+ enum changelog_rec_type type;
- rc = mdo_declare_index_insert(env, p, mdo2fid(c), attr->la_mode,
- name->ln_name, handle);
+ rc = mdo_declare_index_insert(env, p, mdd_object_fid(c),
+ attr->la_mode, name->ln_name,
+ handle);
if (rc != 0)
return rc;
- rc = mdd_declare_links_add(env, c, handle, ldata, MLAO_IGNORE);
+ rc = mdd_declare_links_add(env, c, handle, ldata);
if (rc)
return rc;
if (rc)
return rc;
- rc = mdd_declare_changelog_store(env, mdd, name, NULL, handle);
+ type = S_ISDIR(attr->la_mode) ? CL_MKDIR :
+ S_ISREG(attr->la_mode) ? CL_CREATE :
+ S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD;
+
+ rc = mdd_declare_changelog_store(env, mdd, type, name, NULL,
+ handle);
if (rc)
return rc;
}
RETURN(0);
}
- mdd_read_lock(env, pobj, MOR_TGT_PARENT);
+ mdd_read_lock(env, pobj, DT_TGT_PARENT);
rc = mdo_xattr_get(env, pobj, def_acl_buf,
XATTR_NAME_ACL_DEFAULT);
mdd_read_unlock(env, pobj);
/**
* Create a metadata object and initialize it, set acl, xattr.
**/
-static int mdd_object_create(const struct lu_env *env, struct mdd_object *pobj,
+static int mdd_create_object(const struct lu_env *env, struct mdd_object *pobj,
struct mdd_object *son, struct lu_attr *attr,
struct md_op_spec *spec, struct lu_buf *acl_buf,
struct lu_buf *def_acl_buf,
+ struct lu_buf *hsm_buf,
struct dt_allocation_hint *hint,
- struct thandle *handle)
+ struct thandle *handle, bool initsecctx)
{
- int rc;
+ const struct lu_buf *buf;
+ int rc;
- mdd_write_lock(env, son, MOR_TGT_CHILD);
- rc = mdd_object_create_internal(env, NULL, son, attr, handle, spec,
+ mdd_write_lock(env, son, DT_TGT_CHILD);
+ rc = mdd_create_object_internal(env, NULL, son, attr, handle, spec,
hint);
if (rc)
GOTO(unlock, rc);
* created in declare phase, they also needs to be added to master
* object as sub-directory entry. So it has to initialize the master
* object, then set dir striped EA.(in mdo_xattr_set) */
- rc = mdd_object_initialize(env, mdo2fid(pobj), son, attr, handle,
- spec);
+ rc = mdd_object_initialize(env, mdd_object_fid(pobj), son, attr,
+ handle);
if (rc != 0)
GOTO(err_destroy, rc);
if (spec->no_create ||
(S_ISREG(attr->la_mode) && spec->sp_cr_flags & MDS_OPEN_HAS_EA) ||
S_ISDIR(attr->la_mode)) {
- const struct lu_buf *buf;
-
buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
spec->u.sp_ea.eadatalen);
rc = mdo_xattr_set(env, son, buf,
S_ISDIR(attr->la_mode) ? XATTR_NAME_LMV :
- XATTR_NAME_LOV, 0,
- handle);
+ XATTR_NAME_LOV,
+ 0, handle);
+ if (rc != 0)
+ GOTO(err_destroy, rc);
+ }
+
+ if (S_ISREG(attr->la_mode) && spec->sp_cr_flags & MDS_OPEN_PCC) {
+ struct md_hsm mh;
+
+ memset(&mh, 0, sizeof(mh));
+ mh.mh_flags = HS_EXISTS | HS_ARCHIVED | HS_RELEASED;
+ mh.mh_arch_id = spec->sp_archive_id;
+ lustre_hsm2buf(hsm_buf->lb_buf, &mh);
+ rc = mdo_xattr_set(env, son, hsm_buf, XATTR_NAME_HSM,
+ 0, handle);
if (rc != 0)
GOTO(err_destroy, rc);
}
-#ifdef CONFIG_FS_POSIX_ACL
+#ifdef CONFIG_LUSTRE_FS_POSIX_ACL
if (def_acl_buf != NULL && def_acl_buf->lb_len > 0 &&
S_ISDIR(attr->la_mode)) {
/* set default acl */
#endif
if (S_ISLNK(attr->la_mode)) {
- struct lu_ucred *uc = lu_ucred_assert(env);
struct dt_object *dt = mdd_object_child(son);
const char *target_name = spec->u.sp_symname;
int sym_len = strlen(target_name);
- const struct lu_buf *buf;
loff_t pos = 0;
buf = mdd_buf_get_const(env, target_name, sym_len);
- rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
- uc->uc_cap &
- CFS_CAP_SYS_RESOURCE_MASK);
-
+ rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle);
if (rc == sym_len)
rc = 0;
else
GOTO(err_initlized, rc = -EFAULT);
}
+ if (initsecctx && spec->sp_cr_file_secctx_name != NULL) {
+ buf = mdd_buf_get_const(env, spec->sp_cr_file_secctx,
+ spec->sp_cr_file_secctx_size);
+ rc = mdo_xattr_set(env, son, buf, spec->sp_cr_file_secctx_name,
+ 0, handle);
+ if (rc < 0)
+ GOTO(err_initlized, rc);
+ }
+
err_initlized:
if (unlikely(rc != 0)) {
int rc2;
if (rc)
GOTO(stop, rc);
stop:
- mdd_trans_stop(env, mdd, rc, handle);
+ rc = mdd_trans_stop(env, mdd, rc, handle);
+
RETURN(rc);
}
-/*
+/**
* Create object and insert it into namespace.
+ *
+ * Two operations have to be performed:
+ *
+ * - an allocation of a new object (->do_create()), and
+ * - an insertion into a parent index (->dio_insert()).
+ *
+ * Due to locking, operation order is not important, when both are
+ * successful, *but* error handling cases are quite different:
+ *
+ * - if insertion is done first, and following object creation fails,
+ * insertion has to be rolled back, but this operation might fail
+ * also leaving us with dangling index entry.
+ *
+ * - if creation is done first, it has to be undone if insertion fails,
+ * leaving us with leaked space, which is not good but not fatal.
+ *
+ * It seems that creation-first is simplest solution, but it is sub-optimal
+ * in the frequent
+ *
+ * $ mkdir foo
+ * $ mkdir foo
+ *
+ * case, because second mkdir is bound to create object, only to
+ * destroy it immediately.
+ *
+ * To avoid this follow local file systems that do double lookup:
+ *
+ * 0. lookup -> -EEXIST (mdd_create_sanity_check())
+ * 1. create (mdd_create_object_internal())
+ * 2. insert (__mdd_index_insert(), lookup again)
+ *
+ * \param[in] pobj parent object
+ * \param[in] lname name of child being created
+ * \param[in,out] child child object being created
+ * \param[in] spec additional create parameters
+ * \param[in] ma attributes for new child object
+ *
+ * \retval 0 on success
+ * \retval negative errno on failure
*/
-static int mdd_create(const struct lu_env *env, struct md_object *pobj,
+int mdd_create(const struct lu_env *env, struct md_object *pobj,
const struct lu_name *lname, struct md_object *child,
- struct md_op_spec *spec, struct md_attr* ma)
+ struct md_op_spec *spec, struct md_attr *ma)
{
struct mdd_thread_info *info = mdd_env_info(env);
struct lu_attr *la = &info->mti_la_for_fix;
struct lu_attr *pattr = &info->mti_pattr;
struct lu_buf acl_buf;
struct lu_buf def_acl_buf;
+ struct lu_buf hsm_buf;
struct linkea_data *ldata = &info->mti_link_data;
const char *name = lname->ln_name;
struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
int rc2;
ENTRY;
- /*
- * Two operations have to be performed:
- *
- * - an allocation of a new object (->do_create()), and
- *
- * - an insertion into a parent index (->dio_insert()).
- *
- * Due to locking, operation order is not important, when both are
- * successful, *but* error handling cases are quite different:
- *
- * - if insertion is done first, and following object creation fails,
- * insertion has to be rolled back, but this operation might fail
- * also leaving us with dangling index entry.
- *
- * - if creation is done first, is has to be undone if insertion
- * fails, leaving us with leaked space, which is neither good, nor
- * fatal.
- *
- * It seems that creation-first is simplest solution, but it is
- * sub-optimal in the frequent
- *
- * $ mkdir foo
- * $ mkdir foo
- *
- * case, because second mkdir is bound to create object, only to
- * destroy it immediately.
- *
- * To avoid this follow local file systems that do double lookup:
- *
- * 0. lookup -> -EEXIST (mdd_create_sanity_check())
- *
- * 1. create (mdd_object_create_internal())
- *
- * 2. insert (__mdd_index_insert(), lookup again)
- */
-
rc = mdd_la_get(env, mdd_pobj, pattr);
if (rc != 0)
RETURN(rc);
if (rc)
RETURN(rc);
- if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DQACQ_NET))
+ if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DQACQ_NET))
GOTO(out_free, rc = -EINPROGRESS);
handle = mdd_trans_create(env, mdd);
if (IS_ERR(handle))
GOTO(out_free, rc = PTR_ERR(handle));
- acl_buf.lb_buf = info->mti_xattr_buf;
- acl_buf.lb_len = sizeof(info->mti_xattr_buf);
+ lu_buf_check_and_alloc(&info->mti_xattr_buf,
+ min_t(unsigned int, mdd->mdd_dt_conf.ddp_max_ea_size,
+ XATTR_SIZE_MAX));
+ acl_buf = info->mti_xattr_buf;
def_acl_buf.lb_buf = info->mti_key;
def_acl_buf.lb_len = sizeof(info->mti_key);
rc = mdd_acl_init(env, mdd_pobj, attr, &def_acl_buf, &acl_buf);
if (rc < 0)
GOTO(out_stop, rc);
+ if (S_ISDIR(attr->la_mode)) {
+ struct lmv_user_md *lmu = spec->u.sp_ea.eadata;
+
+ /*
+ * migrate may create 1-stripe directory, so lod_ah_init()
+ * doesn't adjust stripe count from lmu.
+ */
+ if (lmu && lmu->lum_stripe_count == cpu_to_le32(1)) {
+ info->mti_lmu = *lmu;
+ info->mti_lmu.lum_stripe_count = 0;
+ spec->u.sp_ea.eadata = &info->mti_lmu;
+ }
+ }
+
mdd_object_make_hint(env, mdd_pobj, son, attr, spec, hint);
memset(ldata, 0, sizeof(*ldata));
lname, 1, 0, ldata);
}
+ if (spec->sp_cr_flags & MDS_OPEN_PCC) {
+ LASSERT(spec->sp_cr_flags & MDS_OPEN_HAS_EA);
+
+ memset(&hsm_buf, 0, sizeof(hsm_buf));
+ lu_buf_alloc(&hsm_buf, sizeof(struct hsm_attrs));
+ if (hsm_buf.lb_buf == NULL)
+ GOTO(out_stop, rc = -ENOMEM);
+ }
+
rc = mdd_declare_create(env, mdd, mdd_pobj, son, lname, attr,
handle, spec, ldata, &def_acl_buf, &acl_buf,
- hint);
- if (rc)
- GOTO(out_stop, rc);
+ &hsm_buf, hint);
+ if (rc)
+ GOTO(out_stop, rc);
- rc = mdd_trans_start(env, mdd, handle);
- if (rc)
- GOTO(out_stop, rc);
+ rc = mdd_trans_start(env, mdd, handle);
+ if (rc)
+ GOTO(out_stop, rc);
- rc = mdd_object_create(env, mdd_pobj, son, attr, spec, &acl_buf,
- &def_acl_buf, hint, handle);
+ rc = mdd_create_object(env, mdd_pobj, son, attr, spec, &acl_buf,
+ &def_acl_buf, &hsm_buf, hint, handle, true);
if (rc != 0)
GOTO(out_stop, rc);
if (unlikely(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
- mdd_write_lock(env, son, MOR_TGT_CHILD);
- rc = __mdd_orphan_add(env, son, handle);
+ mdd_write_lock(env, son, DT_TGT_CHILD);
+ son->mod_flags |= VOLATILE_OBJ;
+ rc = mdd_orphan_insert(env, son, handle);
GOTO(out_volatile, rc);
} else {
- rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
+ rc = __mdd_index_insert(env, mdd_pobj, mdd_object_fid(son),
attr->la_mode, name, handle);
if (rc != 0)
GOTO(err_created, rc);
- mdd_links_add(env, son, mdo2fid(mdd_pobj), lname, handle,
- ldata, 1);
+ mdd_links_add(env, son, mdd_object_fid(mdd_pobj), lname,
+ handle, ldata, 1);
/* update parent directory mtime/ctime */
*la = *attr;
EXIT;
err_insert:
if (rc != 0) {
- int rc2;
-
if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
- rc2 = __mdd_orphan_del(env, son, handle);
+ rc2 = mdd_orphan_delete(env, son, handle);
else
rc2 = __mdd_index_delete(env, mdd_pobj, name,
S_ISDIR(attr->la_mode),
goto out_stop;
err_created:
- mdd_write_lock(env, son, MOR_TGT_CHILD);
+ mdd_write_lock(env, son, DT_TGT_CHILD);
if (S_ISDIR(attr->la_mode)) {
/* Drop the reference, no need to delete "."/"..",
* because the object is to be destroyed directly. */
mdd_write_unlock(env, son);
}
- if (rc == 0 && fid_is_namespace_visible(mdo2fid(son)) &&
+ if (rc == 0 && fid_is_namespace_visible(mdd_object_fid(son)) &&
likely((spec->sp_cr_flags & MDS_OPEN_VOLATILE) == 0))
rc = mdd_changelog_ns_store(env, mdd,
S_ISDIR(attr->la_mode) ? CL_MKDIR :
S_ISREG(attr->la_mode) ? CL_CREATE :
S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD,
- 0, son, mdo2fid(mdd_pobj), NULL, NULL, lname,
- NULL, handle);
+ 0, son, mdd_object_fid(mdd_pobj), NULL, NULL,
+ lname, NULL, handle);
out_stop:
rc2 = mdd_trans_stop(env, mdd, rc, handle);
if (rc == 0) {
/* if we vmalloced a large buffer drop it */
lu_buf_free(ldata->ld_buf);
+ if (spec->sp_cr_flags & MDS_OPEN_PCC)
+ lu_buf_free(&hsm_buf);
+
/* The child object shouldn't be cached anymore */
if (rc)
set_bit(LU_OBJECT_HEARD_BANSHEE,
};
static int mdd_rename_order(const struct lu_env *env,
- struct mdd_device *mdd,
- struct mdd_object *src_pobj,
+ struct mdd_device *mdd,
+ struct mdd_object *src_pobj,
const struct lu_attr *pattr,
- struct mdd_object *tgt_pobj)
+ struct mdd_object *tgt_pobj)
{
- /* order of locking, 1 - tgt-src, 0 - src-tgt*/
- int rc;
- ENTRY;
+ /* order of locking, 1 - tgt-src, 0 - src-tgt */
+ int rc;
- if (src_pobj == tgt_pobj)
- RETURN(MDD_RN_SAME);
-
- /* compared the parent child relationship of src_p&tgt_p */
- if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
- rc = MDD_RN_SRCTGT;
- } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
- rc = MDD_RN_TGTSRC;
- } else {
- rc = mdd_is_parent(env, mdd, src_pobj, pattr, mdo2fid(tgt_pobj),
- NULL);
- if (rc == -EREMOTE)
- rc = 0;
-
- if (rc == 1)
- rc = MDD_RN_TGTSRC;
- else if (rc == 0)
- rc = MDD_RN_SRCTGT;
- }
+ ENTRY;
+ if (src_pobj == tgt_pobj)
+ RETURN(MDD_RN_SAME);
+
+ /* compare the parent-child relationship of src_p & tgt_p */
+ if (lu_fid_eq(&mdd->mdd_root_fid, mdd_object_fid(src_pobj))) {
+ rc = MDD_RN_SRCTGT;
+ } else if (lu_fid_eq(&mdd->mdd_root_fid, mdd_object_fid(tgt_pobj))) {
+ rc = MDD_RN_TGTSRC;
+ } else {
+ rc = mdd_is_parent(env, mdd, src_pobj, pattr,
+ mdd_object_fid(tgt_pobj));
+ if (rc == -EREMOTE)
+ rc = 0;
- RETURN(rc);
+ if (rc == 1)
+ rc = MDD_RN_TGTSRC;
+ else if (rc == 0)
+ rc = MDD_RN_SRCTGT;
+ }
+
+ RETURN(rc);
}
/* has not mdd_write{read}_lock on any obj yet. */
* before mdd_rename and enable MDS_PERM_BYPASS. */
LASSERT(sobj);
+ /*
+ * If we are using project inheritance, we only allow renames
+ * into our tree when the project IDs are the same; otherwise
+ * tree quota mechanism would be circumvented.
+ */
+ if (((tpattr->la_flags & LUSTRE_PROJINHERIT_FL) &&
+ tpattr->la_projid != cattr->la_projid) ||
+ ((pattr->la_flags & LUSTRE_PROJINHERIT_FL) &&
+ (pattr->la_projid != tpattr->la_projid)))
+ RETURN(-EXDEV);
+
rc = mdd_may_delete(env, src_pobj, pattr, sobj, cattr, NULL, 1, 0);
if (rc)
RETURN(rc);
struct mdd_object *mdd_tpobj,
struct mdd_object *mdd_sobj,
struct mdd_object *mdd_tobj,
- const struct lu_name *tname,
const struct lu_name *sname,
+ const struct lu_name *tname,
struct md_attr *ma,
struct linkea_data *ldata,
struct thandle *handle)
{
- struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
+ struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
int rc;
LASSERT(ma->ma_attr.la_valid & LA_CTIME);
la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
- LASSERT(mdd_spobj);
- LASSERT(mdd_tpobj);
- LASSERT(mdd_sobj);
+ LASSERT(mdd_spobj);
+ LASSERT(mdd_tpobj);
+ LASSERT(mdd_sobj);
- /* name from source dir */
- rc = mdo_declare_index_delete(env, mdd_spobj, sname->ln_name, handle);
- if (rc)
- return rc;
+ /* name from source dir */
+ rc = mdo_declare_index_delete(env, mdd_spobj, sname->ln_name, handle);
+ if (rc)
+ return rc;
- /* .. from source child */
- if (S_ISDIR(mdd_object_type(mdd_sobj))) {
- /* source child can be directory,
- * counted by source dir's nlink */
- rc = mdo_declare_ref_del(env, mdd_spobj, handle);
- if (rc)
- return rc;
+ /* .. from source child */
+ if (S_ISDIR(mdd_object_type(mdd_sobj))) {
+ /* source child can be a dir, counted by source dir's nlink */
+ rc = mdo_declare_ref_del(env, mdd_spobj, handle);
+ if (rc)
+ return rc;
if (mdd_spobj != mdd_tpobj) {
rc = mdo_declare_index_delete(env, mdd_sobj, dotdot,
handle);
return rc;
rc = mdo_declare_index_insert(env, mdd_sobj,
- mdo2fid(mdd_tpobj),
+ mdd_object_fid(mdd_tpobj),
S_IFDIR, dotdot, handle);
if (rc != 0)
return rc;
if (rc)
return rc;
- rc = mdd_declare_links_add(env, mdd_sobj, handle, ldata,
- S_ISREG(mdd_object_type(mdd_sobj)) ? MLAO_CHECK : MLAO_IGNORE);
+ rc = mdd_declare_links_add(env, mdd_sobj, handle, ldata);
if (rc)
return rc;
/* new name */
- rc = mdo_declare_index_insert(env, mdd_tpobj, mdo2fid(mdd_sobj),
+ rc = mdo_declare_index_insert(env, mdd_tpobj, mdd_object_fid(mdd_sobj),
mdd_object_type(mdd_sobj),
tname->ln_name, handle);
if (rc != 0)
return rc;
}
- rc = mdd_declare_changelog_store(env, mdd, tname, sname, handle);
+ rc = mdd_declare_changelog_store(env, mdd, CL_RENAME, tname, sname,
+ handle);
if (rc)
return rc;
struct lu_attr *tpattr = MDD_ENV_VAR(env, tpattr);
struct thandle *handle;
struct linkea_data *ldata = &mdd_env_info(env)->mti_link_data;
- const struct lu_fid *tpobj_fid = mdo2fid(mdd_tpobj);
- const struct lu_fid *spobj_fid = mdo2fid(mdd_spobj);
+ const struct lu_fid *tpobj_fid = mdd_object_fid(mdd_tpobj);
+ const struct lu_fid *spobj_fid = mdd_object_fid(mdd_spobj);
bool is_dir;
bool tobj_ref = 0;
bool tobj_locked = 0;
int rc, rc2;
ENTRY;
+ /* let unlink complete and commit */
+ CFS_FAIL_TIMEOUT(OBD_FAIL_TGT_REPLY_DATA_RACE, 2 + cfs_fail_val);
+
if (tobj)
mdd_tobj = md2mdd_obj(tobj);
rc = mdd_la_get(env, mdd_tobj, tattr);
if (rc)
GOTO(out_pending, rc);
+ /* search for an existing archive.
+ * we should check ahead as the object
+ * can be destroyed in this transaction */
+ if (mdd_hsm_archive_exists(env, mdd_tobj, ma))
+ cl_flags |= CLF_RENAME_LAST_EXISTS;
}
rc = mdd_la_get(env, mdd_tpobj, tpattr);
GOTO(out_pending, rc = PTR_ERR(handle));
memset(ldata, 0, sizeof(*ldata));
- mdd_linkea_prepare(env, mdd_sobj, mdd_object_fid(mdd_spobj), lsname,
- mdd_object_fid(mdd_tpobj), ltname, 1, 0, ldata);
+ rc = mdd_linkea_prepare(env, mdd_sobj, mdd_object_fid(mdd_spobj),
+ lsname, mdd_object_fid(mdd_tpobj), ltname,
+ 1, 0, ldata);
+ if (rc)
+ GOTO(stop, rc);
+
rc = mdd_declare_rename(env, mdd, mdd_spobj, mdd_tpobj, mdd_sobj,
mdd_tobj, lsname, ltname, ma, ldata, handle);
if (rc)
GOTO(fixup_tpobj, rc);
/* Update the linkEA for the source object */
- mdd_write_lock(env, mdd_sobj, MOR_SRC_CHILD);
- rc = mdd_links_rename(env, mdd_sobj, mdo2fid(mdd_spobj), lsname,
- mdo2fid(mdd_tpobj), ltname, handle, ldata,
- 0, 0);
+ mdd_write_lock(env, mdd_sobj, DT_SRC_CHILD);
+ rc = mdd_links_rename(env, mdd_sobj, mdd_object_fid(mdd_spobj),
+ lsname, mdd_object_fid(mdd_tpobj), ltname,
+ handle, ldata, 0, 0);
if (rc == -ENOENT)
/* Old files might not have EA entry */
- mdd_links_add(env, mdd_sobj, mdo2fid(mdd_spobj),
+ mdd_links_add(env, mdd_sobj, mdd_object_fid(mdd_spobj),
lsname, handle, NULL, 0);
mdd_write_unlock(env, mdd_sobj);
/* We don't fail the transaction if the link ea can't be
* it must be local one.
*/
if (tobj && mdd_object_exists(mdd_tobj)) {
- mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD);
+ mdd_write_lock(env, mdd_tobj, DT_TGT_CHILD);
tobj_locked = 1;
if (mdd_is_dead_obj(mdd_tobj)) {
/* shld not be dead, something is wrong */
/* fetch updated nlink */
rc = mdd_la_get(env, mdd_tobj, tattr);
- if (rc != 0) {
+ if (rc == -ENOENT) {
+ /* the object got removed, let's
+ * return the latest known attributes */
+ tattr->la_nlink = 0;
+ rc = 0;
+ } else if (rc != 0) {
CERROR("%s: Failed to get nlink for tobj "
DFID": rc = %d\n",
mdd2obd_dev(mdd)->obd_name,
ma->ma_attr = *tattr;
ma->ma_valid |= MA_INODE;
- if (tattr->la_nlink == 0) {
+ if (tattr->la_nlink == 0)
cl_flags |= CLF_RENAME_LAST;
- if (mdd_hsm_archive_exists(env, mdd_tobj, ma))
- cl_flags |= CLF_RENAME_LAST_EXISTS;
- }
+ else
+ cl_flags &= ~CLF_RENAME_LAST_EXISTS;
}
la->la_valid = LA_CTIME | LA_MTIME;
}
rc2 = __mdd_index_insert(env, mdd_tpobj,
- mdo2fid(mdd_tobj),
- mdd_object_type(mdd_tobj),
- tname, handle);
+ mdd_object_fid(mdd_tobj),
+ mdd_object_type(mdd_tobj),
+ tname, handle);
if (rc2 != 0)
CWARN("tp obj fix error: rc = %d\n", rc2);
}
ltname, lsname, handle);
stop:
- mdd_trans_stop(env, mdd, rc, handle);
+ rc = mdd_trans_stop(env, mdd, rc, handle);
out_pending:
mdd_object_put(env, mdd_sobj);
}
/**
- * During migration once the parent FID has been changed,
- * we need update the parent FID in linkea.
+ * Check whether we should migrate the file/dir
+ * return val
+ * < 0 permission check failed or other error.
+ * = 0 the file can be migrated.
**/
-static int mdd_linkea_update_child_internal(const struct lu_env *env,
- struct mdd_object *parent,
- struct mdd_object *child,
- const char *name, int namelen,
- struct thandle *handle,
- bool declare)
+static int mdd_migrate_sanity_check(const struct lu_env *env,
+ struct mdd_device *mdd,
+ struct mdd_object *spobj,
+ struct mdd_object *tpobj,
+ struct mdd_object *sobj,
+ struct mdd_object *tobj,
+ const struct lu_attr *spattr,
+ const struct lu_attr *tpattr,
+ const struct lu_attr *attr)
{
- struct mdd_thread_info *info = mdd_env_info(env);
- struct linkea_data ldata = { NULL };
- struct lu_buf *buf = &info->mti_link_buf;
- int count;
- int rc = 0;
+ int rc;
ENTRY;
- buf = lu_buf_check_and_alloc(buf, PATH_MAX);
- if (buf->lb_buf == NULL)
- RETURN(-ENOMEM);
-
- ldata.ld_buf = buf;
- rc = mdd_links_read(env, child, &ldata);
- if (rc != 0) {
- if (rc == -ENOENT || rc == -ENODATA)
- rc = 0;
- RETURN(rc);
+ if (!mdd_object_remote(sobj)) {
+ mdd_read_lock(env, sobj, DT_SRC_CHILD);
+ if (sobj->mod_count > 0) {
+ CDEBUG(D_INFO, "%s: "DFID" is opened, count %d\n",
+ mdd_obj_dev_name(sobj),
+ PFID(mdd_object_fid(sobj)),
+ sobj->mod_count);
+ mdd_read_unlock(env, sobj);
+ RETURN(-EBUSY);
+ }
+ mdd_read_unlock(env, sobj);
}
- LASSERT(ldata.ld_leh != NULL);
- ldata.ld_lee = (struct link_ea_entry *)(ldata.ld_leh + 1);
- for (count = 0; count < ldata.ld_leh->leh_reccount; count++) {
- struct mdd_device *mdd = mdo2mdd(&child->mod_obj);
- struct lu_name lname;
- struct lu_fid fid;
-
- linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen,
- &lname, &fid);
-
- if (strncmp(lname.ln_name, name, namelen) != 0 ||
- lu_fid_eq(&fid, mdd_object_fid(parent))) {
- ldata.ld_lee = (struct link_ea_entry *)
- ((char *)ldata.ld_lee +
- ldata.ld_reclen);
- continue;
- }
+ if (mdd_object_exists(tobj))
+ RETURN(-EEXIST);
- CDEBUG(D_INFO, "%s: update "DFID" with %.*s:"DFID"\n",
- mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(child)),
- lname.ln_namelen, lname.ln_name,
- PFID(mdd_object_fid(parent)));
- /* update to the new parent fid */
- linkea_entry_pack(ldata.ld_lee, &lname,
- mdd_object_fid(parent));
- if (declare)
- rc = mdd_declare_links_add(env, child, handle, &ldata,
- MLAO_IGNORE);
- else
- rc = mdd_links_write(env, child, &ldata, handle);
- break;
- }
+ rc = mdd_rename_sanity_check(env, spobj, spattr, tpobj, tpattr, sobj,
+ attr, NULL, NULL);
RETURN(rc);
}
-static int mdd_linkea_declare_update_child(const struct lu_env *env,
- struct mdd_object *parent,
- struct mdd_object *child,
- const char *name, int namelen,
- struct thandle *handle)
+/*
+ * Per-stripe callback used by mdd_dir_iterate_stripes(): it is called once
+ * for each visited stripe with the directory object the iteration was started
+ * on (obj), the stripe object, the LMV and LMU buffers handed to the iterator,
+ * the stripe index and the transaction handle. A non-zero return value stops
+ * the iteration and is returned to the iterator's caller.
+ */
+typedef int (*mdd_dir_stripe_cb)(const struct lu_env *env,
+ struct mdd_object *obj,
+ struct mdd_object *stripe,
+ const struct lu_buf *lmv_buf,
+ const struct lu_buf *lmu_buf,
+ int index,
+ struct thandle *handle);
+
+static int mdd_dir_declare_delete_stripe(const struct lu_env *env,
+ struct mdd_object *obj,
+ struct mdd_object *stripe,
+ const struct lu_buf *lmv_buf,
+ const struct lu_buf *lmu_buf,
+ int index,
+ struct thandle *handle)
{
- return mdd_linkea_update_child_internal(env, parent, child, name,
- namelen, handle, true);
-}
+ struct mdd_thread_info *info = mdd_env_info(env);
+ char *stripe_name = info->mti_name;
+ struct lmv_user_md *lmu = lmu_buf->lb_buf;
+ int rc;
-static int mdd_linkea_update_child(const struct lu_env *env,
- struct mdd_object *parent,
- struct mdd_object *child,
- const char *name, int namelen,
- struct thandle *handle)
-{
- return mdd_linkea_update_child_internal(env, parent, child, name,
- namelen, handle, false);
-}
+ if (index < le32_to_cpu(lmu->lum_stripe_count))
+ return 0;
-static int mdd_update_linkea_internal(const struct lu_env *env,
- struct mdd_object *mdd_pobj,
- struct mdd_object *mdd_sobj,
- struct mdd_object *mdd_tobj,
- const struct lu_name *child_name,
- struct linkea_data *ldata,
- struct thandle *handle,
- int declare)
-{
- struct mdd_thread_info *info = mdd_env_info(env);
- int count;
- int rc = 0;
- ENTRY;
+ rc = mdo_declare_index_delete(env, stripe, dotdot, handle);
+ if (rc)
+ return rc;
- LASSERT(ldata->ld_buf != NULL);
+ snprintf(stripe_name, sizeof(info->mti_name), DFID":%d",
+ PFID(mdd_object_fid(stripe)), index);
-again:
- /* If it is mulitple links file, we need update the name entry for
- * all parent */
- LASSERT(ldata->ld_leh != NULL);
- ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
- for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
- struct mdd_device *mdd = mdo2mdd(&mdd_sobj->mod_obj);
- struct mdd_object *pobj;
- struct lu_name lname;
- struct lu_fid fid;
-
- linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen,
- &lname, &fid);
- pobj = mdd_object_find(env, mdd, &fid);
- if (IS_ERR(pobj)) {
- CWARN("%s: cannot find obj "DFID": rc = %ld\n",
- mdd2obd_dev(mdd)->obd_name, PFID(&fid),
- PTR_ERR(pobj));
- linkea_del_buf(ldata, &lname);
- goto again;
- }
+ rc = mdo_declare_index_delete(env, obj, stripe_name, handle);
+ if (rc)
+ return rc;
- if (!mdd_object_exists(pobj)) {
- CDEBUG(D_INFO, "%s: obj "DFID" does not exist\n",
- mdd2obd_dev(mdd)->obd_name, PFID(&fid));
- linkea_del_buf(ldata, &lname);
- mdd_object_put(env, pobj);
- goto again;
- }
+ rc = mdo_declare_ref_del(env, obj, handle);
- if (pobj == mdd_pobj &&
- lname.ln_namelen == child_name->ln_namelen &&
- strncmp(lname.ln_name, child_name->ln_name,
- lname.ln_namelen) == 0) {
- CDEBUG(D_INFO, "%s: skip its own %s: "DFID"\n",
- mdd2obd_dev(mdd)->obd_name, child_name->ln_name,
- PFID(&fid));
- linkea_del_buf(ldata, &lname);
- mdd_object_put(env, pobj);
- goto again;
- }
+ return rc;
+}
- CDEBUG(D_INFO, "%s: update "DFID" with "DNAME":"DFID"\n",
- mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(pobj)),
- PNAME(&lname), PFID(mdd_object_fid(mdd_tobj)));
+/* delete stripe from its master object namespace */
+static int mdd_dir_delete_stripe(const struct lu_env *env,
+ struct mdd_object *obj,
+ struct mdd_object *stripe,
+ const struct lu_buf *lmv_buf,
+ const struct lu_buf *lmu_buf,
+ int index,
+ struct thandle *handle)
+{
+ struct mdd_thread_info *info = mdd_env_info(env);
+ char *stripe_name = info->mti_name;
+ struct lmv_mds_md_v1 *lmv = lmv_buf->lb_buf;
+ struct lmv_user_md *lmu = lmu_buf->lb_buf;
+ __u32 del_offset = le32_to_cpu(lmu->lum_stripe_count);
+ int rc;
- if (declare) {
- /* Remove source name from source directory */
- /* Insert new fid with target name into target dir */
- rc = mdo_declare_index_delete(env, pobj, lname.ln_name,
- handle);
- if (rc != 0)
- GOTO(next_put, rc);
+ ENTRY;
- rc = mdo_declare_index_insert(env, pobj,
- mdd_object_fid(mdd_tobj),
- mdd_object_type(mdd_tobj),
- lname.ln_name, handle);
- if (rc != 0)
- GOTO(next_put, rc);
+ /* a local dir is deleted via LOD */
+ LASSERT(mdd_object_remote(obj));
+ LASSERT(del_offset < le32_to_cpu(lmv->lmv_stripe_count));
- rc = mdo_declare_ref_add(env, mdd_tobj, handle);
- if (rc)
- GOTO(next_put, rc);
+ if (index < del_offset)
+ RETURN(0);
- rc = mdo_declare_ref_del(env, mdd_sobj, handle);
- if (rc)
- GOTO(next_put, rc);
- } else {
- char *tmp_name = info->mti_key;
-
- if (lname.ln_namelen >= sizeof(info->mti_key)) {
- /* lnamelen is too big(> NAME_MAX + 16),
- * something wrong about this linkea, let's
- * skip it */
- linkea_del_buf(ldata, &lname);
- mdd_object_put(env, pobj);
- goto again;
- }
+ mdd_write_lock(env, stripe, DT_SRC_CHILD);
+ rc = __mdd_index_delete_only(env, stripe, dotdot, handle);
+ if (rc)
+ GOTO(out, rc);
- /* Note: lname might be without \0 at the end, see
- * linkea_entry_unpack(), let's add extra \0 by
- * snprintf */
- snprintf(tmp_name, sizeof(info->mti_key), "%.*s",
- lname.ln_namelen, lname.ln_name);
- lname.ln_name = tmp_name;
-
- /* Let's check if this linkEA still valid, before
- * it might be packed into the RPC buffer. */
- rc = mdd_lookup(env, &pobj->mod_obj, &lname,
- &info->mti_fid, NULL);
- if (rc < 0 ||
- !lu_fid_eq(&info->mti_fid,
- mdd_object_fid(mdd_sobj))) {
- /* skip invalid linkea entry */
- linkea_del_buf(ldata, &lname);
- mdd_object_put(env, pobj);
- goto again;
- }
+ snprintf(stripe_name, sizeof(info->mti_name), DFID":%d",
+ PFID(mdd_object_fid(stripe)), index);
- rc = __mdd_index_delete(env, pobj, tmp_name, 0, handle);
- if (rc != 0)
- GOTO(next_put, rc);
+ rc = __mdd_index_delete_only(env, obj, stripe_name, handle);
+ if (rc)
+ GOTO(out, rc);
- rc = __mdd_index_insert(env, pobj,
- mdd_object_fid(mdd_tobj),
- mdd_object_type(mdd_tobj),
- tmp_name, handle);
- if (rc != 0)
- GOTO(next_put, rc);
+ rc = mdo_ref_del(env, obj, handle);
+ GOTO(out, rc);
+out:
+ mdd_write_unlock(env, stripe);
- mdd_write_lock(env, mdd_tobj, MOR_SRC_CHILD);
- rc = mdo_ref_add(env, mdd_tobj, handle);
- mdd_write_unlock(env, mdd_tobj);
- if (rc)
- GOTO(next_put, rc);
+ return rc;
+}
- mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
- mdo_ref_del(env, mdd_sobj, handle);
- mdd_write_unlock(env, mdd_sobj);
- }
-next_put:
- mdd_object_put(env, pobj);
- if (rc != 0)
- break;
+/*
+ * Declare the updates for one stripe when a directory is shrunk to the
+ * stripe count given in the LMU buffer.
+ *
+ * A stripe whose index is below the new stripe count is kept: an LMV xattr
+ * update is declared for it, unless the new count is below 2, in which case
+ * nothing is declared. Any other stripe gets a reference drop and a destroy
+ * declared.
+ *
+ * Returns 0 on success or the error of the first declaration that failed.
+ */
+static int mdd_dir_declare_destroy_stripe(const struct lu_env *env,
+					  struct mdd_object *obj,
+					  struct mdd_object *stripe,
+					  const struct lu_buf *lmv_buf,
+					  const struct lu_buf *lmu_buf,
+					  int index,
+					  struct thandle *handle)
+{
+	struct lmv_user_md *user_md = lmu_buf->lb_buf;
+	__u32 keep_count = le32_to_cpu(user_md->lum_stripe_count);
+	int rc;
+
+	/* stripe lies beyond the new stripe count: it will be destroyed */
+	if (index >= keep_count) {
+		rc = mdo_declare_ref_del(env, stripe, handle);
+		if (rc == 0)
+			rc = mdo_declare_destroy(env, stripe, handle);
+		return rc;
+	}
+
+	/* remaining stripe: no LMV update when fewer than 2 stripes remain */
+	if (keep_count < 2)
+		return 0;
+
+	return mdo_declare_xattr_set(env, stripe, lmv_buf,
+				     XATTR_NAME_LMV".set", 0, handle);
+}
+
+static int mdd_dir_destroy_stripe(const struct lu_env *env,
+ struct mdd_object *obj,
+ struct mdd_object *stripe,
+ const struct lu_buf *lmv_buf,
+ const struct lu_buf *lmu_buf,
+ int index,
+ struct thandle *handle)
+{
+ struct mdd_thread_info *info = mdd_env_info(env);
+ struct lmv_mds_md_v1 *lmv = lmv_buf->lb_buf;
+ struct lmv_user_md *lmu = lmu_buf->lb_buf;
+ __u32 shrink_offset = le32_to_cpu(lmu->lum_stripe_count);
+ int rc;
+
+ ENTRY;
- ldata->ld_lee = (struct link_ea_entry *)((char *)ldata->ld_lee +
- ldata->ld_reclen);
+ /* update remaining stripes' LMV */
+ if (index < shrink_offset) {
+ struct lmv_mds_md_v1 *slave_lmv;
+ struct lu_buf slave_buf = {
+ .lb_buf = &info->mti_lmv.lmv_md_v1,
+ .lb_len = sizeof(*slave_lmv)
+ };
+ __u32 version = le32_to_cpu(lmv->lmv_layout_version);
+
+ /* if dir will be shrunk to 1-stripe, don't update */
+ if (shrink_offset < 2)
+ RETURN(0);
+
+ slave_lmv = slave_buf.lb_buf;
+ memset(slave_lmv, 0, sizeof(*slave_lmv));
+ slave_lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_STRIPE);
+ slave_lmv->lmv_stripe_count = lmu->lum_stripe_count;
+ slave_lmv->lmv_master_mdt_index = cpu_to_le32(index);
+ slave_lmv->lmv_hash_type = lmv->lmv_hash_type &
+ cpu_to_le32(LMV_HASH_TYPE_MASK);
+ slave_lmv->lmv_layout_version = cpu_to_le32(++version);
+
+ rc = mdo_xattr_set(env, stripe, &slave_buf,
+ XATTR_NAME_LMV".set", 0, handle);
+ RETURN(rc);
}
+ mdd_write_lock(env, stripe, DT_SRC_CHILD);
+ rc = mdo_ref_del(env, stripe, handle);
+ if (!rc)
+ rc = mdo_destroy(env, stripe, handle);
+ mdd_write_unlock(env, stripe);
+
RETURN(rc);
}
-static int mdd_migrate_xattrs(const struct lu_env *env,
- struct mdd_object *mdd_sobj,
- struct mdd_object *mdd_tobj)
+static int mdd_shrink_stripe_is_empty(const struct lu_env *env,
+ struct mdd_object *obj,
+ struct mdd_object *stripe,
+ const struct lu_buf *lmv_buf,
+ const struct lu_buf *lmu_buf,
+ int index,
+ struct thandle *handle)
{
- struct mdd_thread_info *info = mdd_env_info(env);
- struct mdd_device *mdd = mdo2mdd(&mdd_sobj->mod_obj);
- char *xname;
- struct thandle *handle;
- struct lu_buf xbuf;
- int xlen;
- int rem;
- int xsize;
- int list_xsize;
- struct lu_buf list_xbuf;
- int rc;
- int rc1;
+ struct lmv_user_md *lmu = lmu_buf->lb_buf;
+ __u32 shrink_offset = le32_to_cpu(lmu->lum_stripe_count);
+
+ /* the default value is 0, but it means 1 */
+ if (!shrink_offset)
+ shrink_offset = 1;
+
+ if (index < shrink_offset)
+ return 0;
+
+ return mdd_dir_is_empty(env, stripe);
+}
+
+/*
+ * Walk the stripes recorded in the LMV of a striped directory located on a
+ * remote MDT and invoke \a cb on each of them; a local striped directory is
+ * accessed via LOD instead.
+ *
+ * Stripe slots whose FID is not sane are skipped. The walk ends at the first
+ * stripe object that cannot be found or for which \a cb returns non-zero, and
+ * that error is returned; 0 is returned once every stripe has been visited.
+ */
+static int mdd_dir_iterate_stripes(const struct lu_env *env,
+				   struct mdd_object *obj,
+				   const struct lu_buf *lmv_buf,
+				   const struct lu_buf *lmu_buf,
+				   struct thandle *handle,
+				   mdd_dir_stripe_cb cb)
+{
+	struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
+	struct lu_fid *stripe_fid = &mdd_env_info(env)->mti_fid2;
+	struct lmv_mds_md_v1 *lmv = lmv_buf->lb_buf;
+	struct mdd_object *stripe;
+	int rc = 0;
+	int i;
+
+	ENTRY;
+
+	LASSERT(lmv);
+
+	for (i = 0; rc == 0 && i < le32_to_cpu(lmv->lmv_stripe_count); i++) {
+		fid_le_to_cpu(stripe_fid, &lmv->lmv_stripe_fids[i]);
+		if (fid_is_sane(stripe_fid)) {
+			stripe = mdd_object_find(env, mdd, stripe_fid);
+			if (IS_ERR(stripe)) {
+				rc = PTR_ERR(stripe);
+			} else {
+				rc = cb(env, obj, stripe, lmv_buf, lmu_buf, i,
+					handle);
+				mdd_object_put(env, stripe);
+			}
+		}
+	}
+
+	RETURN(rc);
+}
+
+/*
+ * Per-xattr callback used by mdd_iterate_xattrs(): it is called with the
+ * target object, a buffer holding the xattr value read from the source
+ * object, the xattr name, flags (passed as 0 by the iterator) and the
+ * transaction handle. A non-zero return value ends the iteration; the one
+ * exception is -ENOSPC for the link xattr, which the iterator may retry after
+ * linkea_overflow_shrink().
+ */
+typedef int (*mdd_xattr_cb)(const struct lu_env *env,
+ struct mdd_object *obj,
+ const struct lu_buf *buf,
+ const char *name,
+ int fl, struct thandle *handle);
+
+/* iterate xattrs: always skip LMA and LMV; skip LINKEA if 'skip_linkea' */
+static int mdd_iterate_xattrs(const struct lu_env *env,
+ struct mdd_object *sobj,
+ struct mdd_object *tobj,
+ bool skip_linkea,
+ struct thandle *handle,
+ mdd_xattr_cb cb)
+{
+ struct mdd_thread_info *info = mdd_env_info(env);
+ char *xname;
+ struct lu_buf list_xbuf;
+ struct lu_buf cbxbuf;
+ struct lu_buf xbuf = { NULL };
+ int list_xsize;
+ int xlen;
+ int rem;
+ int xsize;
+ int rc;
+
+ ENTRY;
/* retrieve xattr list from the old object */
- list_xsize = mdo_xattr_list(env, mdd_sobj, &LU_BUF_NULL);
+ list_xsize = mdo_xattr_list(env, sobj, &LU_BUF_NULL);
if (list_xsize == -ENODATA)
- return 0;
+ RETURN(0);
if (list_xsize < 0)
- return list_xsize;
+ RETURN(list_xsize);
lu_buf_check_and_alloc(&info->mti_big_buf, list_xsize);
if (info->mti_big_buf.lb_buf == NULL)
- return -ENOMEM;
+ RETURN(-ENOMEM);
list_xbuf.lb_buf = info->mti_big_buf.lb_buf;
list_xbuf.lb_len = list_xsize;
- rc = mdo_xattr_list(env, mdd_sobj, &list_xbuf);
+ rc = mdo_xattr_list(env, sobj, &list_xbuf);
if (rc < 0)
- return rc;
+ RETURN(rc);
+
+ rem = rc;
rc = 0;
- rem = list_xsize;
xname = list_xbuf.lb_buf;
while (rem > 0) {
xlen = strnlen(xname, rem - 1) + 1;
- if (strcmp(XATTR_NAME_LINK, xname) == 0 ||
- strcmp(XATTR_NAME_LMA, xname) == 0 ||
+ if (strcmp(XATTR_NAME_LMA, xname) == 0 ||
strcmp(XATTR_NAME_LMV, xname) == 0)
goto next;
- /* For directory, if there are default layout, migrate here */
- if (strcmp(XATTR_NAME_LOV, xname) == 0 &&
- !S_ISDIR(lu_object_attr(&mdd_sobj->mod_obj.mo_lu)))
+ if (skip_linkea &&
+ strcmp(XATTR_NAME_LINK, xname) == 0)
goto next;
- xsize = mdo_xattr_get(env, mdd_sobj, &LU_BUF_NULL, xname);
+ xsize = mdo_xattr_get(env, sobj, &LU_BUF_NULL, xname);
if (xsize == -ENODATA)
goto next;
if (xsize < 0)
- GOTO(out, rc);
+ GOTO(out, rc = xsize);
- lu_buf_check_and_alloc(&info->mti_link_buf, xsize);
- if (info->mti_link_buf.lb_buf == NULL)
+ lu_buf_check_and_alloc(&xbuf, xsize);
+ if (xbuf.lb_buf == NULL)
GOTO(out, rc = -ENOMEM);
- xbuf.lb_len = xsize;
- xbuf.lb_buf = info->mti_link_buf.lb_buf;
- rc = mdo_xattr_get(env, mdd_sobj, &xbuf, xname);
+ rc = mdo_xattr_get(env, sobj, &xbuf, xname);
if (rc == -ENODATA)
goto next;
if (rc < 0)
GOTO(out, rc);
- handle = mdd_trans_create(env, mdd);
- if (IS_ERR(handle))
- GOTO(out, rc = PTR_ERR(handle));
-
- rc = mdo_declare_xattr_set(env, mdd_tobj, &xbuf, xname, 0,
- handle);
- if (rc != 0)
- GOTO(stop_trans, rc);
- /* Note: this transaction is part of migration, and it is not
- * the last step of migration, so we set th_local = 1 to avoid
- * update last rcvd for this transaction */
- handle->th_local = 1;
- rc = mdd_trans_start(env, mdd, handle);
- if (rc != 0)
- GOTO(stop_trans, rc);
-
- rc = mdo_xattr_set(env, mdd_tobj, &xbuf, xname, 0, handle);
- if (rc == -EEXIST)
- GOTO(stop_trans, rc = 0);
+ cbxbuf = xbuf;
+ cbxbuf.lb_len = xsize;
+repeat:
+ rc = cb(env, tobj, &cbxbuf, xname, 0, handle);
+ if (unlikely(rc == -ENOSPC &&
+ strcmp(xname, XATTR_NAME_LINK) == 0)) {
+ rc = linkea_overflow_shrink(
+ (struct linkea_data *)(cbxbuf.lb_buf));
+ if (likely(rc > 0)) {
+ cbxbuf.lb_len = rc;
+ goto repeat;
+ }
+ }
- if (rc != 0)
- GOTO(stop_trans, rc);
-stop_trans:
- rc1 = mdd_trans_stop(env, mdd, rc, handle);
- if (rc == 0)
- rc = rc1;
- if (rc != 0)
+ if (rc)
GOTO(out, rc);
next:
+ xname += xlen;
rem -= xlen;
- memmove(xname, xname + xlen, rem);
}
+
out:
- return rc;
+ lu_buf_free(&xbuf);
+ RETURN(rc);
}
-static int mdd_declare_migrate_create(const struct lu_env *env,
- struct mdd_object *mdd_pobj,
- struct mdd_object *mdd_sobj,
- struct mdd_object *mdd_tobj,
- struct md_op_spec *spec,
- struct lu_attr *la,
- union lmv_mds_md *mgr_ea,
- struct linkea_data *ldata,
- struct thandle *handle)
+/*
+ * Per-link callback used by mdd_iterate_linkea(): it is called for each
+ * linkEA entry with the name (lname) and FID (fid) unpacked from that entry.
+ * The two objects, the name/FID pair in the 4th and 5th arguments, the opaque
+ * pointer and the transaction handle are passed through unchanged from the
+ * iterator's caller. A non-zero return value ends the iteration and is
+ * returned by the iterator.
+ */
+typedef int (*mdd_linkea_cb)(const struct lu_env *env,
+ struct mdd_object *sobj,
+ struct mdd_object *tobj,
+ const struct lu_name *sname,
+ const struct lu_fid *sfid,
+ const struct lu_name *lname,
+ const struct lu_fid *fid,
+ void *opaque,
+ struct thandle *handle);
+
+static int mdd_declare_update_link(const struct lu_env *env,
+ struct mdd_object *sobj,
+ struct mdd_object *tobj,
+ const struct lu_name *tname,
+ const struct lu_fid *tpfid,
+ const struct lu_name *lname,
+ const struct lu_fid *fid,
+ void *unused,
+ struct thandle *handle)
{
- struct lu_attr *la_flag = MDD_ENV_VAR(env, la_for_fix);
- const struct lu_buf *buf;
- int rc;
- int mgr_easize;
-
- rc = mdd_declare_object_create_internal(env, mdd_pobj, mdd_tobj, la,
- handle, spec, NULL);
- if (rc != 0)
- return rc;
-
- rc = mdd_declare_object_initialize(env, mdd_pobj, mdd_tobj, la,
- handle);
- if (rc != 0)
- return rc;
+ struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
+ struct mdd_object *pobj;
+ int rc;
- if (S_ISLNK(la->la_mode)) {
- const char *target_name = spec->u.sp_symname;
- int sym_len = strlen(target_name);
- const struct lu_buf *buf;
+ /* ignore tobj */
+ if (lu_fid_eq(tpfid, fid) && tname->ln_namelen == lname->ln_namelen &&
+ !strcmp(tname->ln_name, lname->ln_name))
+ return 0;
- buf = mdd_buf_get_const(env, target_name, sym_len);
- rc = dt_declare_record_write(env, mdd_object_child(mdd_tobj),
- buf, 0, handle);
- if (rc != 0)
- return rc;
- } else if (S_ISDIR(la->la_mode)) {
- rc = mdd_declare_links_add(env, mdd_tobj, handle, ldata,
- MLAO_IGNORE);
- if (rc != 0)
- return rc;
- }
+ pobj = mdd_object_find(env, mdd, fid);
+ if (IS_ERR(pobj))
+ return PTR_ERR(pobj);
- if (spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen != 0) {
- buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
- spec->u.sp_ea.eadatalen);
- rc = mdo_declare_xattr_set(env, mdd_tobj, buf, XATTR_NAME_LOV,
- 0, handle);
- if (rc)
- return rc;
- }
- mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
- buf = mdd_buf_get_const(env, mgr_ea, mgr_easize);
- rc = mdo_declare_xattr_set(env, mdd_sobj, buf, XATTR_NAME_LMV,
- 0, handle);
+ rc = mdo_declare_index_delete(env, pobj, lname->ln_name, handle);
+ if (!rc)
+ rc = mdo_declare_index_insert(env, pobj, mdd_object_fid(tobj),
+ mdd_object_type(sobj),
+ lname->ln_name, handle);
+ mdd_object_put(env, pobj);
if (rc)
return rc;
- la_flag->la_valid = LA_FLAGS;
- la_flag->la_flags = la->la_flags | LUSTRE_IMMUTABLE_FL;
- rc = mdo_declare_attr_set(env, mdd_sobj, la_flag, handle);
+ rc = mdo_declare_ref_add(env, tobj, handle);
+ if (rc)
+ return rc;
+ rc = mdo_declare_ref_del(env, sobj, handle);
return rc;
}
-static int mdd_migrate_create(const struct lu_env *env,
- struct mdd_object *mdd_pobj,
- struct mdd_object *mdd_sobj,
- struct mdd_object *mdd_tobj,
- const struct lu_name *lname,
- struct lu_attr *la)
+static int mdd_update_link(const struct lu_env *env,
+ struct mdd_object *sobj,
+ struct mdd_object *tobj,
+ const struct lu_name *tname,
+ const struct lu_fid *tpfid,
+ const struct lu_name *lname,
+ const struct lu_fid *fid,
+ void *unused,
+ struct thandle *handle)
{
- struct mdd_thread_info *info = mdd_env_info(env);
- struct mdd_device *mdd = mdo2mdd(&mdd_sobj->mod_obj);
- struct md_op_spec *spec = &info->mti_spec;
- struct lu_buf lmm_buf = { NULL };
- struct lu_buf link_buf = { NULL };
- const struct lu_buf *buf;
- struct thandle *handle;
- struct lmv_mds_md_v1 *mgr_ea;
- struct lu_attr *la_flag = MDD_ENV_VAR(env, la_for_fix);
- struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
- int mgr_easize;
- struct linkea_data *ldata = &mdd_env_info(env)->mti_link_data;
- int rc;
- ENTRY;
-
- /* prepare spec for create */
- memset(spec, 0, sizeof(*spec));
- spec->sp_cr_lookup = 0;
- spec->sp_feat = &dt_directory_features;
- if (S_ISLNK(la->la_mode)) {
- buf = lu_buf_check_and_alloc(
- &mdd_env_info(env)->mti_big_buf,
- la->la_size + 1);
- link_buf = *buf;
- link_buf.lb_len = la->la_size + 1;
- memset(link_buf.lb_buf, 0, link_buf.lb_len);
- rc = mdd_readlink(env, &mdd_sobj->mod_obj, &link_buf);
- if (rc <= 0) {
- rc = rc != 0 ? rc : -EFAULT;
- CERROR("%s: "DFID" readlink failed: rc = %d\n",
- mdd2obd_dev(mdd)->obd_name,
- PFID(mdd_object_fid(mdd_sobj)), rc);
- RETURN(rc);
- }
- spec->u.sp_symname = link_buf.lb_buf;
- } else if (S_ISREG(la->la_mode)) {
- /* retrieve lov of the old object */
- rc = mdd_get_lov_ea(env, mdd_sobj, &lmm_buf);
- if (rc != 0 && rc != -ENODATA)
- RETURN(rc);
- if (lmm_buf.lb_buf != NULL && lmm_buf.lb_len != 0) {
- spec->u.sp_ea.eadata = lmm_buf.lb_buf;
- spec->u.sp_ea.eadatalen = lmm_buf.lb_len;
- spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
- }
- } else if (S_ISDIR(la->la_mode)) {
- rc = mdd_links_read(env, mdd_sobj, ldata);
- if (rc < 0 && rc != -ENODATA)
- RETURN(rc);
- }
-
- mgr_ea = (struct lmv_mds_md_v1 *)info->mti_xattr_buf;
- memset(mgr_ea, 0, sizeof(*mgr_ea));
- mgr_ea->lmv_magic = cpu_to_le32(LMV_MAGIC_V1);
- mgr_ea->lmv_stripe_count = cpu_to_le32(2);
- mgr_ea->lmv_master_mdt_index = mdd_seq_site(mdd)->ss_node_id;
- mgr_ea->lmv_hash_type = cpu_to_le32(LMV_HASH_FLAG_MIGRATION);
- fid_cpu_to_le(&mgr_ea->lmv_stripe_fids[0], mdd_object_fid(mdd_sobj));
- fid_cpu_to_le(&mgr_ea->lmv_stripe_fids[1], mdd_object_fid(mdd_tobj));
+ struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
+ struct mdd_object *pobj;
+ int rc;
- mdd_object_make_hint(env, mdd_pobj, mdd_tobj, la, spec, hint);
+ ENTRY;
- handle = mdd_trans_create(env, mdd);
- if (IS_ERR(handle))
- GOTO(out_free, rc = PTR_ERR(handle));
+ LASSERT(lu_name_is_valid(lname));
- /* Note: this transaction is part of migration, and it is not
- * the last step of migration, so we set th_local = 1 to avoid
- * update last rcvd for this transaction */
- handle->th_local = 1;
- rc = mdd_declare_migrate_create(env, mdd_pobj, mdd_sobj, mdd_tobj,
- spec, la,
- (union lmv_mds_md *)info->mti_xattr_buf,
- ldata, handle);
- if (rc != 0)
- GOTO(stop_trans, rc);
+ /* ignore tobj */
+ if (lu_fid_eq(tpfid, fid) && tname->ln_namelen == lname->ln_namelen &&
+ !strncmp(tname->ln_name, lname->ln_name, lname->ln_namelen))
+ RETURN(0);
- rc = mdd_trans_start(env, mdd, handle);
- if (rc != 0)
- GOTO(stop_trans, rc);
+ CDEBUG(D_INFO, "update "DFID"/"DNAME":"DFID"\n",
+ PFID(fid), PNAME(lname), PFID(mdd_object_fid(tobj)));
- /* create the target object */
- rc = mdd_object_create(env, mdd_pobj, mdd_tobj, la, spec, NULL, NULL,
- hint, handle);
- if (rc != 0)
- GOTO(stop_trans, rc);
+ pobj = mdd_object_find(env, mdd, fid);
+ if (IS_ERR(pobj)) {
+ CWARN("%s: cannot find obj "DFID": %ld\n",
+ mdd2obd_dev(mdd)->obd_name, PFID(fid), PTR_ERR(pobj));
+ RETURN(PTR_ERR(pobj));
+ }
- if (S_ISDIR(la->la_mode)) {
- rc = mdd_links_write(env, mdd_tobj, ldata, handle);
- if (rc != 0)
- GOTO(stop_trans, rc);
+ if (!mdd_object_exists(pobj)) {
+ CDEBUG(D_INFO, DFID" doesn't exist\n", PFID(fid));
+ mdd_object_put(env, pobj);
+ RETURN(-ENOENT);
}
- /* Set MIGRATE EA on the source inode, so once the migration needs
- * to be re-done during failover, the re-do process can locate the
- * target object which is already being created. */
- mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
- buf = mdd_buf_get_const(env, mgr_ea, mgr_easize);
- rc = mdo_xattr_set(env, mdd_sobj, buf, XATTR_NAME_LMV, 0, handle);
- if (rc != 0)
- GOTO(stop_trans, rc);
+ mdd_write_lock(env, pobj, DT_TGT_PARENT);
+ rc = __mdd_index_delete_only(env, pobj, lname->ln_name, handle);
+ if (!rc)
+ rc = __mdd_index_insert_only(env, pobj, mdd_object_fid(tobj),
+ mdd_object_type(sobj),
+ lname->ln_name, handle);
+ mdd_write_unlock(env, pobj);
+ mdd_object_put(env, pobj);
+ if (rc)
+ RETURN(rc);
- /* Set immutable flag, so any modification is disabled until
- * the migration is done. Once the migration is interrupted,
- * if the resume process find the migrating object has both
- * IMMUTALBE flag and MIGRATE EA, it need to clear IMMUTABLE
- * flag and approve the migration */
- la_flag->la_valid = LA_FLAGS;
- la_flag->la_flags = la->la_flags | LUSTRE_IMMUTABLE_FL;
- rc = mdo_attr_set(env, mdd_sobj, la_flag, handle);
-stop_trans:
- if (handle != NULL) {
- int rc1;
+ mdd_write_lock(env, tobj, DT_TGT_CHILD);
+ rc = mdo_ref_add(env, tobj, handle);
+ mdd_write_unlock(env, tobj);
+ if (rc)
+ RETURN(rc);
+
+ mdd_write_lock(env, sobj, DT_SRC_CHILD);
+ rc = mdo_ref_del(env, sobj, handle);
+ mdd_write_unlock(env, sobj);
- rc1 = mdd_trans_stop(env, mdd, rc, handle);
- if (rc == 0)
- rc = rc1;
- }
-out_free:
- if (lmm_buf.lb_buf != NULL)
- OBD_FREE(lmm_buf.lb_buf, lmm_buf.lb_len);
RETURN(rc);
}
-static int mdd_migrate_entries(const struct lu_env *env,
- struct mdd_object *mdd_sobj,
- struct mdd_object *mdd_tobj)
+static inline int mdd_fld_lookup(const struct lu_env *env,
+ struct mdd_device *mdd,
+ const struct lu_fid *fid,
+ __u32 *mdt_index)
{
- struct dt_object *next = mdd_object_child(mdd_sobj);
- struct mdd_device *mdd = mdo2mdd(&mdd_sobj->mod_obj);
- struct dt_object *dt_tobj = mdd_object_child(mdd_tobj);
- struct thandle *handle;
- struct dt_it *it;
- const struct dt_it_ops *iops;
- int rc;
- int result;
- struct lu_dirent *ent;
- ENTRY;
-
- OBD_ALLOC(ent, NAME_MAX + sizeof(*ent) + 1);
- if (ent == NULL)
- RETURN(-ENOMEM);
+ struct lu_seq_range *range = &mdd_env_info(env)->mti_range;
+ struct seq_server_site *ss;
+ int rc;
- if (!dt_try_as_dir(env, next))
- GOTO(out_ent, rc = -ENOTDIR);
- /*
- * iterate directories
- */
- iops = &next->do_index_ops->dio_it;
- it = iops->init(env, next, LUDA_FID | LUDA_TYPE);
- if (IS_ERR(it))
- GOTO(out_ent, rc = PTR_ERR(it));
+ ss = mdd->mdd_md_dev.md_lu_dev.ld_site->ld_seq_site;
- rc = iops->load(env, it, 0);
- if (rc == 0)
- rc = iops->next(env, it);
- else if (rc > 0)
- rc = 0;
- /*
- * At this point and across for-loop:
- *
- * rc == 0 -> ok, proceed.
- * rc > 0 -> end of directory.
- * rc < 0 -> error.
- */
- do {
- struct mdd_object *child;
- char *name = mdd_env_info(env)->mti_key;
- int len;
- int recsize;
- int is_dir;
- bool target_exist = false;
- int rc1;
-
- len = iops->key_size(env, it);
- if (len == 0)
- goto next;
+ range->lsr_flags = LU_SEQ_RANGE_MDT;
+ rc = fld_server_lookup(env, ss->ss_server_fld, fid->f_seq, range);
+ if (rc)
+ return rc;
- result = iops->rec(env, it, (struct dt_rec *)ent,
- LUDA_FID | LUDA_TYPE);
- if (result == -ESTALE)
- goto next;
- if (result != 0) {
- rc = result;
- goto out;
- }
+ *mdt_index = range->lsr_index;
- fid_le_to_cpu(&ent->lde_fid, &ent->lde_fid);
- recsize = le16_to_cpu(ent->lde_reclen);
+ return 0;
+}
- /* Insert new fid with target name into target dir */
- if ((ent->lde_namelen == 1 && ent->lde_name[0] == '.') ||
- (ent->lde_namelen == 2 && ent->lde_name[0] == '.' &&
- ent->lde_name[1] == '.'))
- goto next;
+/*
+ * Link callback: report whether the parent of a link is on the source MDT.
+ *
+ * The link identified by (tpfid, tname) is ignored and yields 0. For any
+ * other link the MDT index of its parent FID is looked up with
+ * mdd_fld_lookup() and compared with the source MDT index that \a opaque
+ * points to (a __u32).
+ *
+ * Returns 1 if the link is on the source MDT, 0 if it is not or was ignored,
+ * or a negative errno if the lookup failed.
+ */
+static int mdd_is_link_on_source_mdt(const struct lu_env *env,
+ struct mdd_object *sobj,
+ struct mdd_object *tobj,
+ const struct lu_name *tname,
+ const struct lu_fid *tpfid,
+ const struct lu_name *lname,
+ const struct lu_fid *fid,
+ void *opaque,
+ struct thandle *handle)
+{
+ struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
+ __u32 source_mdt_index = *(__u32 *)opaque;
+ __u32 link_mdt_index;
+ int rc;
- child = mdd_object_find(env, mdd, &ent->lde_fid);
- if (IS_ERR(child))
- GOTO(out, rc = PTR_ERR(child));
+ ENTRY;
- mdd_write_lock(env, child, MOR_SRC_CHILD);
- is_dir = S_ISDIR(mdd_object_type(child));
+ /* ignore tobj; use RETURN() so every exit after ENTRY is traced */
+ if (lu_fid_eq(tpfid, fid) && tname->ln_namelen == lname->ln_namelen &&
+ !strcmp(tname->ln_name, lname->ln_name))
+ RETURN(0);
- snprintf(name, ent->lde_namelen + 1, "%s", ent->lde_name);
+ rc = mdd_fld_lookup(env, mdd, fid, &link_mdt_index);
+ if (rc)
+ RETURN(rc);
- /* Check whether the name has been inserted to the target */
- if (dt_try_as_dir(env, dt_tobj)) {
- struct lu_fid *fid = &mdd_env_info(env)->mti_fid2;
+ RETURN(link_mdt_index == source_mdt_index);
+}
- rc = dt_lookup(env, dt_tobj, (struct dt_rec *)fid,
- (struct dt_key *)name);
- if (unlikely(rc == 0))
- target_exist = true;
- }
+/**
+ * Walk the entries of the link xattr held in \a ldata and call \a cb on each.
+ *
+ * Every entry is unpacked into a FID and a name; the name is copied into the
+ * per-thread info->mti_name buffer so it is NUL-terminated before being handed
+ * to \a cb. The walk stops as soon as \a cb returns a non-zero value.
+ *
+ * \retval 0 \a ldata has no buffer, or every callback returned 0
+ * \retval non-zero the first non-zero value returned by \a cb
+ */
+static int mdd_iterate_linkea(const struct lu_env *env,
+ struct mdd_object *sobj,
+ struct mdd_object *tobj,
+ const struct lu_name *tname,
+ const struct lu_fid *tpfid,
+ struct linkea_data *ldata,
+ void *opaque,
+ struct thandle *handle,
+ mdd_linkea_cb cb)
+{
+ struct mdd_thread_info *info = mdd_env_info(env);
+ char *filename = info->mti_name;
+ struct lu_name lname;
+ struct lu_fid fid;
+ int rc = 0;
- handle = mdd_trans_create(env, mdd);
- if (IS_ERR(handle))
- GOTO(out, rc = PTR_ERR(handle));
-
- /* Note: this transaction is part of migration, and it is not
- * the last step of migration, so we set th_local = 1 to avoid
- * updating last rcvd for this transaction */
- handle->th_local = 1;
- if (likely(!target_exist)) {
- rc = mdo_declare_index_insert(env, mdd_tobj,
- &ent->lde_fid,
- mdd_object_type(child),
- name, handle);
- if (rc != 0)
- GOTO(out_put, rc);
+ if (!ldata->ld_buf)
+ return 0;
- if (is_dir) {
- rc = mdo_declare_ref_add(env, mdd_tobj, handle);
- if (rc != 0)
- GOTO(out_put, rc);
- }
- }
+ for (linkea_first_entry(ldata); ldata->ld_lee && !rc;
+ linkea_next_entry(ldata)) {
+ linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, &lname,
+ &fid);
- rc = mdo_declare_index_delete(env, mdd_sobj, name, handle);
- if (rc != 0)
- GOTO(out_put, rc);
+ /* Note: the unpacked lname may lack a trailing \0, so copy it */
+ snprintf(filename, sizeof(info->mti_name), "%.*s",
+ lname.ln_namelen, lname.ln_name);
+ lname.ln_name = filename;
- if (is_dir) {
- rc = mdo_declare_ref_del(env, mdd_sobj, handle);
- if (rc != 0)
- GOTO(out_put, rc);
+ CDEBUG(D_INFO, DFID"/"DNAME"\n", PFID(&fid), PNAME(&lname));
- /* Update .. for child */
- rc = mdo_declare_index_delete(env, child, dotdot,
- handle);
- if (rc != 0)
- GOTO(out_put, rc);
+ rc = cb(env, sobj, tobj, tname, tpfid, &lname, &fid, opaque,
+ handle);
+ }
- rc = mdo_declare_index_insert(env, child,
- mdd_object_fid(mdd_tobj),
- S_IFDIR, dotdot, handle);
- if (rc != 0)
- GOTO(out_put, rc);
- }
+ return rc;
+}
- rc = mdd_linkea_declare_update_child(env, mdd_tobj,
- child, name,
- strlen(name),
- handle);
- if (rc != 0)
- GOTO(out_put, rc);
+/**
+ * Prepare linkea, and check whether the file needs to be migrated: if the
+ * source still has a link on the source MDT, there is no need to migrate it,
+ * just update the namespace on the source and target parents.
+ *
+ * Directories and files with fewer than two links are always migrated.
+ *
+ * \retval 0 do migrate
+ * \retval 1 don't migrate
+ * \retval -errno on failure
+ */
+static int migrate_linkea_prepare(const struct lu_env *env,
+ struct mdd_device *mdd,
+ struct mdd_object *spobj,
+ struct mdd_object *tpobj,
+ struct mdd_object *sobj,
+ const struct lu_name *lname,
+ const struct lu_attr *attr,
+ struct linkea_data *ldata)
+{
+ __u32 source_mdt_index;
+ int rc;
- rc = mdd_trans_start(env, mdd, handle);
- if (rc != 0) {
- CERROR("%s: transaction start failed: rc = %d\n",
- mdd2obd_dev(mdd)->obd_name, rc);
- GOTO(out_put, rc);
- }
+ ENTRY;
- if (likely(!target_exist)) {
- rc = __mdd_index_insert(env, mdd_tobj, &ent->lde_fid,
- mdd_object_type(child),
- name, handle);
- if (rc != 0)
- GOTO(out_put, rc);
- }
+ memset(ldata, 0, sizeof(*ldata));
+ rc = mdd_linkea_prepare(env, sobj, mdd_object_fid(spobj), lname,
+ mdd_object_fid(tpobj), lname, 1, 0, ldata);
+ if (rc)
+ RETURN(rc);
- rc = __mdd_index_delete(env, mdd_sobj, name, is_dir, handle);
- if (rc != 0)
- GOTO(out_put, rc);
+ /*
+ * Now check whether the file should be migrated. If the file has
+ * multiple links, it only needs to be migrated once all of its
+ * entries have been migrated to the remote MDT.
+ */
+ if (S_ISDIR(attr->la_mode) || attr->la_nlink < 2)
+ RETURN(0);
- if (is_dir) {
- rc = __mdd_index_delete_only(env, child, dotdot,
- handle);
- if (rc != 0)
- GOTO(out_put, rc);
+ /* If there are still links locally, don't migrate this file */
+ LASSERT(ldata->ld_leh != NULL);
- rc = __mdd_index_insert_only(env, child,
- mdd_object_fid(mdd_tobj), S_IFDIR,
- dotdot, handle);
- if (rc != 0)
- GOTO(out_put, rc);
- }
+ /*
+ * If the linkEA has overflowed, there are some unknown name entries
+ * under unknown parents, which prevents the migration.
+ */
+ if (unlikely(ldata->ld_leh->leh_overflow_time))
+ RETURN(-EOVERFLOW);
- rc = mdd_linkea_update_child(env, mdd_tobj, child, name,
- strlen(name), handle);
+ rc = mdd_fld_lookup(env, mdd, mdd_object_fid(sobj), &source_mdt_index);
+ if (rc)
+ RETURN(rc);
-out_put:
- mdd_write_unlock(env, child);
- mdd_object_put(env, child);
- rc1 = mdd_trans_stop(env, mdd, rc, handle);
- if (rc == 0)
- rc = rc1;
-
- if (rc != 0)
- GOTO(out, rc);
-next:
- result = iops->next(env, it);
- if (OBD_FAIL_CHECK(OBD_FAIL_MIGRATE_ENTRIES))
- GOTO(out, rc = -EINTR);
-
- if (result == -ESTALE)
- goto next;
- } while (result == 0);
-out:
- iops->put(env, it);
- iops->fini(env, it);
-out_ent:
- OBD_FREE(ent, NAME_MAX + sizeof(*ent) + 1);
+ /* the callback result (0, 1 or -errno) becomes this function's result */
+ rc = mdd_iterate_linkea(env, sobj, NULL, lname, mdd_object_fid(tpobj),
+ ldata, &source_mdt_index, NULL,
+ mdd_is_link_on_source_mdt);
 RETURN(rc);
 }
-static int mdd_declare_update_linkea(const struct lu_env *env,
- struct mdd_object *mdd_pobj,
- struct mdd_object *mdd_sobj,
- struct mdd_object *mdd_tobj,
- const struct lu_name *child_name,
- struct linkea_data *ldata,
- struct thandle *handle)
+/**
+ * Declare the updates needed to delete the layout of directory \a obj.
+ *
+ * Three cases, selected by \a lmv_buf and the location of \a obj:
+ * - no LMV buffer: declare deletion of the ".." index entry;
+ * - LMV buffer and \a obj is remote: declare the per-stripe deletion;
+ * - otherwise: declare setting the XATTR_NAME_LMV".del" xattr to \a lmu_buf.
+ *
+ * \retval 0 on success
+ * \retval -errno on failure
+ */
+static int mdd_dir_declare_layout_delete(const struct lu_env *env,
+ struct mdd_object *obj,
+ const struct lu_buf *lmv_buf,
+ const struct lu_buf *lmu_buf,
+ struct thandle *handle)
 {
- return mdd_update_linkea_internal(env, mdd_pobj, mdd_sobj, mdd_tobj,
- child_name, ldata, handle, 1);
+ int rc;
+
+ if (!lmv_buf->lb_buf)
+ rc = mdo_declare_index_delete(env, obj, dotdot, handle);
+ else if (mdd_object_remote(obj))
+ rc = mdd_dir_iterate_stripes(env, obj, lmv_buf, lmu_buf, handle,
+ mdd_dir_declare_delete_stripe);
+ else
+ rc = mdo_declare_xattr_set(env, obj, lmu_buf,
+ XATTR_NAME_LMV".del", 0, handle);
+
+ return rc;
 }
-static int mdd_update_linkea(const struct lu_env *env,
- struct mdd_object *mdd_pobj,
- struct mdd_object *mdd_sobj,
- struct mdd_object *mdd_tobj,
- const struct lu_name *child_name,
- struct linkea_data *ldata,
- struct thandle *handle)
+/**
+ * Delete the layout of directory \a obj while holding its write lock.
+ *
+ * Execution counterpart of mdd_dir_declare_layout_delete(); the same three
+ * cases are handled, selected by \a lmv_buf and the location of \a obj.
+ *
+ * \retval 0 on success
+ * \retval -errno on failure
+ */
+static int mdd_dir_layout_delete(const struct lu_env *env,
+ struct mdd_object *obj,
+ const struct lu_buf *lmv_buf,
+ const struct lu_buf *lmu_buf,
+ struct thandle *handle)
 {
- return mdd_update_linkea_internal(env, mdd_pobj, mdd_sobj, mdd_tobj,
- child_name, ldata, handle, 0);
+ int rc;
+
+ ENTRY;
+
+ mdd_write_lock(env, obj, DT_SRC_PARENT);
+ if (!lmv_buf->lb_buf)
+ /* normal dir */
+ rc = __mdd_index_delete_only(env, obj, dotdot, handle);
+ else if (mdd_object_remote(obj))
+ /* striped, but remote */
+ rc = mdd_dir_iterate_stripes(env, obj, lmv_buf, lmu_buf, handle,
+ mdd_dir_delete_stripe);
+ else
+ /* has LMV and is not remote */
+ rc = mdo_xattr_set(env, obj, lmu_buf, XATTR_NAME_LMV".del", 0,
+ handle);
+ mdd_write_unlock(env, obj);
+
+ RETURN(rc);
 }
-static int mdd_declare_migrate_update_name(const struct lu_env *env,
- struct mdd_object *mdd_pobj,
- struct mdd_object *mdd_sobj,
- struct mdd_object *mdd_tobj,
- const struct lu_name *lname,
- struct lu_attr *la,
- struct lu_attr *parent_la,
- struct linkea_data *ldata,
- struct thandle *handle)
+/**
+ * Declare the updates performed when creating the migration target.
+ *
+ * For a directory this declares deleting the source layout (and its LMV xattr
+ * when \a sbuf holds one), creating \a tobj, and, unless mdd_dir_is_empty()
+ * returns 0 for \a sobj, adding the source stripes to \a tobj. It then
+ * declares copying xattrs from \a sobj to \a tobj; for a regular file it also
+ * declares the XATTR_NAME_FID update and the LOV xattr deletion on \a sobj;
+ * for a non-directory it declares the link updates.
+ *
+ * \a sbuf is temporarily pointed at a faked 1-stripe LMV when \a sobj has no
+ * LMV, and is reset to an empty buffer afterwards.
+ *
+ * \retval 0 on success
+ * \retval -errno on failure
+ */
+static int mdd_declare_migrate_create(const struct lu_env *env,
+ struct mdd_object *tpobj,
+ struct mdd_object *sobj,
+ struct mdd_object *tobj,
+ const struct lu_name *lname,
+ struct lu_attr *attr,
+ struct lu_buf *sbuf,
+ struct linkea_data *ldata,
+ struct md_op_spec *spec,
+ struct dt_allocation_hint *hint,
+ struct thandle *handle)
 {
- struct mdd_device *mdd = mdo2mdd(&mdd_sobj->mod_obj);
- struct lu_attr *la_flag = MDD_ENV_VAR(env, tattr);
+ struct mdd_thread_info *info = mdd_env_info(env);
+ struct lmv_mds_md_v1 *lmv = sbuf->lb_buf;
 int rc;
- /* Revert IMMUTABLE flag */
- la_flag->la_valid = LA_FLAGS;
- la_flag->la_flags = la->la_flags & ~LUSTRE_IMMUTABLE_FL;
- rc = mdo_declare_attr_set(env, mdd_sobj, la_flag, handle);
- if (rc != 0)
- return rc;
+ if (S_ISDIR(attr->la_mode)) {
+ struct lu_buf lmu_buf = { NULL };
- /* delete entry from source dir */
- rc = mdo_declare_index_delete(env, mdd_pobj, lname->ln_name, handle);
- if (rc != 0)
- return rc;
+ if (lmv) {
+ struct lmv_user_md *lmu = &info->mti_lmv.lmv_user_md;
- if (ldata->ld_buf != NULL) {
- rc = mdd_declare_update_linkea(env, mdd_pobj, mdd_sobj,
- mdd_tobj, lname, ldata, handle);
- if (rc != 0)
+ lmu->lum_stripe_count = 0;
+ lmu_buf.lb_buf = lmu;
+ lmu_buf.lb_len = sizeof(*lmu);
+ }
+
+ rc = mdd_dir_declare_layout_delete(env, sobj, sbuf, &lmu_buf,
+ handle);
+ if (rc)
 return rc;
+
+ if (lmv) {
+ rc = mdo_declare_xattr_del(env, sobj, XATTR_NAME_LMV,
+ handle);
+ if (rc)
+ return rc;
+ }
 }
- if (S_ISREG(mdd_object_type(mdd_sobj))) {
- rc = mdo_declare_xattr_del(env, mdd_sobj, XATTR_NAME_LOV,
- handle);
- if (rc != 0)
+ rc = mdd_declare_create(env, mdo2mdd(&tpobj->mod_obj), tpobj, tobj,
+ lname, attr, handle, spec, ldata, NULL, NULL,
+ NULL, hint);
+ if (rc)
+ return rc;
+
+ if (S_ISDIR(attr->la_mode) && mdd_dir_is_empty(env, sobj) != 0) {
+ if (!lmv) {
+ /*
+ * if sobj is not striped, fake a 1-stripe LMV, which
+ * will be used to generate a compound LMV for tobj.
+ */
+ LASSERT(sizeof(info->mti_key) >
+ lmv_mds_md_size(1, LMV_MAGIC_V1));
+ lmv = (typeof(lmv))info->mti_key;
+ memset(lmv, 0, sizeof(*lmv));
+ lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_V1);
+ lmv->lmv_stripe_count = cpu_to_le32(1);
+ fid_le_to_cpu(&lmv->lmv_stripe_fids[0],
+ mdd_object_fid(sobj));
+ sbuf->lb_buf = lmv;
+ sbuf->lb_len = lmv_mds_md_size(1, LMV_MAGIC_V1);
+
+ rc = mdo_declare_xattr_set(env, tobj, sbuf,
+ XATTR_NAME_LMV".add", 0,
+ handle);
+ /* the faked LMV lives in mti_key; don't leave it in sbuf */
+ sbuf->lb_buf = NULL;
+ sbuf->lb_len = 0;
+ } else {
+ rc = mdo_declare_xattr_set(env, tobj, sbuf,
+ XATTR_NAME_LMV".add", 0,
+ handle);
+ }
+ if (rc)
 return rc;
+ }
+
+ /*
+ * tobj mode will be used in lod_declare_xattr_set(), but it's not
+ * created yet, so copy the file type bits from sobj.
+ */
+ tobj->mod_obj.mo_lu.lo_header->loh_attr &= ~S_IFMT;
+ tobj->mod_obj.mo_lu.lo_header->loh_attr |=
+ sobj->mod_obj.mo_lu.lo_header->loh_attr & S_IFMT;
+
+ rc = mdd_iterate_xattrs(env, sobj, tobj, true, handle,
+ mdo_declare_xattr_set);
+ if (rc)
+ return rc;
+
+ if (S_ISREG(attr->la_mode)) {
+ struct lu_buf fid_buf;
 handle->th_complex = 1;
- rc = mdo_declare_xattr_set(env, mdd_tobj, NULL,
- XATTR_NAME_FID,
- LU_XATTR_REPLACE, handle);
- if (rc < 0)
+
+ /* target may be remote, update PFID via sobj. */
+ fid_buf.lb_buf = (void *)mdd_object_fid(tobj);
+ fid_buf.lb_len = sizeof(struct lu_fid);
+ rc = mdo_declare_xattr_set(env, sobj, &fid_buf, XATTR_NAME_FID,
+ 0, handle);
+ if (rc)
+ return rc;
+
+ rc = mdo_declare_xattr_del(env, sobj, XATTR_NAME_LOV, handle);
+ if (rc)
 return rc;
 }
- if (S_ISDIR(mdd_object_type(mdd_sobj))) {
- rc = mdo_declare_ref_del(env, mdd_pobj, handle);
- if (rc != 0)
+ if (!S_ISDIR(attr->la_mode)) {
+ rc = mdd_iterate_linkea(env, sobj, tobj, lname,
+ mdd_object_fid(tpobj), ldata, NULL,
+ handle, mdd_declare_update_link);
+ if (rc)
 return rc;
+
+ /* NOTE(review): lmv is sbuf->lb_buf, which may be non-NULL here for a non-directory; confirm against callers */
+ if (lmv) {
+ rc = mdo_declare_xattr_del(env, sobj, XATTR_NAME_LMV,
+ handle);
+ if (rc)
+ return rc;
+ }
 }
- /* new name */
- rc = mdo_declare_index_insert(env, mdd_pobj, mdo2fid(mdd_tobj),
- mdd_object_type(mdd_tobj),
- lname->ln_name, handle);
- if (rc != 0)
- return rc;
+ return rc;
+}
- rc = mdd_declare_links_add(env, mdd_tobj, handle, NULL, MLAO_IGNORE);
- if (rc != 0)
+/**
+ * Create target, migrate xattrs and update links.
+ *
+ * Create target according to \a spec, and then migrate xattrs; if it's a
+ * directory, migrate source stripes to target, else update fid to target
+ * for links.
+ *
+ * \param[in] env execution environment
+ * \param[in] tpobj target parent object
+ * \param[in] sobj source object
+ * \param[in] tobj target object
+ * \param[in] lname file name
+ * \param[in] attr source attributes (LA_NLINK is cleared from la_valid)
+ * \param[in] sbuf source LMV buf
+ * \param[in] ldata source linkea
+ * \param[in] spec migrate create spec
+ * \param[in] hint target creation hint
+ * \param[in] handle transaction handle
+ *
+ * \retval 0 on success
+ * \retval -errno on failure
+ **/
+static int mdd_migrate_create(const struct lu_env *env,
+ struct mdd_object *tpobj,
+ struct mdd_object *sobj,
+ struct mdd_object *tobj,
+ const struct lu_name *lname,
+ struct lu_attr *attr,
+ const struct lu_buf *sbuf,
+ struct linkea_data *ldata,
+ struct md_op_spec *spec,
+ struct dt_allocation_hint *hint,
+ struct thandle *handle)
+{
+ int rc;
+
+ ENTRY;
+
+ /*
+ * directory will migrate sobj stripes to tobj:
+ * 1. delete stripes from sobj.
+ * 2. add stripes to tobj, see lod_dir_declare_layout_add().
+ * 3. create/attach stripes for tobj, see lod_xattr_set_lmv().
+ */
+ if (S_ISDIR(attr->la_mode)) {
+ struct lu_buf lmu_buf = { NULL };
+
+ if (sbuf->lb_buf) {
+ struct mdd_thread_info *info = mdd_env_info(env);
+ struct lmv_user_md *lmu = &info->mti_lmv.lmv_user_md;
+
+ lmu->lum_stripe_count = 0;
+ lmu_buf.lb_buf = lmu;
+ lmu_buf.lb_len = sizeof(*lmu);
+ }
+
+ rc = mdd_dir_layout_delete(env, sobj, sbuf, &lmu_buf, handle);
+ if (rc)
+ RETURN(rc);
+
+ /*
+ * delete LMV so that later when destroying sobj it won't delete
+ * stripes again.
+ */
+ if (sbuf->lb_buf) {
+ mdd_write_lock(env, sobj, DT_SRC_CHILD);
+ rc = mdo_xattr_del(env, sobj, XATTR_NAME_LMV, handle);
+ mdd_write_unlock(env, sobj);
+ if (rc)
+ RETURN(rc);
+ }
+ }
+
+ /* don't set nlink from sobj */
+ attr->la_valid &= ~LA_NLINK;
+
+ rc = mdd_create_object(env, tpobj, tobj, attr, spec, NULL, NULL, NULL,
+ hint, handle, false);
+ if (rc)
+ RETURN(rc);
+
+ /* copy xattrs from sobj to tobj under tobj's write lock */
+ mdd_write_lock(env, tobj, DT_TGT_CHILD);
+ rc = mdd_iterate_xattrs(env, sobj, tobj, true, handle, mdo_xattr_set);
+ mdd_write_unlock(env, tobj);
+ if (rc)
+ RETURN(rc);
+
+ /* for regular file, update OST objects XATTR_NAME_FID */
+ if (S_ISREG(attr->la_mode)) {
+ struct lu_buf fid_buf;
+
+ /* target may be remote, update PFID via sobj. */
+ fid_buf.lb_buf = (void *)mdd_object_fid(tobj);
+ fid_buf.lb_len = sizeof(struct lu_fid);
+ rc = mdo_xattr_set(env, sobj, &fid_buf, XATTR_NAME_FID, 0,
+ handle);
+ if (rc)
+ RETURN(rc);
+
+ /* delete LOV to avoid deleting OST objs when destroying sobj */
+ mdd_write_lock(env, sobj, DT_SRC_CHILD);
+ rc = mdo_xattr_del(env, sobj, XATTR_NAME_LOV, handle);
+ mdd_write_unlock(env, sobj);
+ if (rc)
+ RETURN(rc);
+ }
+
+ /* non-directories: walk the source linkea with mdd_update_link */
+ if (!S_ISDIR(attr->la_mode))
+ rc = mdd_iterate_linkea(env, sobj, tobj, lname,
+ mdd_object_fid(tpobj), ldata,
+ NULL, handle, mdd_update_link);
+
+ RETURN(rc);
+}
+
+/**
+ * Declare the namespace updates of a migration.
+ *
+ * Declares: deleting \a lname from \a spobj, inserting it into \a tpobj
+ * (pointing at \a tobj when \a do_create is set, else at \a sobj), writing
+ * the linkea, the parent nlink changes for a directory, the ctime/mtime
+ * updates of both parents, and, when both \a do_create and \a do_destroy are
+ * set, dropping the last reference of \a sobj and destroying it.
+ *
+ * \a spattr, \a tpattr and \a ma are not used by the declaration.
+ *
+ * \retval 0 on success
+ * \retval -errno on failure
+ */
+static int mdd_declare_migrate_update(const struct lu_env *env,
+ struct mdd_object *spobj,
+ struct mdd_object *tpobj,
+ struct mdd_object *sobj,
+ struct mdd_object *tobj,
+ const struct lu_name *lname,
+ struct lu_attr *attr,
+ struct lu_attr *spattr,
+ struct lu_attr *tpattr,
+ struct linkea_data *ldata,
+ bool do_create,
+ bool do_destroy,
+ struct md_attr *ma,
+ struct thandle *handle)
+{
+ struct mdd_thread_info *info = mdd_env_info(env);
+ const struct lu_fid *fid = mdd_object_fid(do_create ? tobj : sobj);
+ struct lu_attr *la = &info->mti_la_for_fix;
+ int rc;
+
+ rc = mdo_declare_index_delete(env, spobj, lname->ln_name, handle);
+ if (rc)
 return rc;
- if (S_ISDIR(mdd_object_type(mdd_sobj))) {
- rc = mdo_declare_ref_add(env, mdd_pobj, handle);
- if (rc != 0)
+ if (S_ISDIR(attr->la_mode)) {
+ rc = mdo_declare_ref_del(env, spobj, handle);
+ if (rc)
 return rc;
 }
- /* delete old object */
- rc = mdo_declare_ref_del(env, mdd_sobj, handle);
- if (rc != 0)
+ rc = mdo_declare_index_insert(env, tpobj, fid, mdd_object_type(sobj),
+ lname->ln_name, handle);
+ if (rc)
 return rc;
- if (S_ISDIR(mdd_object_type(mdd_sobj))) {
- /* delete old object */
- rc = mdo_declare_ref_del(env, mdd_sobj, handle);
- if (rc != 0)
- return rc;
- /* set nlink to 0 */
- rc = mdo_declare_attr_set(env, mdd_sobj, la, handle);
- if (rc != 0)
+ rc = mdd_declare_links_add(env, do_create ? tobj : sobj, handle, ldata);
+ if (rc)
+ return rc;
+
+ if (S_ISDIR(attr->la_mode)) {
+ rc = mdo_declare_ref_add(env, tpobj, handle);
+ if (rc)
 return rc;
 }
- rc = mdd_declare_finish_unlink(env, mdd_sobj, handle);
+ la->la_valid = LA_CTIME | LA_MTIME;
+ rc = mdo_declare_attr_set(env, spobj, la, handle);
 if (rc)
 return rc;
- rc = mdo_declare_attr_set(env, mdd_pobj, parent_la, handle);
- if (rc != 0)
- return rc;
+ if (tpobj != spobj) {
+ rc = mdo_declare_attr_set(env, tpobj, la, handle);
+ if (rc)
+ return rc;
+ }
+
+ /* source is only destroyed when a new target replaces it */
+ if (do_create && do_destroy) {
+ rc = mdo_declare_ref_del(env, sobj, handle);
+ if (rc)
+ return rc;
- rc = mdd_declare_changelog_store(env, mdd, lname, NULL, handle);
+ rc = mdo_declare_destroy(env, sobj, handle);
+ if (rc)
+ return rc;
+ }
 return rc;
 }
-static int mdd_migrate_update_name(const struct lu_env *env,
- struct mdd_object *mdd_pobj,
- struct mdd_object *mdd_sobj,
- struct mdd_object *mdd_tobj,
- const struct lu_name *lname,
- struct md_attr *ma)
+/**
+ * migrate dirent from \a spobj to \a tpobj, and destroy \a sobj
+ *
+ * The new dirent and the linkea are attached to \a tobj when \a do_create is
+ * set, otherwise to \a sobj. The ctime/mtime of both parents are set from
+ * ma->ma_attr.la_ctime. \a sobj is destroyed only when both \a do_create and
+ * \a do_destroy are set.
+ *
+ * \retval 0 on success
+ * \retval -errno on failure
+ **/
+static int mdd_migrate_update(const struct lu_env *env,
+ struct mdd_object *spobj,
+ struct mdd_object *tpobj,
+ struct mdd_object *sobj,
+ struct mdd_object *tobj,
+ const struct lu_name *lname,
+ struct lu_attr *attr,
+ struct lu_attr *spattr,
+ struct lu_attr *tpattr,
+ struct linkea_data *ldata,
+ bool do_create,
+ bool do_destroy,
+ struct md_attr *ma,
+ struct thandle *handle)
 {
- struct lu_attr *p_la = MDD_ENV_VAR(env, la_for_fix);
- struct lu_attr *so_attr = MDD_ENV_VAR(env, cattr);
- struct lu_attr *la_flag = MDD_ENV_VAR(env, tattr);
- struct mdd_device *mdd = mdo2mdd(&mdd_sobj->mod_obj);
- struct linkea_data *ldata = &mdd_env_info(env)->mti_link_data;
- struct thandle *handle;
- int is_dir = S_ISDIR(mdd_object_type(mdd_sobj));
- const char *name = lname->ln_name;
- int rc;
+ struct mdd_thread_info *info = mdd_env_info(env);
+ const struct lu_fid *fid = mdd_object_fid(do_create ? tobj : sobj);
+ struct lu_attr *la = &info->mti_la_for_fix;
+ int rc;
+
 ENTRY;
- /* update time for parent */
- LASSERT(ma->ma_attr.la_valid & LA_CTIME);
- p_la->la_ctime = p_la->la_mtime = ma->ma_attr.la_ctime;
- p_la->la_valid = LA_CTIME;
+ CDEBUG(D_INFO, "update %s "DFID"/"DFID" to "DFID"/"DFID"\n",
+ lname->ln_name, PFID(mdd_object_fid(spobj)),
+ PFID(mdd_object_fid(sobj)), PFID(mdd_object_fid(tpobj)),
+ PFID(fid));
- rc = mdd_la_get(env, mdd_sobj, so_attr);
- if (rc != 0)
+ rc = __mdd_index_delete(env, spobj, lname->ln_name,
+ S_ISDIR(attr->la_mode), handle);
+ if (rc)
 RETURN(rc);
- ldata->ld_buf = NULL;
- rc = mdd_links_read(env, mdd_sobj, ldata);
- if (rc != 0 && rc != -ENOENT && rc != -ENODATA)
+ rc = __mdd_index_insert(env, tpobj, fid, mdd_object_type(sobj),
+ lname->ln_name, handle);
+ if (rc)
 RETURN(rc);
+ rc = mdd_links_write(env, do_create ? tobj : sobj, ldata, handle);
+ if (rc)
+ RETURN(rc);
+
+ la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
+ la->la_valid = LA_CTIME | LA_MTIME;
+ mdd_write_lock(env, spobj, DT_SRC_PARENT);
+ rc = mdd_update_time(env, spobj, spattr, la, handle);
+ mdd_write_unlock(env, spobj);
+ if (rc)
+ RETURN(rc);
+
+ if (tpobj != spobj) {
+ /* la_valid is set again before reusing la for the target parent */
+ la->la_valid = LA_CTIME | LA_MTIME;
+ mdd_write_lock(env, tpobj, DT_TGT_PARENT);
+ rc = mdd_update_time(env, tpobj, tpattr, la, handle);
+ mdd_write_unlock(env, tpobj);
+ if (rc)
+ RETURN(rc);
+ }
+
+ /*
+ * there are three situations in which we shouldn't destroy source:
+ * 1. if source is not dir, and it happens to be located on the same MDT
+ * as target parent.
+ * 2. if source is not dir, and has link on the same MDT where source is
+ * located.
+ * 3. if source is dir, and it's a normal, non-empty dir.
+ *
+ * the first two situations are equal to !do_create, and the 3rd is
+ * equal to !do_destroy, so the below condition is actually
+ * !(!do_create || !do_destroy).
+ *
+ * NB, if user has opened source dir before migration, he will get
+ * -ENOENT error when closing it later, because source is likely to be
+ * remote, which can't be moved to orphan list, but except for this
+ * error message, this won't cause any inconsistency or trouble.
+ */
+ if (do_create && do_destroy) {
+ mdd_write_lock(env, sobj, DT_SRC_CHILD);
+ /* NOTE(review): the result of mdo_ref_del() is not checked */
+ mdo_ref_del(env, sobj, handle);
+ rc = mdo_destroy(env, sobj, handle);
+ mdd_write_unlock(env, sobj);
+ }
+
+ RETURN(rc);
+}
+
+/**
+ * Migrate directory or file.
+ *
+ * migrate source to target in following steps:
+ * 1. create target, append source stripes after target's if it's directory,
+ * migrate xattrs and update fid of source links.
+ * 2. update namespace: migrate dirent from source parent to target parent,
+ * update file linkea, and destroy source if it's not needed any more.
+ *
+ * The source and target parent stripes located here are referenced once each
+ * and released only at the "out" label, so error paths must not put them
+ * themselves.
+ *
+ * \param[in] env execution environment
+ * \param[in] md_pobj parent master object
+ * \param[in] md_sobj source object
+ * \param[in] lname file name
+ * \param[in] md_tobj target object
+ * \param[in] spec target creation spec
+ * \param[in] ma used to update \a pobj mtime and ctime
+ *
+ * \retval 0 on success
+ * \retval -EALREADY source directory is already being migrated with the
+ * same layout parameters
+ * \retval -EPERM source directory is being migrated with different
+ * layout parameters
+ * \retval -errno on failure
+ */
+static int mdd_migrate(const struct lu_env *env, struct md_object *md_pobj,
+ struct md_object *md_sobj, const struct lu_name *lname,
+ struct md_object *md_tobj, struct md_op_spec *spec,
+ struct md_attr *ma)
+{
+ struct mdd_device *mdd = mdo2mdd(md_pobj);
+ struct mdd_thread_info *info = mdd_env_info(env);
+ struct mdd_object *pobj = md2mdd_obj(md_pobj);
+ struct mdd_object *sobj = md2mdd_obj(md_sobj);
+ struct mdd_object *tobj = md2mdd_obj(md_tobj);
+ struct mdd_object *spobj = NULL;
+ struct mdd_object *tpobj = NULL;
+ struct lu_attr *spattr = &info->mti_pattr;
+ struct lu_attr *tpattr = &info->mti_tpattr;
+ struct lu_attr *attr = &info->mti_cattr;
+ struct linkea_data *ldata = &info->mti_link_data;
+ struct dt_allocation_hint *hint = &info->mti_hint;
+ struct lu_fid *fid = &info->mti_fid2;
+ struct lu_buf pbuf = { NULL };
+ struct lu_buf sbuf = { NULL };
+ struct lmv_mds_md_v1 *plmv;
+ struct thandle *handle;
+ bool do_create = true;
+ bool do_destroy = true;
+ int rc;
+ ENTRY;
+
+ rc = mdd_la_get(env, sobj, attr);
+ if (rc)
+ RETURN(rc);
+
+ /* locate source and target stripe on pobj, which are the real parent */
+ rc = mdd_stripe_get(env, pobj, &pbuf, XATTR_NAME_LMV);
+ if (rc < 0 && rc != -ENODATA)
+ RETURN(rc);
+
+ plmv = pbuf.lb_buf;
+ if (plmv) {
+ __u32 hash_type = le32_to_cpu(plmv->lmv_hash_type);
+ __u32 count = le32_to_cpu(plmv->lmv_stripe_count);
+ int index;
+
+ /* locate target parent stripe */
+ if (hash_type & LMV_HASH_FLAG_MIGRATION) {
+ /*
+ * fail check here to make sure top dir migration
+ * succeed.
+ */
+ if (OBD_FAIL_CHECK_RESET(OBD_FAIL_MIGRATE_ENTRIES, 0))
+ GOTO(out, rc = -EIO);
+ hash_type &= ~LMV_HASH_FLAG_MIGRATION;
+ count = le32_to_cpu(plmv->lmv_migrate_offset);
+ }
+ index = lmv_name_to_stripe_index(hash_type, count,
+ lname->ln_name,
+ lname->ln_namelen);
+ if (index < 0)
+ GOTO(out, rc = index);
+
+ fid_le_to_cpu(fid, &plmv->lmv_stripe_fids[index]);
+ tpobj = mdd_object_find(env, mdd, fid);
+ if (IS_ERR(tpobj))
+ GOTO(out, rc = PTR_ERR(tpobj));
+
+ /* locate source parent stripe */
+ if (le32_to_cpu(plmv->lmv_hash_type) &
+ LMV_HASH_FLAG_MIGRATION) {
+ hash_type = le32_to_cpu(plmv->lmv_migrate_hash);
+ count = le32_to_cpu(plmv->lmv_stripe_count) -
+ le32_to_cpu(plmv->lmv_migrate_offset);
+
+ index = lmv_name_to_stripe_index(hash_type, count,
+ lname->ln_name,
+ lname->ln_namelen);
+ /* tpobj is released at "out"; putting it here too would double put */
+ if (index < 0)
+ GOTO(out, rc = index);
+
+ index += le32_to_cpu(plmv->lmv_migrate_offset);
+ fid_le_to_cpu(fid, &plmv->lmv_stripe_fids[index]);
+ spobj = mdd_object_find(env, mdd, fid);
+ /* "out" skips the IS_ERR spobj and puts tpobj exactly once */
+ if (IS_ERR(spobj))
+ GOTO(out, rc = PTR_ERR(spobj));
+ } else {
+ spobj = tpobj;
+ mdd_object_get(spobj);
+ }
+ } else {
+ tpobj = pobj;
+ spobj = pobj;
+ mdd_object_get(tpobj);
+ mdd_object_get(spobj);
+ }
+
+ rc = mdd_la_get(env, spobj, spattr);
+ if (rc)
+ GOTO(out, rc);
+
+ rc = mdd_la_get(env, tpobj, tpattr);
+ if (rc)
+ GOTO(out, rc);
+
+ if (S_ISDIR(attr->la_mode)) {
+ struct lmv_user_md_v1 *lmu = spec->u.sp_ea.eadata;
+
+ LASSERT(lmu);
+
+ /*
+ * if user use default value '0' for stripe_count, we need to
+ * adjust it to '1' to create a 1-stripe directory.
+ */
+ if (lmu->lum_stripe_count == 0) {
+ /* eadata is from request, don't alter it */
+ info->mti_lmu = *lmu;
+ info->mti_lmu.lum_stripe_count = cpu_to_le32(1);
+ spec->u.sp_ea.eadata = &info->mti_lmu;
+ lmu = spec->u.sp_ea.eadata;
+ }
+
+ rc = mdd_stripe_get(env, sobj, &sbuf, XATTR_NAME_LMV);
+ if (rc == -ENODATA) {
+ if (mdd_dir_is_empty(env, sobj) == 0) {
+ /*
+ * if sobj is empty, and target is not striped,
+ * create target as a normal directory.
+ */
+ if (le32_to_cpu(lmu->lum_stripe_count) == 1) {
+ info->mti_lmu = *lmu;
+ info->mti_lmu.lum_stripe_count = 0;
+ spec->u.sp_ea.eadata = &info->mti_lmu;
+ lmu = spec->u.sp_ea.eadata;
+ }
+ } else {
+ /*
+ * sobj is not striped dir, if it's not empty,
+ * it will be migrated to be a stripe of target,
+ * don't destroy it after migration.
+ */
+ do_destroy = false;
+ }
+ } else if (rc) {
+ GOTO(out, rc);
+ } else {
+ struct lmv_mds_md_v1 *lmv = sbuf.lb_buf;
+
+ if (le32_to_cpu(lmv->lmv_hash_type) &
+ LMV_HASH_FLAG_MIGRATION) {
+ __u32 lum_stripe_count = lmu->lum_stripe_count;
+ __u32 lmv_hash_type = lmv->lmv_hash_type &
+ cpu_to_le32(LMV_HASH_TYPE_MASK);
+
+ if (!lum_stripe_count)
+ lum_stripe_count = cpu_to_le32(1);
+
+ /* TODO: check specific MDTs */
+ if (lmv->lmv_migrate_offset !=
+ lum_stripe_count ||
+ lmv->lmv_master_mdt_index !=
+ lmu->lum_stripe_offset ||
+ (lmv_hash_type != 0 &&
+ lmv_hash_type != lmu->lum_hash_type)) {
+ CERROR("%s: \'"DNAME"\' migration was "
+ "interrupted, run \'lfs migrate "
+ "-m %d -c %d -H %d "DNAME"\' to "
+ "finish migration.\n",
+ mdd2obd_dev(mdd)->obd_name,
+ PNAME(lname),
+ le32_to_cpu(
+ lmv->lmv_master_mdt_index),
+ le32_to_cpu(
+ lmv->lmv_migrate_offset),
+ le32_to_cpu(lmv_hash_type),
+ PNAME(lname));
+ GOTO(out, rc = -EPERM);
+ }
+ GOTO(out, rc = -EALREADY);
+ }
+ }
+ } else if (!mdd_object_remote(tpobj)) {
+ /*
+ * if source is already on MDT where target parent is located,
+ * no need to create, just update namespace.
+ */
+ do_create = false;
+ } else if (S_ISLNK(attr->la_mode)) {
+ lu_buf_check_and_alloc(&sbuf, attr->la_size + 1);
+ if (!sbuf.lb_buf)
+ GOTO(out, rc = -ENOMEM);
+ rc = mdd_readlink(env, &sobj->mod_obj, &sbuf);
+ if (rc <= 0) {
+ rc = rc ?: -EFAULT;
+ CERROR("%s: "DFID" readlink failed: rc = %d\n",
+ mdd_obj_dev_name(sobj),
+ PFID(mdd_object_fid(sobj)), rc);
+ GOTO(out, rc);
+ }
+ spec->u.sp_symname = sbuf.lb_buf;
+ } else if (S_ISREG(attr->la_mode)) {
+ spec->sp_cr_flags |= MDS_OPEN_DELAY_CREATE;
+ spec->sp_cr_flags &= ~MDS_OPEN_HAS_EA;
+ }
+
+ /*
+ * if sobj has link on the same MDT, no need to create, just update
+ * namespace, and it will be a remote file on target parent, which is
+ * similar to rename.
+ */
+ rc = migrate_linkea_prepare(env, mdd, spobj, tpobj, sobj, lname, attr,
+ ldata);
+ if (rc > 0)
+ do_create = false;
+ else if (rc)
+ GOTO(out, rc);
+
+ rc = mdd_migrate_sanity_check(env, mdd, spobj, tpobj, sobj, tobj,
+ spattr, tpattr, attr);
+ if (rc)
+ GOTO(out, rc);
+
+ mdd_object_make_hint(env, tpobj, tobj, attr, spec, hint);
+
 handle = mdd_trans_create(env, mdd);
 if (IS_ERR(handle))
- RETURN(PTR_ERR(handle));
+ GOTO(out, rc = PTR_ERR(handle));
- rc = mdd_declare_migrate_update_name(env, mdd_pobj, mdd_sobj, mdd_tobj,
- lname, so_attr, p_la, ldata,
- handle);
- if (rc != 0) {
- /* If the migration can not be fit in one transaction, just
- * leave it in the original MDT */
- if (rc == -E2BIG)
- GOTO(stop_trans, rc = 0);
- else
+ if (do_create) {
+ rc = mdd_declare_migrate_create(env, tpobj, sobj, tobj, lname,
+ attr, &sbuf, ldata, spec, hint,
+ handle);
+ if (rc)
 GOTO(stop_trans, rc);
 }
- CDEBUG(D_INFO, "%s: update "DFID"/"DFID" with %s:"DFID"\n",
- mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(mdd_pobj)),
- PFID(mdd_object_fid(mdd_sobj)), lname->ln_name,
- PFID(mdd_object_fid(mdd_tobj)));
-
- rc = mdd_trans_start(env, mdd, handle);
- if (rc != 0)
+ rc = mdd_declare_migrate_update(env, spobj, tpobj, sobj, tobj, lname,
+ attr, spattr, tpattr, ldata, do_create,
+ do_destroy, ma, handle);
+ if (rc)
 GOTO(stop_trans, rc);
- /* Revert IMMUTABLE flag */
- la_flag->la_valid = LA_FLAGS;
- la_flag->la_flags = so_attr->la_flags & ~LUSTRE_IMMUTABLE_FL;
- rc = mdo_attr_set(env, mdd_sobj, la_flag, handle);
- if (rc != 0)
+ rc = mdd_declare_changelog_store(env, mdd, CL_MIGRATE, lname, NULL,
+ handle);
+ if (rc)
 GOTO(stop_trans, rc);
- /* Remove source name from source directory */
- rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle);
- if (rc != 0)
+ rc = mdd_trans_start(env, mdd, handle);
+ if (rc)
 GOTO(stop_trans, rc);
- if (ldata->ld_buf != NULL) {
- rc = mdd_update_linkea(env, mdd_pobj, mdd_sobj, mdd_tobj,
- lname, ldata, handle);
- if (rc != 0)
- GOTO(stop_trans, rc);
-
- /* linkea update might decrease the source object
- * nlink, let's get the attr again after ref_del */
- rc = mdd_la_get(env, mdd_sobj, so_attr);
- if (rc != 0)
+ if (do_create) {
+ rc = mdd_migrate_create(env, tpobj, sobj, tobj, lname, attr,
+ &sbuf, ldata, spec, hint, handle);
+ if (rc)
 GOTO(stop_trans, rc);
 }
- if (S_ISREG(so_attr->la_mode)) {
- if (so_attr->la_nlink == 1) {
- rc = mdo_xattr_del(env, mdd_sobj, XATTR_NAME_LOV,
- handle);
- if (rc != 0 && rc != -ENODATA)
- GOTO(stop_trans, rc);
-
- rc = mdo_xattr_set(env, mdd_tobj, NULL,
- XATTR_NAME_FID,
- LU_XATTR_REPLACE, handle);
- if (rc < 0)
- GOTO(stop_trans, rc);
- }
- }
-
- /* Insert new fid with target name into target dir */
- rc = __mdd_index_insert(env, mdd_pobj, mdd_object_fid(mdd_tobj),
- mdd_object_type(mdd_tobj), name, handle);
- if (rc != 0)
+ rc = mdd_migrate_update(env, spobj, tpobj, sobj, tobj, lname, attr,
+ spattr, tpattr, ldata, do_create, do_destroy,
+ ma, handle);
+ if (rc)
 GOTO(stop_trans, rc);
- linkea_add_buf(ldata, lname, mdd_object_fid(mdd_pobj));
- rc = mdd_links_add(env, mdd_tobj, mdo2fid(mdd_pobj), lname, handle,
- ldata, 1);
- if (rc != 0)
+ rc = mdd_changelog_ns_store(env, mdd, CL_MIGRATE, 0, tobj,
+ mdd_object_fid(spobj), mdd_object_fid(sobj),
+ mdd_object_fid(tpobj), lname, lname,
+ handle);
+ if (rc)
 GOTO(stop_trans, rc);
- mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
+ EXIT;
+stop_trans:
+ rc = mdd_trans_stop(env, mdd, rc, handle);
+out:
+ /* single release point for the parent stripe references and buffers */
+ if (spobj && !IS_ERR(spobj))
+ mdd_object_put(env, spobj);
+ if (tpobj && !IS_ERR(tpobj))
+ mdd_object_put(env, tpobj);
+ lu_buf_free(&sbuf);
+ lu_buf_free(&pbuf);
+ return rc;
+}
- /* Increase mod_count to add the source object to the orphan list,
- * so if other clients still send RPC to the old object, then these
- * objects can help the request to find the new object, see
- * mdt_reint_open() */
- mdd_sobj->mod_count++;
- rc = mdd_finish_unlink(env, mdd_sobj, ma, mdd_pobj, lname, handle);
- mdd_sobj->mod_count--;
- if (rc != 0)
- GOTO(out_unlock, rc);
+/*
+ * Declare the updates that __mdd_dir_layout_shrink() performs on \a handle.
+ *
+ * The user layout in \a lmu_buf is copied into info->mti_key before it is
+ * altered, so the caller's buffer is left untouched. If more than one stripe
+ * remains, only the stripe deletion/destruction and the LMV xattr update are
+ * declared. Otherwise the updates that replace \a obj by \a stripe under
+ * \a pobj with name \a lname are declared as well; \a pobj and \a stripe are
+ * only dereferenced in that case.
+ *
+ * \retval 0 if every declaration succeeded
+ * \retval the first non-zero value returned by a declare call
+ */
+static int __mdd_dir_declare_layout_shrink(const struct lu_env *env,
+ struct mdd_object *pobj,
+ struct mdd_object *obj,
+ struct mdd_object *stripe,
+ struct lu_attr *attr,
+ struct lu_buf *lmv_buf,
+ const struct lu_buf *lmu_buf,
+ struct lu_name *lname,
+ struct thandle *handle)
+{
+ struct mdd_thread_info *info = mdd_env_info(env);
+ struct lmv_mds_md_v1 *lmv = lmv_buf->lb_buf;
+ struct lmv_user_md *lmu = (typeof(lmu))info->mti_key;
+ struct lu_buf shrink_buf = { .lb_buf = lmu,
+ .lb_len = sizeof(*lmu) };
+ int rc;
- mdo_ref_del(env, mdd_sobj, handle);
- if (is_dir)
- mdo_ref_del(env, mdd_sobj, handle);
+ LASSERT(lmv);
- /* Get the attr again after ref_del */
- rc = mdd_la_get(env, mdd_sobj, so_attr);
- if (rc != 0)
- GOTO(out_unlock, rc);
+ /* lmu needs to be altered, but lmu_buf is const: work on a copy */
+ memcpy(lmu, lmu_buf->lb_buf, sizeof(*lmu));
- ma->ma_attr = *so_attr;
- ma->ma_valid |= MA_INODE;
+ /* fewer than 2 stripes left: declare as if all stripes are removed */
+ if (le32_to_cpu(lmu->lum_stripe_count) < 2)
+ lmu->lum_stripe_count = 0;
- rc = mdd_attr_set_internal(env, mdd_pobj, p_la, handle, 0);
- if (rc != 0)
- GOTO(out_unlock, rc);
+ rc = mdd_dir_declare_layout_delete(env, obj, lmv_buf, &shrink_buf,
+ handle);
+ if (rc)
+ return rc;
- rc = mdd_changelog_ns_store(env, mdd, CL_MIGRATE, 0, mdd_tobj,
- mdo2fid(mdd_pobj), mdo2fid(mdd_sobj),
- mdo2fid(mdd_pobj), lname, lname, handle);
- if (rc != 0) {
- CWARN("%s: changelog for migrate %s "DFID
- "under "DFID" failed: rc = %d\n",
- mdd2obd_dev(mdd)->obd_name, lname->ln_name,
- PFID(mdd_object_fid(mdd_sobj)),
- PFID(mdd_object_fid(mdd_pobj)), rc);
- /* Sigh, there are no easy way to migrate back the object, so
- * let's reset the result to 0 for now XXX */
- rc = 0;
+ /* count was zeroed above: set it to 1 and declare the LMV removal */
+ if (lmu->lum_stripe_count == 0) {
+ lmu->lum_stripe_count = cpu_to_le32(1);
+
+ rc = mdo_declare_xattr_del(env, obj, XATTR_NAME_LMV, handle);
+ if (rc)
+ return rc;
}
-out_unlock:
- mdd_write_unlock(env, mdd_sobj);
-stop_trans:
- mdd_trans_stop(env, mdd, rc, handle);
+ rc = mdd_dir_iterate_stripes(env, obj, lmv_buf, &shrink_buf, handle,
+ mdd_dir_declare_destroy_stripe);
+ if (rc)
+ return rc;
- RETURN(rc);
-}
+ /* still striped: the LMV update is the only thing left to declare */
+ if (le32_to_cpu(lmu->lum_stripe_count) > 1)
+ return mdo_declare_xattr_set(env, obj, lmv_buf,
+ XATTR_NAME_LMV".set", 0, handle);
-static int mdd_fld_lookup(const struct lu_env *env, struct mdd_device *mdd,
- const struct lu_fid *fid, __u32 *mdt_index)
-{
- struct lu_seq_range *range = &mdd_env_info(env)->mti_range;
- struct seq_server_site *ss;
- int rc;
+ /* from here on: declarations for replacing obj by stripe under pobj */
+ rc = mdo_declare_index_insert(env, stripe, mdd_object_fid(pobj),
+ S_IFDIR, dotdot, handle);
+ if (rc)
+ return rc;
- ss = mdd->mdd_md_dev.md_lu_dev.ld_site->ld_seq_site;
+ rc = mdd_iterate_xattrs(env, obj, stripe, false, handle,
+ mdo_declare_xattr_set);
+ if (rc)
+ return rc;
- range->lsr_flags = LU_SEQ_RANGE_MDT;
- rc = fld_server_lookup(env, ss->ss_server_fld, fid->f_seq, range);
- if (rc != 0)
+ rc = mdo_declare_xattr_del(env, stripe, XATTR_NAME_LMV, handle);
+ if (rc)
return rc;
- *mdt_index = range->lsr_index;
+ rc = mdo_declare_attr_set(env, stripe, attr, handle);
+ if (rc)
+ return rc;
+
+ rc = mdo_declare_index_delete(env, pobj, lname->ln_name, handle);
+ if (rc)
+ return rc;
+
+ rc = mdo_declare_index_insert(env, pobj, mdd_object_fid(stripe),
+ attr->la_mode, lname->ln_name, handle);
+ if (rc)
+ return rc;
+
+ /* two ref_del declarations, matching the two mdo_ref_del() calls */
+ rc = mdo_declare_ref_del(env, obj, handle);
+ if (rc)
+ return rc;
+
+ rc = mdo_declare_ref_del(env, obj, handle);
+ if (rc)
+ return rc;
+
+ rc = mdo_declare_destroy(env, obj, handle);
+ if (rc)
+ return rc;
+
+ return rc;
- return 0;
}
-/**
- * Check whether we should migrate the file/dir
- * return val
- * < 0 permission check failed or other error.
- * = 0 the file can be migrated.
- * > 0 the file does not need to be migrated, mostly
- * for multiple link file
- **/
-static int mdd_migrate_sanity_check(const struct lu_env *env,
- struct mdd_object *pobj,
- const struct lu_attr *pattr,
- struct mdd_object *sobj,
- struct lu_attr *sattr)
+
+/*
+ * After the files under \a obj were migrated, remove the old stripes from
+ * \a obj. If only one stripe is left, \a obj is converted to a normal
+ * directory: \a stripe takes over its xattrs, attributes and its name
+ * \a lname under \a pobj, and \a obj itself is destroyed.
+ *
+ * \a lmu_buf is not modified, a private copy in info->mti_key is used
+ * instead. \a pobj and \a stripe are only needed (and asserted) when the
+ * directory is shrunk to a single stripe.
+ *
+ * \retval 0 on success
+ * \retval non-zero error of the first step that failed, including a
+ * failure to destroy one of the shrunk stripes
+ */
+static int __mdd_dir_layout_shrink(const struct lu_env *env,
+ struct mdd_object *pobj,
+ struct mdd_object *obj,
+ struct mdd_object *stripe,
+ struct lu_attr *attr,
+ struct lu_buf *lmv_buf,
+ const struct lu_buf *lmu_buf,
+ struct lu_name *lname,
+ struct thandle *handle)
{
- struct mdd_thread_info *info = mdd_env_info(env);
- struct linkea_data *ldata = &info->mti_link_data;
- struct mdd_device *mdd = mdo2mdd(&pobj->mod_obj);
- int mgr_easize;
- struct lu_buf *mgr_buf;
- int count;
- int rc;
- __u64 mdt_index;
+ struct mdd_thread_info *info = mdd_env_info(env);
+ struct lmv_mds_md_v1 *lmv = lmv_buf->lb_buf;
+ struct lmv_user_md *lmu = (typeof(lmu))info->mti_key;
+ struct lu_buf shrink_buf = { .lb_buf = lmu,
+ .lb_len = sizeof(*lmu) };
+ int len = lmv_buf->lb_len;
+ __u32 version = le32_to_cpu(lmv->lmv_layout_version);
+ int rc;
+
ENTRY;
- mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
- mgr_buf = lu_buf_check_and_alloc(&info->mti_big_buf, mgr_easize);
- if (mgr_buf->lb_buf == NULL)
- RETURN(-ENOMEM);
+ /* lmu needs to be altered, but lmu_buf is const */
+ memcpy(lmu, lmu_buf->lb_buf, sizeof(*lmu));
- rc = mdo_xattr_get(env, sobj, mgr_buf, XATTR_NAME_LMV);
- if (rc > 0) {
- union lmv_mds_md *lmm = mgr_buf->lb_buf;
-
- /* If the object has migrateEA, it means IMMUTE flag
- * is being set by previous migration process, so it
- * needs to override the IMMUTE flag, otherwise the
- * following sanity check will fail */
- if (le32_to_cpu(lmm->lmv_md_v1.lmv_hash_type) &
- LMV_HASH_FLAG_MIGRATION) {
- struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
-
- sattr->la_flags &= ~LUSTRE_IMMUTABLE_FL;
- CDEBUG(D_HA, "%s: "DFID" override IMMUTE FLAG\n",
- mdd2obd_dev(mdd)->obd_name,
- PFID(mdd_object_fid(sobj)));
- }
- }
+ /*
+ * if dir will be shrunk to 1-stripe, delete all stripes, because it
+ * will be converted to normal dir.
+ */
+ if (le32_to_cpu(lmu->lum_stripe_count) == 1)
+ lmu->lum_stripe_count = 0;
- rc = mdd_rename_sanity_check(env, pobj, pattr, pobj, pattr,
- sobj, sattr, NULL, NULL);
- if (rc != 0)
+ /* delete stripes after lmu_stripe_count */
+ rc = mdd_dir_layout_delete(env, obj, lmv_buf, &shrink_buf, handle);
+ if (rc)
RETURN(rc);
- /* Then it will check if the file should be migrated. If the file
- * has mulitple links, we only need migrate the file if all of its
- * entries has been migrated to the remote MDT */
- if (!S_ISREG(sattr->la_mode) || sattr->la_nlink < 2)
- RETURN(0);
+ if (lmu->lum_stripe_count == 0) {
+ lmu->lum_stripe_count = cpu_to_le32(1);
- rc = mdd_links_read(env, sobj, ldata);
- if (rc != 0) {
- /* For multiple links files, if there are no linkEA data at all,
- * means the file might be created before linkEA is enabled, and
- * all of its links should not be migrated yet, otherwise it
- * should have some linkEA there */
- if (rc == -ENOENT || rc == -ENODATA)
- RETURN(1);
+ /* delete LMV to avoid deleting stripes again upon destroy */
+ mdd_write_lock(env, obj, DT_SRC_CHILD);
+ rc = mdo_xattr_del(env, obj, XATTR_NAME_LMV, handle);
+ mdd_write_unlock(env, obj);
+ if (rc)
+ RETURN(rc);
+ }
+
+ /* destroy stripes after lmu_stripe_count */
+ mdd_write_lock(env, obj, DT_SRC_PARENT);
+ rc = mdd_dir_iterate_stripes(env, obj, lmv_buf, &shrink_buf, handle,
+ mdd_dir_destroy_stripe);
+ mdd_write_unlock(env, obj);
+ /*
+ * stop here if a stripe could not be destroyed: rc is reused by the
+ * steps below, so the failure would otherwise be lost and the LMV or
+ * the namespace would be updated regardless.
+ */
+ if (rc)
+ RETURN(rc);
+
+ if (le32_to_cpu(lmu->lum_stripe_count) > 1) {
+ /* update dir LMV, that's all if it's still striped. */
+ lmv->lmv_stripe_count = lmu->lum_stripe_count;
+ lmv->lmv_hash_type &= ~cpu_to_le32(LMV_HASH_FLAG_MIGRATION);
+ lmv->lmv_migrate_offset = 0;
+ lmv->lmv_migrate_hash = 0;
+ lmv->lmv_layout_version = cpu_to_le32(++version);
+
+ /* only write sizeof(*lmv) bytes, then restore the buffer length */
+ lmv_buf->lb_len = sizeof(*lmv);
+ rc = mdo_xattr_set(env, obj, lmv_buf, XATTR_NAME_LMV".set", 0,
+ handle);
+ lmv_buf->lb_len = len;
RETURN(rc);
}
- mdt_index = mdd->mdd_md_dev.md_lu_dev.ld_site->ld_seq_site->ss_node_id;
- /* If there are still links locally, then the file will not be
- * migrated. */
- LASSERT(ldata->ld_leh != NULL);
- ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
- for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
- struct lu_name lname;
- struct lu_fid fid;
- __u32 parent_mdt_index;
-
- linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen,
- &lname, &fid);
- ldata->ld_lee = (struct link_ea_entry *)((char *)ldata->ld_lee +
- ldata->ld_reclen);
-
- rc = mdd_fld_lookup(env, mdd, &fid, &parent_mdt_index);
- if (rc != 0)
- RETURN(rc);
+ /* replace directory with its remaining stripe */
+ LASSERT(pobj);
+ LASSERT(stripe);
- /* Migrate the object only if none of its parents are on the
- * current MDT. */
- if (parent_mdt_index != mdt_index)
- continue;
+ mdd_write_lock(env, pobj, DT_SRC_PARENT);
+ mdd_write_lock(env, obj, DT_SRC_CHILD);
- CDEBUG(D_INFO, DFID"still has local entry %.*s "DFID"\n",
- PFID(mdd_object_fid(sobj)), lname.ln_namelen,
- lname.ln_name, PFID(&fid));
- rc = 1;
- break;
- }
+ /* insert dotdot to stripe which points to parent */
+ rc = __mdd_index_insert_only(env, stripe, mdd_object_fid(pobj),
+ S_IFDIR, dotdot, handle);
+ if (rc)
+ GOTO(out, rc);
- RETURN(rc);
+ /* copy xattrs including linkea */
+ rc = mdd_iterate_xattrs(env, obj, stripe, false, handle, mdo_xattr_set);
+ if (rc)
+ GOTO(out, rc);
+
+ /* delete LMV */
+ rc = mdo_xattr_del(env, stripe, XATTR_NAME_LMV, handle);
+ if (rc)
+ GOTO(out, rc);
+
+ /* don't set nlink from parent */
+ attr->la_valid &= ~LA_NLINK;
+
+ rc = mdo_attr_set(env, stripe, attr, handle);
+ if (rc)
+ GOTO(out, rc);
+
+ /* delete dir name from parent */
+ rc = __mdd_index_delete_only(env, pobj, lname->ln_name, handle);
+ if (rc)
+ GOTO(out, rc);
+
+ /* insert stripe to parent with dir name */
+ rc = __mdd_index_insert_only(env, pobj, mdd_object_fid(stripe),
+ attr->la_mode, lname->ln_name, handle);
+ if (rc)
+ GOTO(out, rc);
+
+ /* destroy dir obj */
+ rc = mdo_ref_del(env, obj, handle);
+ if (rc)
+ GOTO(out, rc);
+
+ rc = mdo_ref_del(env, obj, handle);
+ if (rc)
+ GOTO(out, rc);
+
+ rc = mdo_destroy(env, obj, handle);
+ if (rc)
+ GOTO(out, rc);
+
+ EXIT;
+out:
+ mdd_write_unlock(env, obj);
+ mdd_write_unlock(env, pobj);
+
+ return rc;
}
-static int mdd_migrate(const struct lu_env *env, struct md_object *pobj,
- struct md_object *sobj, const struct lu_name *lname,
- struct md_object *tobj, struct md_attr *ma)
+/*
+ * Shrink the stripes of directory \a md_obj to the lum_stripe_count given in
+ * the lmv_user_md carried by \a lmu_buf.
+ *
+ * The stripes are first checked with mdd_shrink_stripe_is_empty(); a positive
+ * result fails the call with -ENOTEMPTY. If the new stripe count is below 2,
+ * the directory name and parent FID are read from the linkea (which must not
+ * hold more than one record), and the parent and the first stripe are looked
+ * up so that the directory can be replaced by that stripe. The declarations,
+ * the shrink itself and a CL_LAYOUT changelog record share one transaction.
+ *
+ * \retval -ENOTDIR if \a md_obj is not a directory
+ * \retval -ENOTEMPTY if the emptiness check returned a positive value
+ * \retval -EINVAL if the linkea holds more than one record
+ * \retval -ENODATA if the linkea has no first entry
+ * \retval result of mdd_trans_stop() once the transaction was created,
+ * otherwise the error of the step that failed
+ */
+int mdd_dir_layout_shrink(const struct lu_env *env,
+ struct md_object *md_obj,
+ const struct lu_buf *lmu_buf)
{
- struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
- struct mdd_device *mdd = mdo2mdd(pobj);
- struct mdd_object *mdd_sobj = md2mdd_obj(sobj);
- struct mdd_object *mdd_tobj = md2mdd_obj(tobj);
- struct lu_attr *so_attr = MDD_ENV_VAR(env, cattr);
- struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
- int rc;
+ struct mdd_device *mdd = mdo2mdd(md_obj);
+ struct mdd_thread_info *info = mdd_env_info(env);
+ struct mdd_object *obj = md2mdd_obj(md_obj);
+ struct mdd_object *pobj = NULL;
+ struct mdd_object *stripe = NULL;
+ struct lu_attr *attr = &info->mti_pattr;
+ struct lu_fid *fid = &info->mti_fid2;
+ struct lu_name lname = { NULL };
+ struct lu_buf lmv_buf = { NULL };
+ struct lmv_mds_md_v1 *lmv;
+ struct lmv_user_md *lmu;
+ struct thandle *handle;
+ int rc;
ENTRY;
- /* If the file will being migrated, it will check whether
- * the file is being opened by someone else right now */
- mdd_read_lock(env, mdd_sobj, MOR_SRC_CHILD);
- if (mdd_sobj->mod_count > 0) {
- CERROR("%s: "DFID"%s is already opened count %d: rc = %d\n",
- mdd2obd_dev(mdd)->obd_name,
- PFID(mdd_object_fid(mdd_sobj)), lname->ln_name,
- mdd_sobj->mod_count, -EBUSY);
- mdd_read_unlock(env, mdd_sobj);
- GOTO(put, rc = -EBUSY);
- }
- mdd_read_unlock(env, mdd_sobj);
- rc = mdd_la_get(env, mdd_sobj, so_attr);
- if (rc != 0)
- GOTO(put, rc);
+ rc = mdd_la_get(env, obj, attr);
+ if (rc)
+ RETURN(rc);
- rc = mdd_la_get(env, mdd_pobj, pattr);
- if (rc != 0)
- GOTO(put, rc);
+ if (!S_ISDIR(attr->la_mode))
+ RETURN(-ENOTDIR);
- rc = mdd_migrate_sanity_check(env, mdd_pobj, pattr, mdd_sobj, so_attr);
- if (rc != 0) {
- if (rc > 0)
- rc = 0;
- GOTO(put, rc);
- }
+ rc = mdd_stripe_get(env, obj, &lmv_buf, XATTR_NAME_LMV);
+ if (rc < 0)
+ RETURN(rc);
- /* Sigh, it is impossible to finish all of migration in a single
- * transaction, for example migrating big directory entries to the
- * new MDT, it needs insert all of name entries of children in the
- * new directory.
- *
- * So migration will be done in multiple steps and transactions.
- *
- * 1. create an orphan object on the remote MDT in one transaction.
- * 2. migrate extend attributes to the new target file/directory.
- * 3. For directory, migrate the entries to the new MDT and update
- * linkEA of each children. Because we can not migrate all entries
- * in a single transaction, so the migrating directory will become
- * a striped directory during migration, so once the process is
- * interrupted, the directory is still accessible. (During lookup,
- * client will locate the name by searching both original and target
- * object).
- * 4. Finally, update the name/FID to point to the new file/directory
- * in a separate transaction.
+ lmv = lmv_buf.lb_buf;
+ lmu = lmu_buf->lb_buf;
+
+ /* this was checked in MDT */
+ LASSERT(le32_to_cpu(lmu->lum_stripe_count) <
+ le32_to_cpu(lmv->lmv_stripe_count));
+
+ rc = mdd_dir_iterate_stripes(env, obj, &lmv_buf, lmu_buf, NULL,
+ mdd_shrink_stripe_is_empty);
+ if (rc < 0)
+ GOTO(out, rc);
+ else if (rc != 0)
+ GOTO(out, rc = -ENOTEMPTY);
+
+ /*
+ * if obj stripe count will be shrunk to 1, we need to convert it to a
+ * normal dir, which will change its fid and update parent namespace,
+ * get obj name and parent fid from linkea.
*/
+ if (le32_to_cpu(lmu->lum_stripe_count) < 2) {
+ struct linkea_data *ldata = &info->mti_link_data;
+ char *filename = info->mti_name;
- /* step 1: Check whether the orphan object has been created, and create
- * orphan object on the remote MDT if needed */
- if (!mdd_object_exists(mdd_tobj)) {
- rc = mdd_migrate_create(env, mdd_pobj, mdd_sobj, mdd_tobj,
- lname, so_attr);
- if (rc != 0)
- GOTO(put, rc);
- }
+ rc = mdd_links_read(env, obj, ldata);
+ if (rc)
+ GOTO(out, rc);
- LASSERT(mdd_object_exists(mdd_tobj));
- /* step 2: migrate xattr */
- rc = mdd_migrate_xattrs(env, mdd_sobj, mdd_tobj);
- if (rc != 0)
- GOTO(put, rc);
+ /* only a single link record (one name, one parent) is handled */
+ if (ldata->ld_leh->leh_reccount > 1)
+ GOTO(out, rc = -EINVAL);
- /* step 3: migrate name entries to the orphan object */
- if (S_ISDIR(lu_object_attr(&mdd_sobj->mod_obj.mo_lu))) {
- rc = mdd_migrate_entries(env, mdd_sobj, mdd_tobj);
- if (rc != 0)
- GOTO(put, rc);
- if (unlikely(OBD_FAIL_CHECK_RESET(OBD_FAIL_MIGRATE_NET_REP,
- OBD_FAIL_MDS_REINT_NET_REP)))
- GOTO(put, rc = 0);
- } else {
- OBD_FAIL_TIMEOUT(OBD_FAIL_MIGRATE_DELAY, cfs_fail_val);
+ linkea_first_entry(ldata);
+ if (!ldata->ld_lee)
+ GOTO(out, rc = -ENODATA);
+
+ /* fid receives the parent FID here, it is reused for the stripe below */
+ linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, &lname,
+ fid);
+
+ /* Note: lname might miss \0 at the end */
+ snprintf(filename, sizeof(info->mti_name), "%.*s",
+ lname.ln_namelen, lname.ln_name);
+ lname.ln_name = filename;
+
+ pobj = mdd_object_find(env, mdd, fid);
+ if (IS_ERR(pobj)) {
+ rc = PTR_ERR(pobj);
+ pobj = NULL;
+ GOTO(out, rc);
+ }
+
+ /* the first stripe is the one that replaces the directory */
+ fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[0]);
+
+ stripe = mdd_object_find(env, mdd, fid);
+ if (IS_ERR(stripe)) {
+ /* drop pobj now: the cleanup at out: is keyed on pobj only */
+ mdd_object_put(env, pobj);
+ pobj = NULL;
+ GOTO(out, rc = PTR_ERR(stripe));
+ }
}
- LASSERT(mdd_object_exists(mdd_tobj));
- /* step 4: update name entry to the new object */
- rc = mdd_migrate_update_name(env, mdd_pobj, mdd_sobj, mdd_tobj, lname,
- ma);
- if (rc != 0)
- GOTO(put, rc);
-put:
- RETURN(rc);
+ handle = mdd_trans_create(env, mdd);
+ if (IS_ERR(handle))
+ GOTO(out, rc = PTR_ERR(handle));
+
+ rc = __mdd_dir_declare_layout_shrink(env, pobj, obj, stripe, attr,
+ &lmv_buf, lmu_buf, &lname, handle);
+ if (rc)
+ GOTO(stop_trans, rc);
+
+ rc = mdd_declare_changelog_store(env, mdd, CL_LAYOUT, NULL, NULL,
+ handle);
+ if (rc)
+ GOTO(stop_trans, rc);
+
+ rc = mdd_trans_start(env, mdd, handle);
+ if (rc)
+ GOTO(stop_trans, rc);
+
+ rc = __mdd_dir_layout_shrink(env, pobj, obj, stripe, attr, &lmv_buf,
+ lmu_buf, &lname, handle);
+ if (rc)
+ GOTO(stop_trans, rc);
+
+ rc = mdd_changelog_data_store_xattr(env, mdd, CL_LAYOUT, 0, obj,
+ XATTR_NAME_LMV, handle);
+ GOTO(stop_trans, rc);
+
+stop_trans:
+ rc = mdd_trans_stop(env, mdd, rc, handle);
+out:
+ /* a non-NULL pobj implies a valid stripe: both are looked up together */
+ if (pobj) {
+ mdd_object_put(env, stripe);
+ mdd_object_put(env, pobj);
+ }
+ lu_buf_free(&lmv_buf);
+ return rc;
}
const struct md_dir_operations mdd_dir_ops = {