* GPL HEADER END
*/
/*
- * Copyright 2008 Sun Microsystems, Inc. All rights reserved
+ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*/
/*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
* This file is part of Lustre, http://www.lustre.org/
* Lustre is a trademark of Sun Microsystems, Inc.
*
#define DEBUG_SUBSYSTEM S_MDS
#include <linux/module.h>
-#ifdef HAVE_EXT4_LDISKFS
-#include <ldiskfs/ldiskfs_jbd2.h>
-#else
-#include <linux/jbd.h>
-#endif
#include <obd.h>
#include <obd_class.h>
#include <obd_support.h>
#include <lprocfs_status.h>
/* fid_be_cpu(), fid_cpu_to_be(). */
#include <lustre_fid.h>
+#include <obd_lov.h>
#include <lustre_param.h>
-#ifdef HAVE_EXT4_LDISKFS
-#include <ldiskfs/ldiskfs.h>
-#else
-#include <linux/ldiskfs_fs.h>
-#endif
#include <lustre_mds.h>
#include <lustre/lustre_idl.h>
{
if (buf == NULL || buf->lb_buf == NULL)
return;
- if (buf->lb_vmalloc)
- OBD_VFREE(buf->lb_buf, buf->lb_len);
- else
- OBD_FREE(buf->lb_buf, buf->lb_len);
+ OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
buf->lb_buf = NULL;
+ buf->lb_len = 0;
}
const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
return buf;
}
-#define BUF_VMALLOC_SIZE (CFS_PAGE_SIZE<<2) /* 16k */
struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
{
struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
- if (buf->lb_vmalloc)
- OBD_VFREE(buf->lb_buf, buf->lb_len);
- else
- OBD_FREE(buf->lb_buf, buf->lb_len);
+ OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
buf->lb_buf = NULL;
}
if (buf->lb_buf == NULL) {
buf->lb_len = len;
- if (buf->lb_len <= BUF_VMALLOC_SIZE) {
- OBD_ALLOC(buf->lb_buf, buf->lb_len);
- buf->lb_vmalloc = 0;
- }
- if (buf->lb_buf == NULL) {
- OBD_VMALLOC(buf->lb_buf, buf->lb_len);
- buf->lb_vmalloc = 1;
- }
+ OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
if (buf->lb_buf == NULL)
buf->lb_len = 0;
}
struct lu_buf buf;
LASSERT(len >= oldbuf->lb_len);
- if (len > BUF_VMALLOC_SIZE) {
- OBD_VMALLOC(buf.lb_buf, len);
- buf.lb_vmalloc = 1;
- } else {
- OBD_ALLOC(buf.lb_buf, len);
- buf.lb_vmalloc = 0;
- }
+ OBD_ALLOC_LARGE(buf.lb_buf, len);
+
if (buf.lb_buf == NULL)
return -ENOMEM;
buf.lb_len = len;
memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
- if (oldbuf->lb_vmalloc)
- OBD_VFREE(oldbuf->lb_buf, oldbuf->lb_len);
- else
- OBD_FREE(oldbuf->lb_buf, oldbuf->lb_len);
+ OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
memcpy(oldbuf, &buf, sizeof(buf));
max_cookie_size = mdd_lov_cookiesize(env, mdd);
if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) {
if (mti->mti_max_cookie)
- OBD_FREE(mti->mti_max_cookie, mti->mti_max_cookie_size);
+ OBD_FREE_LARGE(mti->mti_max_cookie,
+ mti->mti_max_cookie_size);
mti->mti_max_cookie = NULL;
mti->mti_max_cookie_size = 0;
}
if (unlikely(mti->mti_max_cookie == NULL)) {
- OBD_ALLOC(mti->mti_max_cookie, max_cookie_size);
+ OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size);
if (likely(mti->mti_max_cookie != NULL))
mti->mti_max_cookie_size = max_cookie_size;
}
max_lmm_size = mdd_lov_mdsize(env, mdd);
if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) {
if (mti->mti_max_lmm)
- OBD_FREE(mti->mti_max_lmm, mti->mti_max_lmm_size);
+ OBD_FREE_LARGE(mti->mti_max_lmm, mti->mti_max_lmm_size);
mti->mti_max_lmm = NULL;
mti->mti_max_lmm_size = 0;
}
if (unlikely(mti->mti_max_lmm == NULL)) {
- OBD_ALLOC(mti->mti_max_lmm, max_lmm_size);
- if (unlikely(mti->mti_max_lmm != NULL))
+ OBD_ALLOC_LARGE(mti->mti_max_lmm, max_lmm_size);
+ if (likely(mti->mti_max_lmm != NULL))
mti->mti_max_lmm_size = max_lmm_size;
}
return mti->mti_max_lmm;
{
struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
- "valid=%x, cltime=%llu, flags=%lx)",
+ "valid=%x, cltime="LPU64", flags=%lx)",
mdd, mdd->mod_count, mdd->mod_valid,
mdd->mod_cltime, mdd->mod_flags);
}
if (obj == NULL)
GOTO(out, rc = -EREMOTE);
if (IS_ERR(obj))
- GOTO(out, rc = -PTR_ERR(obj));
+ GOTO(out, rc = PTR_ERR(obj));
/* get child fid from parent and name */
rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
mdd_object_put(env, obj);
if (mdd_obj == NULL)
GOTO(out, rc = -EREMOTE);
if (IS_ERR(mdd_obj))
- GOTO(out, rc = -PTR_ERR(mdd_obj));
+ GOTO(out, rc = PTR_ERR(mdd_obj));
rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
if (rc <= 0) {
mdd_object_put(env, mdd_obj);
/* Verify that our path hasn't changed since we started the lookup.
Record the current index, and verify the path resolves to the
same fid. If it does, then the path is correct as of this index. */
- spin_lock(&mdd->mdd_cl.mc_lock);
+ cfs_spin_lock(&mdd->mdd_cl.mc_lock);
pli->pli_currec = mdd->mdd_cl.mc_index;
- spin_unlock(&mdd->mdd_cl.mc_lock);
+ cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
if (rc) {
CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
PFID(&pli->pli_fid));
GOTO(out, rc = -EAGAIN);
}
-
+ ptr++; /* skip leading / */
memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
EXIT;
out:
- if (buf && !IS_ERR(buf) && buf->lb_vmalloc)
+ if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
/* if we vmalloced a large buffer drop it */
mdd_buf_put(buf);
RETURN(-EOVERFLOW);
if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
- path[0] = '/';
- path[1] = '\0';
+ path[0] = '\0';
RETURN(0);
}
RETURN(rc);
}
-int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm,
- int *size)
+/* Fill the caller-supplied buffer (viewed as a lov_user_md) with the
+ * filesystem-wide default striping taken from mds_lov_desc.
+ * Returns the number of bytes written, or 0 when lmm is NULL. */
+int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm)
{
struct lov_desc *ldesc;
struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj);
+ struct lov_user_md *lum = (struct lov_user_md*)lmm;
ENTRY;
+ /* nothing to fill; caller only probed for the size */
+ if (!lum)
+ RETURN(0);
+
ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
LASSERT(ldesc != NULL);
- if (!lmm)
- RETURN(0);
+ /* FID_SEQ_LOV_DEFAULT marks this EA as "default striping", not a
+ * real object; the remaining fields mirror the LOV descriptor. */
+ lum->lmm_magic = LOV_MAGIC_V1;
+ lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT;
+ lum->lmm_pattern = ldesc->ld_pattern;
+ lum->lmm_stripe_size = ldesc->ld_default_stripe_size;
+ lum->lmm_stripe_count = ldesc->ld_default_stripe_count;
+ lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset;
- lmm->lmm_magic = LOV_MAGIC_V1;
- lmm->lmm_object_gr = LOV_OBJECT_GROUP_DEFAULT;
- lmm->lmm_pattern = ldesc->ld_pattern;
- lmm->lmm_stripe_size = ldesc->ld_default_stripe_size;
- lmm->lmm_stripe_count = ldesc->ld_default_stripe_count;
- *size = sizeof(struct lov_mds_md);
+ RETURN(sizeof(*lum));
+}
- RETURN(sizeof(struct lov_mds_md));
+/* Return non-zero iff \a mdd_obj is the filesystem root object,
+ * i.e. its FID equals the device's mdd_root_fid. */
+static int is_rootdir(struct mdd_object *mdd_obj)
+{
+ const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj);
+ const struct lu_fid *fid = mdo2fid(mdd_obj);
+
+ return lu_fid_eq(&mdd_dev->mdd_root_fid, fid);
 }
/* get lov EA only */
rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size,
XATTR_NAME_LOV);
-
- if (rc == 0 && (ma->ma_need & MA_LOV_DEF)) {
- rc = mdd_get_default_md(mdd_obj, ma->ma_lmm,
- &ma->ma_lmm_size);
- }
-
+ if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj))
+ rc = mdd_get_default_md(mdd_obj, ma->ma_lmm);
if (rc > 0) {
- ma->ma_valid |= MA_LOV;
+ ma->ma_lmm_size = rc;
+ ma->ma_layout_gen = ma->ma_lmm->lmm_layout_gen;
+ ma->ma_valid |= MA_LOV | MA_LAY_GEN;
rc = 0;
}
RETURN(rc);
}
+/* Get the first parent fid from the link EA and store it in ma->ma_pfid,
+ * setting MA_PFID in ma_valid.  Returns 0 on success (including the
+ * already-valid case) or a negative errno from mdd_links_get().
+ * NOTE(review): assumes the link EA contains at least one entry
+ * (leh + 1 is read without checking leh_reccount) — confirm against the
+ * mdd_links_get() contract. */
+static int mdd_pfid_get(const struct lu_env *env,
+ struct mdd_object *mdd_obj, struct md_attr *ma)
+{
+ struct lu_buf *buf;
+ struct link_ea_header *leh;
+ struct link_ea_entry *lee;
+ struct lu_fid *pfid = &ma->ma_pfid;
+ ENTRY;
+
+ /* already fetched earlier in this call chain */
+ if (ma->ma_valid & MA_PFID)
+ RETURN(0);
+
+ buf = mdd_links_get(env, mdd_obj);
+ if (IS_ERR(buf))
+ RETURN(PTR_ERR(buf));
+
+ /* the first entry follows the header; the stored FID is big-endian */
+ leh = buf->lb_buf;
+ lee = (struct link_ea_entry *)(leh + 1);
+ memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
+ fid_be_to_cpu(pfid, pfid);
+ ma->ma_valid |= MA_PFID;
+ if (buf->lb_len > OBD_ALLOC_BIG)
+ /* if we vmalloced a large buffer drop it */
+ mdd_buf_put(buf);
+ RETURN(0);
+}
+
int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
struct md_attr *ma)
{
RETURN(rc);
}
-static int mdd_attr_get_internal(const struct lu_env *env,
- struct mdd_object *mdd_obj,
+/* Fill the HSM (ma_hsm) and/or SOM (ma_som) parts of \a ma from the
+ * object's LMA extended attribute, as requested in ma_need.  Marks the
+ * fetched parts in ma_valid.  Returns 0 on success or when nothing is
+ * needed; rc <= 0 from mdd_get_md() (no LMA EA, or error) is returned
+ * unchanged. */
+static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj,
+ struct md_attr *ma)
+{
+ struct mdd_thread_info *info = mdd_env_info(env);
+ struct lustre_mdt_attrs *lma =
+ (struct lustre_mdt_attrs *)info->mti_xattr_buf;
+ int lma_size;
+ int rc;
+ ENTRY;
+
+ /* If all needed data are already valid, nothing to do */
+ if ((ma->ma_valid & (MA_HSM | MA_SOM)) ==
+ (ma->ma_need & (MA_HSM | MA_SOM)))
+ RETURN(0);
+
+ /* Read LMA from disk EA into the per-thread scratch buffer */
+ lma_size = sizeof(info->mti_xattr_buf);
+ rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA);
+ if (rc <= 0)
+ RETURN(rc);
+
+ /* Useless to check LMA incompatibility because this is already done in
+ * osd_ea_fid_get(), and this will fail long before this code is
+ * called.
+ * So, if we are here, LMA is compatible.
+ */
+
+ lustre_lma_swab(lma);
+
+ /* Swab and copy LMA */
+ if (ma->ma_need & MA_HSM) {
+ if (lma->lma_compat & LMAC_HSM)
+ ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK;
+ else
+ ma->ma_hsm.mh_flags = 0;
+ ma->ma_valid |= MA_HSM;
+ }
+
+ /* Copy SOM (only when the LMA actually carries SOM data) */
+ if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) {
+ LASSERT(ma->ma_som != NULL);
+ ma->ma_som->msd_ioepoch = lma->lma_ioepoch;
+ ma->ma_som->msd_size = lma->lma_som_size;
+ ma->ma_som->msd_blocks = lma->lma_som_blocks;
+ ma->ma_som->msd_mountid = lma->lma_som_mountid;
+ ma->ma_valid |= MA_SOM;
+ }
+
+ RETURN(0);
+}
+
+int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
struct md_attr *ma)
{
int rc = 0;
S_ISDIR(mdd_object_type(mdd_obj)))
rc = __mdd_lmm_get(env, mdd_obj, ma);
}
+ if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) {
+ if (S_ISREG(mdd_object_type(mdd_obj)))
+ rc = mdd_pfid_get(env, mdd_obj, ma);
+ }
if (rc == 0 && ma->ma_need & MA_LMV) {
if (S_ISDIR(mdd_object_type(mdd_obj)))
rc = __mdd_lmv_get(env, mdd_obj, ma);
}
+ if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) {
+ if (S_ISREG(mdd_object_type(mdd_obj)))
+ rc = __mdd_lma_get(env, mdd_obj, ma);
+ }
#ifdef CONFIG_FS_POSIX_ACL
if (rc == 0 && ma->ma_need & MA_ACL_DEF) {
if (S_ISDIR(mdd_object_type(mdd_obj)))
rc = mdd_def_acl_get(env, mdd_obj, ma);
}
#endif
- CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
- rc, ma->ma_valid);
+ CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
+ rc, ma->ma_valid, ma->ma_lmm);
RETURN(rc);
}
struct mdd_object *mdd_obj, struct md_attr *ma)
{
int rc;
- int needlock = ma->ma_need & (MA_LOV | MA_LMV | MA_ACL_DEF);
+ int needlock = ma->ma_need &
+ (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID);
if (needlock)
mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
RETURN(rc);
}
+/* Declare (reserve transaction credits for) the creation of object \a c.
+ * The dt_object_format is derived from spec->sp_feat: a non-directory
+ * index feature set forces DFT_INDEX, otherwise the format follows the
+ * mode bits in ma->ma_attr.  \a p (the parent) is currently unused here. */
+int mdd_declare_object_create_internal(const struct lu_env *env,
+ struct mdd_object *p,
+ struct mdd_object *c,
+ struct md_attr *ma,
+ struct thandle *handle,
+ const struct md_op_spec *spec)
+{
+ struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
+ const struct dt_index_features *feat = spec->sp_feat;
+ int rc;
+ ENTRY;
+
+ if (feat != &dt_directory_features && feat != NULL)
+ dof->dof_type = DFT_INDEX;
+ else
+ dof->dof_type = dt_mode_to_dft(ma->ma_attr.la_mode);
+
+ /* set unconditionally; presumably ignored for non-index formats —
+ * TODO confirm against dt_object_format consumers */
+ dof->u.dof_idx.di_feat = feat;
+
+ rc = mdo_declare_create_obj(env, c, &ma->ma_attr, NULL, dof, handle);
+
+ RETURN(rc);
+}
+
int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
struct mdd_object *c, struct md_attr *ma,
struct thandle *handle,
struct lu_attr *la, const struct md_attr *ma)
{
struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
- struct md_ucred *uc = md_ucred(env);
+ struct md_ucred *uc;
int rc;
ENTRY;
if (la->la_valid & (LA_NLINK | LA_RDEV | LA_BLKSIZE))
RETURN(-EPERM);
+ /* export destroy does not have ->le_ses, but we may want
+ * to drop LUSTRE_SOM_FL. */
+ if (!env->le_ses)
+ RETURN(0);
+
+ uc = md_ucred(env);
+
rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
if (rc)
RETURN(rc);
if (la->la_valid == LA_ATIME) {
/* This is atime only set for read atime update on close. */
- if (la->la_atime <= tmp_la->la_atime +
- mdd_obj2mdd_dev(obj)->mdd_atime_diff)
+ if (la->la_atime >= tmp_la->la_atime &&
+ la->la_atime < (tmp_la->la_atime +
+ mdd_obj2mdd_dev(obj)->mdd_atime_diff))
la->la_valid &= ~LA_ATIME;
RETURN(0);
}
}
}
+ if (la->la_valid & LA_KILL_SUID) {
+ la->la_valid &= ~LA_KILL_SUID;
+ if ((tmp_la->la_mode & S_ISUID) &&
+ !(la->la_valid & LA_MODE)) {
+ la->la_mode = tmp_la->la_mode;
+ la->la_valid |= LA_MODE;
+ }
+ la->la_mode &= ~S_ISUID;
+ }
+
+ if (la->la_valid & LA_KILL_SGID) {
+ la->la_valid &= ~LA_KILL_SGID;
+ if (((tmp_la->la_mode & (S_ISGID | S_IXGRP)) ==
+ (S_ISGID | S_IXGRP)) &&
+ !(la->la_valid & LA_MODE)) {
+ la->la_mode = tmp_la->la_mode;
+ la->la_valid |= LA_MODE;
+ }
+ la->la_mode &= ~S_ISGID;
+ }
+
/* Make sure a caller can chmod. */
if (la->la_valid & LA_MODE) {
- /* Bypass la_vaild == LA_MODE,
- * this is for changing file with SUID or SGID. */
- if ((la->la_valid & ~LA_MODE) &&
- !(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
+ if (!(ma->ma_attr_flags & MDS_PERM_BYPASS) &&
(uc->mu_fsuid != tmp_la->la_uid) &&
!mdd_capable(uc, CFS_CAP_FOWNER))
RETURN(-EPERM);
- if (la->la_mode == (umode_t) -1)
+ if (la->la_mode == (cfs_umode_t) -1)
la->la_mode = tmp_la->la_mode;
else
la->la_mode = (la->la_mode & S_IALLUGO) |
static int mdd_changelog_data_store(const struct lu_env *env,
struct mdd_device *mdd,
enum changelog_rec_type type,
+ int flags,
struct mdd_object *mdd_obj,
struct thandle *handle)
{
const struct lu_fid *tfid = mdo2fid(mdd_obj);
struct llog_changelog_rec *rec;
+ struct thandle *th = NULL;
struct lu_buf *buf;
int reclen;
int rc;
+ /* Not recording */
if (!(mdd->mdd_cl.mc_flags & CLM_ON))
RETURN(0);
+ if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
+ RETURN(0);
- LASSERT(handle != NULL);
LASSERT(mdd_obj != NULL);
+ LASSERT(handle != NULL);
- if ((type == CL_SETATTR) &&
+ if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
/* Don't need multiple updates in this log */
/* Don't check under lock - no big deal if we get an extra
RETURN(-ENOMEM);
rec = (struct llog_changelog_rec *)buf->lb_buf;
- rec->cr.cr_flags = CLF_VERSION;
+ rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
rec->cr.cr_type = (__u32)type;
rec->cr.cr_tfid = *tfid;
rec->cr.cr_namelen = 0;
mdd_obj->mod_cltime = cfs_time_current_64();
- rc = mdd_changelog_llog_write(mdd, rec, handle);
+ rc = mdd_changelog_llog_write(mdd, rec, handle ? : th);
+
+ if (th)
+ mdd_trans_stop(env, mdd, rc, th);
+
if (rc < 0) {
CERROR("changelog failed: rc=%d op%d t"DFID"\n",
rc, type, PFID(tfid));
return 0;
}
+/* Write a standalone changelog record of \a type for \a obj, creating,
+ * declaring, starting and stopping a dedicated transaction around it.
+ * \a flags is passed through to the record's cr_flags (masked by
+ * CLF_FLAGMASK in mdd_changelog_data_store()). */
+int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
+ int flags, struct md_object *obj)
+{
+ struct thandle *handle;
+ struct mdd_object *mdd_obj = md2mdd_obj(obj);
+ struct mdd_device *mdd = mdo2mdd(obj);
+ int rc;
+ ENTRY;
+
+ handle = mdd_trans_create(env, mdd);
+ if (IS_ERR(handle))
+ return(PTR_ERR(handle));
+
+ rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+ if (rc)
+ GOTO(stop, rc);
+
+ rc = mdd_trans_start(env, mdd, handle);
+ if (rc)
+ GOTO(stop, rc);
+
+ rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
+ handle);
+
+stop:
+ mdd_trans_stop(env, mdd, rc, handle);
+
+ RETURN(rc);
+}
+
+/**
+ * Build and store the LMA (lustre_mdt_attrs) extended attribute from the
+ * HSM and/or SOM state carried in \a ma.
+ *
+ * Must be called with the object write lock held.
+ *
+ * \param env     execution environment (supplies the per-thread xattr buffer)
+ * \param mdd_obj object whose XATTR_NAME_LMA EA is rewritten
+ * \param ma      source of HSM (ma_hsm) and SOM (ma_som) data; only the
+ *                parts flagged in ma_valid are taken from \a ma
+ * \param handle  open transaction handle
+ *
+ * \retval 0 on success, negative errno on failure
+ *
+ * \see mdd_lma_set_locked().
+ */
+static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj,
+                         const struct md_attr *ma, struct thandle *handle)
+{
+        struct mdd_thread_info *info = mdd_env_info(env);
+        struct lu_buf *buf;
+        struct lustre_mdt_attrs *lma =
+                                (struct lustre_mdt_attrs *) info->mti_xattr_buf;
+        int lmasize = sizeof(struct lustre_mdt_attrs);
+        int rc = 0;
+
+        ENTRY;
+
+        /* If the caller supplies only one of the HSM/SOM parts, the other
+         * part must be preserved, so read the current LMA from disk first.
+         * NOTE: the previous test "(!ma->ma_valid) & (MA_HSM | MA_SOM)" was
+         * always false (logical NOT yields 0 or 1, and bit 0 belongs to
+         * neither mask), so the on-disk LMA was unconditionally discarded. */
+        if ((ma->ma_valid & (MA_HSM | MA_SOM)) != (MA_HSM | MA_SOM)) {
+                rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA);
+                if (rc < 0)
+                        RETURN(rc);
+                else if (rc == 0)
+                        /* no LMA EA on disk yet (mdd_get_md() maps a missing
+                         * EA to 0 — confirm), start from scratch */
+                        memset(lma, 0, lmasize);
+                else
+                        lustre_lma_swab(lma);
+        } else {
+                memset(lma, 0, lmasize);
+        }
+
+        /* Merge HSM flags */
+        if (ma->ma_valid & MA_HSM) {
+                lma->lma_flags |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK;
+                lma->lma_compat |= LMAC_HSM;
+        }
+
+        /* Merge SOM data; an invalid ioepoch cancels the SOM attributes */
+        if (ma->ma_valid & MA_SOM) {
+                LASSERT(ma->ma_som != NULL);
+                if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) {
+                        lma->lma_compat &= ~LMAC_SOM;
+                } else {
+                        lma->lma_compat |= LMAC_SOM;
+                        lma->lma_ioepoch = ma->ma_som->msd_ioepoch;
+                        lma->lma_som_size = ma->ma_som->msd_size;
+                        lma->lma_som_blocks = ma->ma_som->msd_blocks;
+                        lma->lma_som_mountid = ma->ma_som->msd_mountid;
+                }
+        }
+
+        /* The LMA always carries the object's own FID */
+        memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid));
+
+        /* Convert back to on-disk byte order and write the EA */
+        lustre_lma_swab(lma);
+        buf = mdd_buf_get(env, lma, lmasize);
+        rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle);
+
+        RETURN(rc);
+}
+
+/**
+ * Save the LMA extended attribute with data from \a ma.
+ *
+ * HSM and Size-On-MDS data are extracted from \a ma when their bits are
+ * set in ma_valid; otherwise the LMA EA is first read from disk, modified,
+ * and written back (see __mdd_lma_set()).  Takes the object write lock
+ * around the update.
+ */
+static int mdd_lma_set_locked(const struct lu_env *env,
+ struct mdd_object *mdd_obj,
+ const struct md_attr *ma, struct thandle *handle)
+{
+ int rc;
+
+ mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
+ rc = __mdd_lma_set(env, mdd_obj, ma, handle);
+ mdd_write_unlock(env, mdd_obj);
+ return rc;
+}
+
+/* Precedence for choosing record type when multiple
+ * attributes change: setattr > mtime > ctime > atime
+ * (ctime changes when mtime does, plus chmod/chown.
+ * atime and ctime are independent.) */
+static int mdd_attr_set_changelog(const struct lu_env *env,
+ struct md_object *obj, struct thandle *handle,
+ __u64 valid)
+{
+ struct mdd_device *mdd = mdo2mdd(obj);
+ int bits, type = 0;
+
+ /* build a bitmask of candidate record types from la_valid, then
+ * apply the device changelog mask; returns 0 (no record) if every
+ * candidate type is masked out */
+ bits = (valid & ~(LA_CTIME|LA_MTIME|LA_ATIME)) ? 1 << CL_SETATTR : 0;
+ bits |= (valid & LA_MTIME) ? 1 << CL_MTIME : 0;
+ bits |= (valid & LA_CTIME) ? 1 << CL_CTIME : 0;
+ bits |= (valid & LA_ATIME) ? 1 << CL_ATIME : 0;
+ bits = bits & mdd->mdd_cl.mc_mask;
+ if (bits == 0)
+ return 0;
+
+ /* The record type is the lowest non-masked set bit */
+ while (bits && ((bits & 1) == 0)) {
+ bits = bits >> 1;
+ type++;
+ }
+
+ /* FYI we only store the first CLF_FLAGMASK bits of la_valid */
+ return mdd_changelog_data_store(env, mdd, type, (int)valid,
+ md2mdd_obj(obj), handle);
+}
+
+/* Declare (reserve transaction credits for) everything mdd_attr_set() may
+ * do: the attribute update itself, a changelog record, LOV/LMA xattr
+ * rewrites, an ACL rewrite on mode change, and — when \a lmm is supplied
+ * for a uid/gid change — one unlink llog record per stripe. */
+static int mdd_declare_attr_set(const struct lu_env *env,
+ struct mdd_device *mdd,
+ struct mdd_object *obj,
+ const struct md_attr *ma,
+ struct lov_mds_md *lmm,
+ struct thandle *handle)
+{
+ struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
+ int rc, i;
+
+ rc = mdo_declare_attr_set(env, obj, &ma->ma_attr, handle);
+ if (rc)
+ return rc;
+
+ rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+ if (rc)
+ return rc;
+
+ /* lb_buf == NULL with a length set declares a write of that size
+ * without supplying the data yet */
+ if (ma->ma_valid & MA_LOV) {
+ buf->lb_buf = NULL;
+ buf->lb_len = ma->ma_lmm_size;
+ rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
+ 0, handle);
+ if (rc)
+ return rc;
+ }
+
+ if (ma->ma_valid & (MA_HSM | MA_SOM)) {
+ buf->lb_buf = NULL;
+ buf->lb_len = sizeof(struct lustre_mdt_attrs);
+ rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA,
+ 0, handle);
+ if (rc)
+ return rc;
+ }
+
+#ifdef CONFIG_FS_POSIX_ACL
+ if (ma->ma_attr.la_valid & LA_MODE) {
+ mdd_read_lock(env, obj, MOR_TGT_CHILD);
+ rc = mdo_xattr_get(env, obj, buf, XATTR_NAME_ACL_ACCESS,
+ BYPASS_CAPA);
+ mdd_read_unlock(env, obj);
+ if (rc == -EOPNOTSUPP || rc == -ENODATA)
+ rc = 0;
+ else if (rc < 0)
+ return rc;
+
+ /* a positive rc is the current ACL size; declare a
+ * same-sized rewrite (chmod may rebuild the access ACL) */
+ if (rc != 0) {
+ buf->lb_buf = NULL;
+ buf->lb_len = rc;
+ rc = mdo_declare_xattr_set(env, obj, buf,
+ XATTR_NAME_ACL_ACCESS, 0,
+ handle);
+ if (rc)
+ return rc;
+ }
+ }
+#endif
+
+ /* basically the log is the same as in unlink case */
+ if (lmm) {
+ __u16 stripe;
+
+ if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V1 &&
+ le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V3) {
+ CERROR("%s: invalid LOV_MAGIC %08x on object "DFID"\n",
+ mdd->mdd_obd_dev->obd_name,
+ le32_to_cpu(lmm->lmm_magic),
+ PFID(lu_object_fid(&obj->mod_obj.mo_lu)));
+ return -EINVAL;
+ }
+
+ /* LOV_ALL_STRIPES means "stripe over every OST" */
+ stripe = le16_to_cpu(lmm->lmm_stripe_count);
+ if (stripe == LOV_ALL_STRIPES) {
+ struct lov_desc *ldesc;
+
+ ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc;
+ LASSERT(ldesc != NULL);
+ stripe = ldesc->ld_tgt_count;
+ }
+
+ for (i = 0; i < stripe; i++) {
+ rc = mdd_declare_llog_record(env, mdd,
+ sizeof(struct llog_unlink_rec),
+ handle);
+ if (rc)
+ return rc;
+ }
+ }
+
+ return rc;
+}
+
/* set attr and LOV EA at once, return updated attr */
static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
const struct md_attr *ma)
struct llog_cookie *logcookies = NULL;
int rc, lmm_size = 0, cookie_size = 0;
struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
-#ifdef HAVE_QUOTA_SUPPORT
struct obd_device *obd = mdd->mdd_obd_dev;
- struct obd_export *exp = md_quota(env)->mq_exp;
struct mds_obd *mds = &obd->u.mds;
+#ifdef HAVE_QUOTA_SUPPORT
unsigned int qnids[MAXQUOTAS] = { 0, 0 };
unsigned int qoids[MAXQUOTAS] = { 0, 0 };
int quota_opc = 0, block_count = 0;
#endif
ENTRY;
- mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
- MDD_TXN_ATTR_SET_OP);
- handle = mdd_trans_start(env, mdd);
- if (IS_ERR(handle))
- RETURN(PTR_ERR(handle));
- /*TODO: add lock here*/
- /* start a log jounal handle if needed */
+ *la_copy = ma->ma_attr;
+ rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
+ if (rc != 0)
+ RETURN(rc);
+
+ /* setattr on "close" only change atime, or do nothing */
+ if (ma->ma_valid == MA_INODE &&
+ ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
+ RETURN(0);
+
if (S_ISREG(mdd_object_type(mdd_obj)) &&
ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
lmm_size = mdd_lov_mdsize(env, mdd);
lmm = mdd_max_lmm_get(env, mdd);
if (lmm == NULL)
- GOTO(cleanup, rc = -ENOMEM);
+ RETURN(-ENOMEM);
rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
XATTR_NAME_LOV);
if (rc < 0)
- GOTO(cleanup, rc);
+ RETURN(rc);
}
+ handle = mdd_trans_create(env, mdd);
+ if (IS_ERR(handle))
+ RETURN(PTR_ERR(handle));
+
+ rc = mdd_declare_attr_set(env, mdd, mdd_obj, ma,
+ lmm_size > 0 ? lmm : NULL, handle);
+ if (rc)
+ GOTO(stop, rc);
+
+ /* permission changes may require sync operation */
+ if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
+ handle->th_sync = !!mdd->mdd_sync_permission;
+
+ rc = mdd_trans_start(env, mdd, handle);
+ if (rc)
+ GOTO(stop, rc);
+
+ /* permission changes may require sync operation */
+ if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
+ handle->th_sync |= mdd->mdd_sync_permission;
+
if (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME))
CDEBUG(D_INODE, "setting mtime "LPU64", ctime "LPU64"\n",
ma->ma_attr.la_mtime, ma->ma_attr.la_ctime);
- *la_copy = ma->ma_attr;
- rc = mdd_fix_attr(env, mdd_obj, la_copy, ma);
- if (rc)
- GOTO(cleanup, rc);
-
#ifdef HAVE_QUOTA_SUPPORT
if (mds->mds_quota && la_copy->la_valid & (LA_UID | LA_GID)) {
+ struct obd_export *exp = md_quota(env)->mq_exp;
struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
}
if (rc == 0 && ma->ma_valid & MA_LOV) {
- umode_t mode;
+ cfs_umode_t mode;
mode = mdd_object_type(mdd_obj);
if (S_ISREG(mode) || S_ISDIR(mode)) {
}
}
+ if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) {
+ cfs_umode_t mode;
+
+ mode = mdd_object_type(mdd_obj);
+ if (S_ISREG(mode))
+ rc = mdd_lma_set_locked(env, mdd_obj, ma, handle);
+
+ }
cleanup:
- if ((rc == 0) && (ma->ma_attr.la_valid & (LA_MTIME | LA_CTIME)))
- rc = mdd_changelog_data_store(env, mdd, CL_SETATTR, mdd_obj,
- handle);
+ if (rc == 0)
+ rc = mdd_attr_set_changelog(env, obj, handle,
+ ma->ma_attr.la_valid);
+stop:
mdd_trans_stop(env, mdd, rc, handle);
if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
/*set obd attr, if needed*/
RETURN(rc);
}
+/**
+ * Declare (reserve transaction credits for) an xattr update, including the
+ * changelog record that mdd_xattr_set() may write afterwards.
+ *
+ * \retval 0 on success, negative errno on failure
+ */
+static int mdd_declare_xattr_set(const struct lu_env *env,
+                                 struct mdd_device *mdd,
+                                 struct mdd_object *obj,
+                                 const struct lu_buf *buf,
+                                 const char *name,
+                                 struct thandle *handle)
+{
+        int rc;
+
+        rc = mdo_declare_xattr_set(env, obj, buf, name, 0, handle);
+        if (rc)
+                return rc;
+
+        /* Reserve changelog credits for exactly the xattrs that
+         * mdd_xattr_set() records: user.* and the POSIX ACL names.  The
+         * previous check covered only "user.", so an ACL update could
+         * write its changelog record with no credits declared. */
+        if (strncmp(XATTR_USER_PREFIX, name,
+                    sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
+            strncmp(POSIX_ACL_XATTR_ACCESS, name,
+                    sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
+            strncmp(POSIX_ACL_XATTR_DEFAULT, name,
+                    sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0)
+                rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+
+        return rc;
+}
+
/**
* The caller should guarantee to update the object ctime
* after xattr_set if needed.
if (rc)
RETURN(rc);
- mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
+ handle = mdd_trans_create(env, mdd);
+ if (IS_ERR(handle))
+ RETURN(PTR_ERR(handle));
+
/* security-replated changes may require sync */
if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
mdd->mdd_sync_permission == 1)
- txn_param_sync(&mdd_env_info(env)->mti_param);
+ handle->th_sync = 1;
- handle = mdd_trans_start(env, mdd);
- if (IS_ERR(handle))
- RETURN(PTR_ERR(handle));
+ rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, handle);
+ if (rc)
+ GOTO(stop, rc);
+
+ rc = mdd_trans_start(env, mdd, handle);
+ if (rc)
+ GOTO(stop, rc);
+
+ /* security-replated changes may require sync */
+ if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
+ handle->th_sync |= mdd->mdd_sync_permission;
rc = mdd_xattr_set_txn(env, mdd_obj, buf, name, fl, handle);
- /* Only record user xattr changes */
- if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
- (strncmp("user.", name, 5) == 0))
- rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
+ /* Only record system & user xattr changes */
+ if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
+ sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
+ strncmp(POSIX_ACL_XATTR_ACCESS, name,
+ sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
+ strncmp(POSIX_ACL_XATTR_DEFAULT, name,
+ sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
+ rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
handle);
+
+stop:
mdd_trans_stop(env, mdd, rc, handle);
RETURN(rc);
}
+/**
+ * Declare (reserve transaction credits for) an xattr removal, including the
+ * changelog record that mdd_xattr_del() may write afterwards.
+ *
+ * \retval 0 on success, negative errno on failure
+ */
+static int mdd_declare_xattr_del(const struct lu_env *env,
+                                 struct mdd_device *mdd,
+                                 struct mdd_object *obj,
+                                 const char *name,
+                                 struct thandle *handle)
+{
+        int rc;
+
+        rc = mdo_declare_xattr_del(env, obj, name, handle);
+        if (rc)
+                return rc;
+
+        /* Match the changelog condition in mdd_xattr_del(): user.* and
+         * POSIX ACL xattrs are recorded there, so credits must be declared
+         * for them too (the previous check covered only "user."). */
+        if (strncmp(XATTR_USER_PREFIX, name,
+                    sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
+            strncmp(POSIX_ACL_XATTR_ACCESS, name,
+                    sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
+            strncmp(POSIX_ACL_XATTR_DEFAULT, name,
+                    sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0)
+                rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+
+        return rc;
+}
+
/**
* The caller should guarantee to update the object ctime
* after xattr_set if needed.
if (rc)
RETURN(rc);
- mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
- handle = mdd_trans_start(env, mdd);
+ handle = mdd_trans_create(env, mdd);
if (IS_ERR(handle))
RETURN(PTR_ERR(handle));
+ rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle);
+ if (rc)
+ GOTO(stop, rc);
+
+ rc = mdd_trans_start(env, mdd, handle);
+ if (rc)
+ GOTO(stop, rc);
+
mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
rc = mdo_xattr_del(env, mdd_obj, name, handle,
mdd_object_capa(env, mdd_obj));
mdd_write_unlock(env, mdd_obj);
- /* Only record user xattr changes */
- if ((rc == 0) && (mdd->mdd_cl.mc_flags & CLM_ON) &&
- (strncmp("user.", name, 5) != 0))
- rc = mdd_changelog_data_store(env, mdd, CL_XATTR, mdd_obj,
+ /* Only record system & user xattr changes */
+ if ((rc == 0) && (strncmp(XATTR_USER_PREFIX, name,
+ sizeof(XATTR_USER_PREFIX) - 1) == 0 ||
+ strncmp(POSIX_ACL_XATTR_ACCESS, name,
+ sizeof(POSIX_ACL_XATTR_ACCESS) - 1) == 0 ||
+ strncmp(POSIX_ACL_XATTR_DEFAULT, name,
+ sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
+ rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
handle);
+stop:
mdd_trans_stop(env, mdd, rc, handle);
RETURN(rc);
int rc;
ENTRY;
+ /* XXX: this code won't be used ever:
+ * DNE uses slightly different approach */
+ LBUG();
+
/*
* Check -ENOENT early here because we need to get object type
* to calculate credits before transaction start
LASSERT(mdd_object_exists(mdd_obj) > 0);
- rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
- if (rc)
- RETURN(rc);
-
- handle = mdd_trans_start(env, mdd);
+ handle = mdd_trans_create(env, mdd);
if (IS_ERR(handle))
RETURN(-ENOMEM);
+ rc = mdd_trans_start(env, mdd, handle);
+
mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
int rc = 0;
ENTRY;
+ /* XXX: this code won't be used ever:
+ * DNE uses slightly different approach */
+ LBUG();
+
#ifdef HAVE_QUOTA_SUPPORT
if (mds->mds_quota) {
quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
}
#endif
- mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP);
- handle = mdd_trans_start(env, mdd);
+ handle = mdd_trans_create(env, mdd);
if (IS_ERR(handle))
GOTO(out_pending, rc = PTR_ERR(handle));
+ rc = mdd_trans_start(env, mdd, handle);
+
mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
rc = mdd_oc_sanity_check(env, mdd_obj, ma);
if (rc)
int rc;
ENTRY;
- mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
- handle = mdd_trans_start(env, mdd);
+ /* XXX: this code won't be used ever:
+ * DNE uses slightly different approach */
+ LBUG();
+
+ handle = mdd_trans_create(env, mdd);
if (IS_ERR(handle))
RETURN(-ENOMEM);
+ rc = mdd_trans_start(env, mdd, handle);
+
mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
if (rc == 0)
if (flags & (FMODE_WRITE | MDS_OPEN_TRUNC | MDS_OPEN_APPEND))
res |= MAY_WRITE;
if (flags & MDS_FMODE_EXEC)
- res |= MAY_EXEC;
+ res = MAY_EXEC;
return res;
}
return rc;
}
+/* Declare (reserve transaction credits for) killing an object on last
+ * unlink: the unlink llog record plus the object destroy itself. */
+int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
+ struct md_attr *ma, struct thandle *handle)
+{
+ int rc;
+
+ rc = mdd_declare_unlink_log(env, obj, ma, handle);
+ if (rc)
+ return rc;
+
+ return mdo_declare_destroy(env, obj, handle);
+}
+
/* return md_attr back,
* if it is last unlink then return lov ea + llog cookie*/
int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
- struct md_attr *ma)
+ struct md_attr *ma, struct thandle *handle)
{
int rc = 0;
ENTRY;
rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
obj, ma);
}
+
+ if (rc == 0)
+ rc = mdo_destroy(env, obj, handle);
+
RETURN(rc);
}
+/* Declare (reserve transaction credits for) the last-close path: removal
+ * of the object from the orphan index followed by killing the object
+ * (unlink llog record + destroy, see mdd_declare_object_kill()). */
+static int mdd_declare_close(const struct lu_env *env,
+ struct mdd_object *obj,
+ struct md_attr *ma,
+ struct thandle *handle)
+{
+ int rc;
+
+ rc = orph_declare_index_delete(env, obj, handle);
+ if (rc)
+ return rc;
+
+ return mdd_declare_object_kill(env, obj, ma, handle);
+}
+
/*
* No permission check is needed.
*/
static int mdd_close(const struct lu_env *env, struct md_object *obj,
- struct md_attr *ma)
+ struct md_attr *ma, int mode)
{
struct mdd_object *mdd_obj = md2mdd_obj(obj);
struct mdd_device *mdd = mdo2mdd(obj);
- struct thandle *handle;
+ struct thandle *handle = NULL;
int rc;
int reset = 1;
#endif
ENTRY;
- rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP);
- if (rc)
- RETURN(rc);
- handle = mdd_trans_start(env, mdo2mdd(obj));
- if (IS_ERR(handle))
- RETURN(PTR_ERR(handle));
+ if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
+ mdd_obj->mod_count--;
+
+ if (mdd_obj->mod_flags & ORPHAN_OBJ && !mdd_obj->mod_count)
+ CDEBUG(D_HA, "Object "DFID" is retained in orphan "
+ "list\n", PFID(mdd_object_fid(mdd_obj)));
+ RETURN(0);
+ }
+
+ /* check without any lock */
+ if (mdd_obj->mod_count == 1 &&
+ (mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
+ again:
+ handle = mdd_trans_create(env, mdo2mdd(obj));
+ if (IS_ERR(handle))
+ RETURN(PTR_ERR(handle));
+
+ rc = mdd_declare_close(env, mdd_obj, ma, handle);
+ if (rc)
+ GOTO(stop, rc);
+
+ rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+ if (rc)
+ GOTO(stop, rc);
+
+ rc = mdd_trans_start(env, mdo2mdd(obj), handle);
+ if (rc)
+ GOTO(stop, rc);
+ }
mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
+ if (handle == NULL && mdd_obj->mod_count == 1 &&
+ (mdd_obj->mod_flags & ORPHAN_OBJ) != 0) {
+ mdd_write_unlock(env, mdd_obj);
+ goto again;
+ }
+
/* release open count */
mdd_obj->mod_count --;
if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
/* remove link to object from orphan index */
+ LASSERT(handle != NULL);
rc = __mdd_orphan_del(env, mdd_obj, handle);
if (rc == 0) {
CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
} else {
- rc = mdd_object_kill(env, mdd_obj, ma);
- if (rc == 0)
- reset = 0;
+ if (handle == NULL) {
+ handle = mdd_trans_create(env, mdo2mdd(obj));
+ if (IS_ERR(handle))
+ GOTO(out, rc = PTR_ERR(handle));
+
+ rc = mdd_declare_object_kill(env, mdd_obj, ma,
+ handle);
+ if (rc)
+ GOTO(out, rc);
+
+ rc = mdd_declare_changelog_store(env, mdd,
+ NULL, handle);
+ if (rc)
+ GOTO(stop, rc);
+
+ rc = mdd_trans_start(env, mdo2mdd(obj), handle);
+ if (rc)
+ GOTO(out, rc);
+ }
+
+ rc = mdd_object_kill(env, mdd_obj, ma, handle);
+ if (rc == 0)
+ reset = 0;
}
if (rc != 0)
ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
mdd_write_unlock(env, mdd_obj);
- mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
+
+        /* Record a CL_CLOSE changelog entry for writable opens (skipped for
+         * recovery replays).  A transaction may not exist yet on this path,
+         * so create one on demand. */
+        if (rc == 0 &&
+            (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
+            !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
+                if (handle == NULL) {
+                        handle = mdd_trans_create(env, mdo2mdd(obj));
+                        if (IS_ERR(handle)) {
+                                /* was "rc = IS_ERR(handle)", which stores 1
+                                 * instead of the error code; also clear
+                                 * handle so the stop: path does not hand an
+                                 * ERR_PTR to mdd_trans_stop() */
+                                rc = PTR_ERR(handle);
+                                handle = NULL;
+                                GOTO(stop, rc);
+                        }
+
+                        rc = mdd_declare_changelog_store(env, mdd, NULL,
+                                                         handle);
+                        if (rc)
+                                GOTO(stop, rc);
+
+                        rc = mdd_trans_start(env, mdo2mdd(obj), handle);
+                        if (rc)
+                                GOTO(stop, rc);
+                }
+
+                mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
+                                         mdd_obj, handle);
+        }
+
+stop:
+ if (handle != NULL)
+ mdd_trans_stop(env, mdd, rc, handle);
#ifdef HAVE_QUOTA_SUPPORT
if (quota_opc)
/* Trigger dqrel on the owner of child. If failed,
}
static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
- int first, void *area, int nob,
+ struct lu_dirpage *dp, int nob,
const struct dt_it_ops *iops, struct dt_it *it,
- __u64 *start, __u64 *end,
- struct lu_dirent **last, __u32 attr)
+ __u32 attr)
{
+ void *area = dp;
int result;
__u64 hash = 0;
struct lu_dirent *ent;
+ struct lu_dirent *last = NULL;
+ int first = 1;
- if (first) {
- memset(area, 0, sizeof (struct lu_dirpage));
- area += sizeof (struct lu_dirpage);
- nob -= sizeof (struct lu_dirpage);
- }
+ memset(area, 0, sizeof (*dp));
+ area += sizeof (*dp);
+ nob -= sizeof (*dp);
ent = area;
do {
int len;
int recsize;
- len = iops->key_size(env, it);
+ len = iops->key_size(env, it);
/* IAM iterator can return record with zero len. */
if (len == 0)
hash = iops->store(env, it);
if (unlikely(first)) {
first = 0;
- *start = hash;
+ dp->ldp_hash_start = cpu_to_le64(hash);
}
/* calculate max space required for lu_dirent */
recsize = lu_dirent_calc_size(len, attr);
if (nob >= recsize) {
- result = iops->rec(env, it, ent, attr);
+ result = iops->rec(env, it, (struct dt_rec *)ent, attr);
if (result == -ESTALE)
goto next;
if (result != 0)
* so recheck rec length */
recsize = le16_to_cpu(ent->lde_reclen);
} else {
- /*
- * record doesn't fit into page, enlarge previous one.
- */
- if (*last) {
- (*last)->lde_reclen =
- cpu_to_le16(le16_to_cpu((*last)->lde_reclen) +
- nob);
- result = 0;
- } else
- result = -EINVAL;
-
+                        result = (last != NULL) ? 0 : -EINVAL;
goto out;
}
- *last = ent;
+ last = ent;
ent = (void *)ent + recsize;
nob -= recsize;
} while (result == 0);
out:
- *end = hash;
+ dp->ldp_hash_end = cpu_to_le64(hash);
+ if (last != NULL) {
+ if (last->lde_hash == dp->ldp_hash_end)
+ dp->ldp_flags |= cpu_to_le32(LDF_COLLIDE);
+ last->lde_reclen = 0; /* end mark */
+ }
return result;
}
struct dt_object *next = mdd_object_child(obj);
const struct dt_it_ops *iops;
struct page *pg;
- struct lu_dirent *last = NULL;
struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
int i;
+ int nlupgs = 0;
int rc;
int nob;
- __u64 hash_start;
- __u64 hash_end = 0;
LASSERT(rdpg->rp_pages != NULL);
LASSERT(next->do_index_ops != NULL);
* iterate through directory and fill pages from @rdpg
*/
iops = &next->do_index_ops->dio_it;
- it = iops->init(env, next, mdd_object_capa(env, obj));
+ it = iops->init(env, next, rdpg->rp_attrs, mdd_object_capa(env, obj));
if (IS_ERR(it))
return PTR_ERR(it);
rc = iops->load(env, it, rdpg->rp_hash);
- if (rc == 0){
+ if (rc == 0) {
/*
* Iterator didn't find record with exactly the key requested.
*
*/
for (i = 0, nob = rdpg->rp_count; rc == 0 && nob > 0;
i++, nob -= CFS_PAGE_SIZE) {
+ struct lu_dirpage *dp;
+
LASSERT(i < rdpg->rp_npages);
pg = rdpg->rp_pages[i];
- rc = mdd_dir_page_build(env, mdd, !i, cfs_kmap(pg),
- min_t(int, nob, CFS_PAGE_SIZE), iops,
- it, &hash_start, &hash_end, &last,
- rdpg->rp_attrs);
- if (rc != 0 || i == rdpg->rp_npages - 1) {
- if (last)
- last->lde_reclen = 0;
+ dp = cfs_kmap(pg);
+#if CFS_PAGE_SIZE > LU_PAGE_SIZE
+repeat:
+#endif
+ rc = mdd_dir_page_build(env, mdd, dp,
+ min_t(int, nob, LU_PAGE_SIZE),
+ iops, it, rdpg->rp_attrs);
+ if (rc > 0) {
+ /*
+ * end of directory.
+ */
+ dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
+ nlupgs++;
+ } else if (rc < 0) {
+ CWARN("build page failed: %d!\n", rc);
+ } else {
+ nlupgs++;
+#if CFS_PAGE_SIZE > LU_PAGE_SIZE
+ dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
+ if ((unsigned long)dp & ~CFS_PAGE_MASK)
+ goto repeat;
+#endif
}
cfs_kunmap(pg);
}
- if (rc > 0) {
- /*
- * end of directory.
- */
- hash_end = DIR_END_OFF;
- rc = 0;
- }
- if (rc == 0) {
+ if (rc >= 0) {
struct lu_dirpage *dp;
dp = cfs_kmap(rdpg->rp_pages[0]);
dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
- dp->ldp_hash_end = cpu_to_le64(hash_end);
- if (i == 0)
+ if (nlupgs == 0) {
/*
- * No pages were processed, mark this.
+ * No pages were processed, mark this for first page
+ * and send back.
*/
- dp->ldp_flags |= LDF_EMPTY;
-
- dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
+ dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
+ nlupgs = 1;
+ }
cfs_kunmap(rdpg->rp_pages[0]);
+
+ rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);
}
iops->put(env, it);
iops->fini(env, it);
dp = (struct lu_dirpage*)cfs_kmap(pg);
memset(dp, 0 , sizeof(struct lu_dirpage));
dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
- dp->ldp_hash_end = cpu_to_le64(DIR_END_OFF);
- dp->ldp_flags |= LDF_EMPTY;
- dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
+ dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
+ dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
cfs_kunmap(pg);
- GOTO(out_unlock, rc = 0);
+ GOTO(out_unlock, rc = LU_PAGE_SIZE);
}
rc = __mdd_readpage(env, mdd_obj, rdpg);
return next->do_ops->do_object_sync(env, next);
}
-static dt_obj_version_t mdd_version_get(const struct lu_env *env,
- struct md_object *obj)
-{
- struct mdd_object *mdd_obj = md2mdd_obj(obj);
-
- LASSERT(mdd_object_exists(mdd_obj));
- return do_version_get(env, mdd_object_child(mdd_obj));
-}
-
-static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
- dt_obj_version_t version)
-{
- struct mdd_object *mdd_obj = md2mdd_obj(obj);
-
- LASSERT(mdd_object_exists(mdd_obj));
- return do_version_set(env, mdd_object_child(mdd_obj), version);
-}
-
const struct md_object_operations mdd_obj_ops = {
.moo_permission = mdd_permission,
.moo_attr_get = mdd_attr_get,
.moo_close = mdd_close,
.moo_readpage = mdd_readpage,
.moo_readlink = mdd_readlink,
+ .moo_changelog = mdd_changelog,
.moo_capa_get = mdd_capa_get,
.moo_object_sync = mdd_object_sync,
- .moo_version_get = mdd_version_get,
- .moo_version_set = mdd_version_set,
.moo_path = mdd_path,
+ .moo_file_lock = mdd_file_lock,
+ .moo_file_unlock = mdd_file_unlock,
};