#define DEBUG_SUBSYSTEM S_MDS
#include <linux/module.h>
+#ifdef HAVE_EXT4_LDISKFS
+#include <ldiskfs/ldiskfs_jbd2.h>
+#else
#include <linux/jbd.h>
+#endif
#include <obd.h>
#include <obd_class.h>
#include <obd_support.h>
#include <lustre_fid.h>
#include <lustre_param.h>
+#ifdef HAVE_EXT4_LDISKFS
+#include <ldiskfs/ldiskfs.h>
+#else
#include <linux/ldiskfs_fs.h>
+#endif
#include <lustre_mds.h>
#include <lustre/lustre_idl.h>
}
static int mdd_object_init(const struct lu_env *env, struct lu_object *o,
- const struct lu_object_conf *_)
+ const struct lu_object_conf *unused)
{
struct mdd_device *d = lu2mdd_dev(o->lo_dev);
struct mdd_object *mdd_obj = lu2mdd_obj(o);
{
struct mdd_object *mdd = lu2mdd_obj((struct lu_object *)o);
return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p(open_count=%d, "
- "valid=%x, cltime=%llu, flags=%lx",
+ "valid=%x, cltime=%llu, flags=%lx)",
mdd, mdd->mod_count, mdd->mod_valid,
mdd->mod_cltime, mdd->mod_flags);
}
RETURN(-ENOMEM);
rec = (struct llog_changelog_rec *)buf->lb_buf;
- rec->cr_flags = CLF_VERSION;
- rec->cr_type = (__u32)type;
- rec->cr_tfid = *tfid;
- rec->cr_namelen = 0;
+ rec->cr.cr_flags = CLF_VERSION;
+ rec->cr.cr_type = (__u32)type;
+ rec->cr.cr_tfid = *tfid;
+ rec->cr.cr_namelen = 0;
mdd_obj->mod_cltime = cfs_time_current_64();
rc = mdd_changelog_llog_write(mdd, rec, handle);
struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
#ifdef HAVE_QUOTA_SUPPORT
struct obd_device *obd = mdd->mdd_obd_dev;
+ struct obd_export *exp = md_quota(env)->mq_exp;
struct mds_obd *mds = &obd->u.mds;
unsigned int qnids[MAXQUOTAS] = { 0, 0 };
unsigned int qoids[MAXQUOTAS] = { 0, 0 };
int quota_opc = 0, block_count = 0;
- int inode_pending = 0, block_pending = 0;
+ int inode_pending[MAXQUOTAS] = { 0, 0 };
+ int block_pending[MAXQUOTAS] = { 0, 0 };
#endif
ENTRY;
mdd_quota_wrapper(la_copy, qnids);
mdd_quota_wrapper(la_tmp, qoids);
/* get file quota for new owner */
- lquota_chkquota(mds_quota_interface_ref, obd,
- qnids[USRQUOTA], qnids[GRPQUOTA], 1,
- &inode_pending, NULL, 0, NULL, 0);
+ lquota_chkquota(mds_quota_interface_ref, obd, exp,
+ qnids, inode_pending, 1, NULL, 0,
+ NULL, 0);
block_count = (la_tmp->la_blocks + 7) >> 3;
if (block_count) {
void *data = NULL;
mdd_data_get(env, mdd_obj, &data);
/* get block quota for new owner */
lquota_chkquota(mds_quota_interface_ref, obd,
- qnids[USRQUOTA],
- qnids[GRPQUOTA],
- block_count, &block_pending,
- NULL, LQUOTA_FLAGS_BLK,
- data, 1);
+ exp, qnids, block_pending,
+ block_count, NULL,
+ LQUOTA_FLAGS_BLK, data, 1);
}
}
}
}
#ifdef HAVE_QUOTA_SUPPORT
if (quota_opc) {
- if (inode_pending)
- lquota_pending_commit(mds_quota_interface_ref, obd,
- qnids[USRQUOTA], qnids[GRPQUOTA],
- inode_pending, 0);
- if (block_pending)
- lquota_pending_commit(mds_quota_interface_ref, obd,
- qnids[USRQUOTA], qnids[GRPQUOTA],
- block_pending, 1);
+ lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
+ inode_pending, 0);
+ lquota_pending_commit(mds_quota_interface_ref, obd, qnids,
+ block_pending, 1);
/* Trigger dqrel/dqacq for original owner and new owner.
* If failed, the next call for lquota_chkquota will
* process it. */
RETURN(rc);
mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP);
+ /* security-replated changes may require sync */
+ if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
+ mdd->mdd_sync_permission == 1)
+ txn_param_sync(&mdd_env_info(env)->mti_param);
+
handle = mdd_trans_start(env, mdd);
if (IS_ERR(handle))
RETURN(PTR_ERR(handle));
struct thandle *handle;
#ifdef HAVE_QUOTA_SUPPORT
struct obd_device *obd = mdd->mdd_obd_dev;
+ struct obd_export *exp = md_quota(env)->mq_exp;
struct mds_obd *mds = &obd->u.mds;
unsigned int qids[MAXQUOTAS] = { 0, 0 };
int quota_opc = 0, block_count = 0;
- int inode_pending = 0, block_pending = 0;
+ int inode_pending[MAXQUOTAS] = { 0, 0 };
+ int block_pending[MAXQUOTAS] = { 0, 0 };
#endif
int rc = 0;
ENTRY;
quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
mdd_quota_wrapper(&ma->ma_attr, qids);
/* get file quota for child */
- lquota_chkquota(mds_quota_interface_ref, obd, qids[USRQUOTA],
- qids[GRPQUOTA], 1, &inode_pending, NULL, 0,
+ lquota_chkquota(mds_quota_interface_ref, obd, exp,
+ qids, inode_pending, 1, NULL, 0,
NULL, 0);
switch (ma->ma_attr.la_mode & S_IFMT) {
case S_IFLNK:
}
/* get block quota for child */
if (block_count)
- lquota_chkquota(mds_quota_interface_ref, obd,
- qids[USRQUOTA], qids[GRPQUOTA],
- block_count, &block_pending, NULL,
- LQUOTA_FLAGS_BLK, NULL, 0);
+ lquota_chkquota(mds_quota_interface_ref, obd, exp,
+ qids, block_pending, block_count,
+ NULL, LQUOTA_FLAGS_BLK, NULL, 0);
}
#endif
out_pending:
#ifdef HAVE_QUOTA_SUPPORT
if (quota_opc) {
- if (inode_pending)
- lquota_pending_commit(mds_quota_interface_ref, obd,
- qids[USRQUOTA], qids[GRPQUOTA],
- inode_pending, 0);
- if (block_pending)
- lquota_pending_commit(mds_quota_interface_ref, obd,
- qids[USRQUOTA], qids[GRPQUOTA],
- block_pending, 1);
+ lquota_pending_commit(mds_quota_interface_ref, obd, qids,
+ inode_pending, 0);
+ lquota_pending_commit(mds_quota_interface_ref, obd, qids,
+ block_pending, 1);
/* Trigger dqacq on the owner of child. If failed,
* the next call for lquota_chkquota will process it. */
lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
- FSFILT_OP_CREATE_PARTIAL_CHILD);
+ quota_opc);
}
#endif
return rc;
struct md_attr *ma)
{
struct mdd_object *mdd_obj = md2mdd_obj(obj);
+ struct mdd_device *mdd = mdo2mdd(obj);
struct thandle *handle;
int rc;
int reset = 1;
/* release open count */
mdd_obj->mod_count --;
- if (mdd_obj->mod_count == 0) {
+ if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
/* remove link to object from orphan index */
- if (mdd_obj->mod_flags & ORPHAN_OBJ)
- __mdd_orphan_del(env, mdd_obj, handle);
+ rc = __mdd_orphan_del(env, mdd_obj, handle);
+ if (rc == 0) {
+ CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
+ "list, OSS objects to be destroyed.\n",
+ PFID(mdd_object_fid(mdd_obj)));
+ } else {
+ CERROR("Object "DFID" can not be deleted from orphan "
+ "list, maybe cause OST objects can not be "
+ "destroyed (err: %d).\n",
+ PFID(mdd_object_fid(mdd_obj)), rc);
+ /* If object was not deleted from orphan list, do not
+ * destroy OSS objects, which will be done when next
+ * recovery. */
+ GOTO(out, rc);
+ }
}
rc = mdd_iattr_get(env, mdd_obj, ma);
- if (rc == 0) {
- if (mdd_obj->mod_count == 0 && ma->ma_attr.la_nlink == 0) {
- rc = mdd_object_kill(env, mdd_obj, ma);
+ /* Object maybe not in orphan list originally, it is rare case for
+ * mdd_finish_unlink() failure. */
+ if (rc == 0 && ma->ma_attr.la_nlink == 0) {
#ifdef HAVE_QUOTA_SUPPORT
- if (mds->mds_quota) {
- quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
- mdd_quota_wrapper(&ma->ma_attr, qids);
- }
+ if (mds->mds_quota) {
+ quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
+ mdd_quota_wrapper(&ma->ma_attr, qids);
+ }
#endif
- if (rc == 0)
- reset = 0;
+ /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. */
+ if (ma->ma_valid & MA_FLAGS &&
+ ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
+ rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
+ } else {
+ rc = mdd_object_kill(env, mdd_obj, ma);
+ if (rc == 0)
+ reset = 0;
}
+
+ if (rc != 0)
+ CERROR("Error when prepare to delete Object "DFID" , "
+ "which will cause OST objects can not be "
+ "destroyed.\n", PFID(mdd_object_fid(mdd_obj)));
}
+ EXIT;
+out:
if (reset)
ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
lquota_adjust(mds_quota_interface_ref, obd, qids, 0, rc,
quota_opc);
#endif
- RETURN(rc);
+ return rc;
}
/*
RETURN(rc);
}
-static int mdd_append_attrs(const struct lu_env *env,
- struct mdd_device *mdd,
- __u32 attr,
- const struct dt_it_ops *iops,
- struct dt_it *it,
- struct lu_dirent*ent)
-{
- struct mdd_thread_info *info = mdd_env_info(env);
- struct lu_fid *fid = &info->mti_fid2;
- int len = cpu_to_le16(ent->lde_namelen);
- const unsigned align = sizeof(struct luda_type) - 1;
- struct lu_fid_pack *pack;
- struct mdd_object *obj;
- struct luda_type *lt;
- int rc = 0;
-
- if (attr & LUDA_FID) {
- pack = (struct lu_fid_pack *)iops->rec(env, it);
- if (IS_ERR(pack)) {
- rc = PTR_ERR(pack);
- ent->lde_attrs = 0;
- goto out;
- }
- rc = fid_unpack(pack, fid);
- if (rc != 0) {
- ent->lde_attrs = 0;
- goto out;
- }
-
- fid_cpu_to_le(&ent->lde_fid, fid);
- ent->lde_attrs = LUDA_FID;
- }
-
- /* check if file type is required */
- if (attr & LUDA_TYPE) {
- if (!(attr & LUDA_FID)) {
- CERROR("wrong attr : [%x]\n",attr);
- rc = -EINVAL;
- goto out;
- }
-
- obj = mdd_object_find(env, mdd, fid);
- if (obj == NULL) /* remote object */
- goto out;
-
- if (IS_ERR(obj)) {
- rc = PTR_ERR(obj);
- goto out;
- }
-
- if (mdd_object_exists(obj) == +1) {
- len = (len + align) & ~align;
-
- lt = (void *) ent->lde_name + len;
- lt->lt_type = cpu_to_le16(mdd_object_type(obj));
-
- ent->lde_attrs |= LUDA_TYPE;
- }
- mdd_object_put(env, obj);
- }
-out:
- ent->lde_attrs = cpu_to_le32(ent->lde_attrs);
- return rc;
-}
-
static int mdd_dir_page_build(const struct lu_env *env, struct mdd_device *mdd,
int first, void *area, int nob,
const struct dt_it_ops *iops, struct dt_it *it,
struct lu_dirent **last, __u32 attr)
{
int result;
+ __u64 hash = 0;
struct lu_dirent *ent;
- __u64 hash = 0;
if (first) {
memset(area, 0, sizeof (struct lu_dirpage));
ent = area;
do {
- char *name;
int len;
int recsize;
if (len == 0)
goto next;
- name = (char *)iops->key(env, it);
hash = iops->store(env, it);
-
if (unlikely(first)) {
first = 0;
*start = hash;
}
+ /* calculate max space required for lu_dirent */
recsize = lu_dirent_calc_size(len, attr);
- CDEBUG(D_INFO, "%p %p %d "LPU64" (%d) \"%*.*s\"\n",
- name, ent, nob, hash, len, len, len, name);
-
if (nob >= recsize) {
- ent->lde_hash = cpu_to_le64(hash);
- ent->lde_namelen = cpu_to_le16(len);
- ent->lde_reclen = cpu_to_le16(recsize);
- memcpy(ent->lde_name, name, len);
-
- result = mdd_append_attrs(env, mdd, attr, iops, it, ent);
+ result = iops->rec(env, it, ent, attr);
+ if (result == -ESTALE)
+ goto next;
if (result != 0)
goto out;
+
+ /* osd might not able to pack all attributes,
+ * so recheck rec length */
+ recsize = le16_to_cpu(ent->lde_reclen);
} else {
/*
* record doesn't fit into page, enlarge previous one.
next:
result = iops->next(env, it);
+ if (result == -ESTALE)
+ goto next;
} while (result == 0);
out:
rc = iops->load(env, it, rdpg->rp_hash);
- if (rc == 0)
+ if (rc == 0){
/*
* Iterator didn't find record with exactly the key requested.
*
* state)---position it on the next item.
*/
rc = iops->next(env, it);
- else if (rc > 0)
+ } else if (rc > 0)
rc = 0;
/*
return next->do_ops->do_object_sync(env, next);
}
+static dt_obj_version_t mdd_version_get(const struct lu_env *env,
+ struct md_object *obj)
+{
+ struct mdd_object *mdd_obj = md2mdd_obj(obj);
+
+ LASSERT(mdd_object_exists(mdd_obj));
+ return do_version_get(env, mdd_object_child(mdd_obj));
+}
+
+static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
+ dt_obj_version_t version)
+{
+ struct mdd_object *mdd_obj = md2mdd_obj(obj);
+
+ LASSERT(mdd_object_exists(mdd_obj));
+ return do_version_set(env, mdd_object_child(mdd_obj), version);
+}
+
const struct md_object_operations mdd_obj_ops = {
.moo_permission = mdd_permission,
.moo_attr_get = mdd_attr_get,
.moo_readlink = mdd_readlink,
.moo_capa_get = mdd_capa_get,
.moo_object_sync = mdd_object_sync,
+ .moo_version_get = mdd_version_get,
+ .moo_version_set = mdd_version_set,
.moo_path = mdd_path,
};