#include <linux/ldiskfs_fs.h>
#include <lustre_mds.h>
#include <lustre/lustre_idl.h>
+#include <lustre_fid.h>
#include "mdd_internal.h"
static const char dotdot[] = "..";
static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
- const char *name, const struct lu_fid* fid, int mask);
+ const char *name, struct lu_fid* fid, int mask);
static int
__mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
- const char *name, const struct lu_fid* fid, int mask)
+ const char *name, struct lu_fid* fid, int mask)
{
struct mdd_object *mdd_obj = md2mdd_obj(pobj);
+ struct dynlock_handle *dlh;
int rc;
- mdd_read_lock(env, mdd_obj);
+ dlh = mdd_pdo_read_lock(env, mdd_obj, name);
+ if (dlh == NULL)
+ return -ENOMEM;
rc = __mdd_lookup(env, pobj, name, fid, mask);
- mdd_read_unlock(env, mdd_obj);
+ mdd_pdo_read_unlock(env, mdd_obj, dlh);
- return rc;
+ return rc;
}
static int mdd_lookup(const struct lu_env *env,
struct md_object *pobj, const char *name,
- struct lu_fid* fid)
+ struct lu_fid* fid, struct md_op_spec *spec)
{
int rc;
ENTRY;
}
/*
+ * For root fid use special function, which does not compare version component
+ * of fid. Version component is different for root fids on all MDTs.
+ */
+static int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
+{
+ return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
+ fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
+}
+
+/*
* return 1: if lf is the fid of the ancestor of p1;
* return 0: if not;
*
LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
pfid = &mdd_env_info(env)->mti_fid;
- /* Do not lookup ".." in root, they do not exist there. */
- if (lu_fid_eq(mdo2fid(p1), &mdd->mdd_root_fid))
+ /* Check for root first. */
+ if (mdd_is_root(mdd, mdo2fid(p1)))
RETURN(0);
for(;;) {
rc = mdd_parent_fid(env, p1, pfid);
if (rc)
GOTO(out, rc);
- if (lu_fid_eq(pfid, &mdd->mdd_root_fid))
+ if (mdd_is_root(mdd, pfid))
GOTO(out, rc = 0);
if (lu_fid_eq(pfid, lf))
GOTO(out, rc = 1);
if (parent == NULL) {
if (pf != NULL)
*pf = *pfid;
- GOTO(out, rc = EREMOTE);
+ GOTO(out, rc = -EREMOTE);
} else if (IS_ERR(parent))
GOTO(out, rc = PTR_ERR(parent));
p1 = parent;
RETURN(0);
rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), fid, sfid);
-
+ if (rc == 0) {
+ /* found root */
+ fid_zero(sfid);
+ } else if (rc == 1) {
+ /* found @fid is parent */
+ *sfid = *fid;
+ rc = 0;
+ }
RETURN(rc);
}
-/*Check whether it may create the cobj under the pobj*/
-static int mdd_may_create(const struct lu_env *env,
- struct mdd_object *pobj, struct mdd_object *cobj,
- int need_check)
+/* Check whether it may create the cobj under the pobj */
+static int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
+ struct mdd_object *cobj, int need_check)
{
int rc = 0;
ENTRY;
if (mdd_is_dead_obj(pobj))
RETURN(-ENOENT);
- /*check pobj may create or not*/
if (need_check)
- rc = mdd_permission_internal(env, pobj,
- MAY_WRITE | MAY_EXEC);
+ rc = mdd_permission_internal_locked(env, pobj, NULL,
+ MAY_WRITE | MAY_EXEC);
RETURN(rc);
}
-/*
- * It's inline, so penalty for filesystems that don't use sticky bit is
- * minimal.
- */
static inline int mdd_is_sticky(const struct lu_env *env,
struct mdd_object *pobj,
struct mdd_object *cobj)
rc = mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA);
if (rc)
return rc;
- else if (!(tmp_la->la_mode & S_ISVTX))
- return 0;
- else if (tmp_la->la_uid == uc->mu_fsuid)
+ else if (!(tmp_la->la_mode & S_ISVTX) ||
+ (tmp_la->la_uid == uc->mu_fsuid))
return 0;
else
return !mdd_capable(uc, CAP_FOWNER);
RETURN(-EBUSY);
} else if (S_ISDIR(mdd_object_type(cobj))) {
- RETURN(-EISDIR);
+ RETURN(-EISDIR);
}
if (pobj) {
RETURN(-EPERM);
if (need_check)
- rc = mdd_permission_internal(env, pobj,
+ rc = mdd_permission_internal_locked(env, pobj, NULL,
MAY_WRITE | MAY_EXEC);
}
RETURN(rc);
int rc = 0;
ENTRY;
+ if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
+ RETURN(-EPERM);
+
+ if (S_ISDIR(mdd_object_type(src_obj)))
+ RETURN(-EPERM);
+
+ LASSERT(src_obj != tgt_obj);
if (tgt_obj) {
rc = mdd_may_create(env, tgt_obj, NULL, 1);
if (rc)
RETURN(rc);
}
- if (S_ISDIR(mdd_object_type(src_obj)))
- RETURN(-EPERM);
-
- if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
- RETURN(-EPERM);
-
RETURN(rc);
}
-static void mdd_lock2(const struct lu_env *env,
- struct mdd_object *o0, struct mdd_object *o1)
+const struct dt_rec *__mdd_fid_rec(const struct lu_env *env,
+ const struct lu_fid *fid)
{
- mdd_write_lock(env, o0);
- mdd_write_lock(env, o1);
-}
+ struct mdd_thread_info *info = mdd_env_info(env);
-static void mdd_unlock2(const struct lu_env *env,
- struct mdd_object *o0, struct mdd_object *o1)
-{
- mdd_write_unlock(env, o1);
- mdd_write_unlock(env, o0);
+ fid_cpu_to_be(&info->mti_fid2, fid);
+ return (const struct dt_rec *)&info->mti_fid2;
}
+
/* insert new index, add reference if isdir, update times */
-static int __mdd_index_insert(const struct lu_env *env,
- struct mdd_object *pobj, const struct lu_fid *lf,
- const char *name, int isdir, struct thandle *th,
- struct lustre_capa *capa)
+static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
+ const struct lu_fid *lf, const char *name, int is_dir,
+ struct thandle *handle, struct lustre_capa *capa)
{
- int rc;
struct dt_object *next = mdd_object_child(pobj);
+ struct timeval start;
+ int rc;
ENTRY;
-#if 0
- struct lu_attr *la = &mdd_env_info(env)->mti_la;
-#endif
-
- if (dt_try_as_dir(env, next))
+ mdd_lprocfs_time_start(mdo2mdd(&pobj->mod_obj), &start,
+ LPROC_MDD_INDEX_INSERT);
+ if (dt_try_as_dir(env, next)) {
rc = next->do_index_ops->dio_insert(env, next,
- (struct dt_rec *)lf,
- (struct dt_key *)name,
- th, capa);
- else
+ __mdd_fid_rec(env, lf),
+ (const struct dt_key *)name,
+ handle, capa);
+ } else {
rc = -ENOTDIR;
+ }
if (rc == 0) {
- if (isdir)
- mdd_ref_add_internal(env, pobj, th);
-#if 0
- la->la_valid = LA_MTIME|LA_CTIME;
- la->la_atime = ma->ma_attr.la_atime;
- la->la_ctime = ma->ma_attr.la_ctime;
- rc = mdd_attr_set_internal(env, mdd_obj, la, handle, 0);
-#endif
+ if (is_dir) {
+ mdd_write_lock(env, pobj);
+ mdd_ref_add_internal(env, pobj, handle);
+ mdd_write_unlock(env, pobj);
+ }
}
- return rc;
+ mdd_lprocfs_time_end(mdo2mdd(&pobj->mod_obj), &start,
+ LPROC_MDD_INDEX_INSERT);
+ RETURN(rc);
}
-static int __mdd_index_delete(const struct lu_env *env,
- struct mdd_object *pobj, const char *name,
- int is_dir, struct thandle *handle,
+static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
+ const char *name, int is_dir, struct thandle *handle,
struct lustre_capa *capa)
{
- int rc;
struct dt_object *next = mdd_object_child(pobj);
+ struct timeval start;
+ int rc;
ENTRY;
+ mdd_lprocfs_time_start(mdo2mdd(&pobj->mod_obj), &start,
+ LPROC_MDD_INDEX_DELETE);
+
if (dt_try_as_dir(env, next)) {
rc = next->do_index_ops->dio_delete(env, next,
(struct dt_key *)name,
handle, capa);
- if (rc == 0 && is_dir)
+ if (rc == 0 && is_dir) {
+ mdd_write_lock(env, pobj);
mdd_ref_del_internal(env, pobj, handle);
+ mdd_write_unlock(env, pobj);
+ }
} else
rc = -ENOTDIR;
+
+ mdd_lprocfs_time_end(mdo2mdd(&pobj->mod_obj), &start,
+ LPROC_MDD_INDEX_DELETE);
RETURN(rc);
}
-
-static int __mdd_index_insert_only(const struct lu_env *env,
- struct mdd_object *pobj,
- const struct lu_fid *lf,
- const char *name, struct thandle *th,
- struct lustre_capa *capa)
+static int
+__mdd_index_insert_only(const struct lu_env *env, struct mdd_object *pobj,
+ const struct lu_fid *lf, const char *name,
+ struct thandle *handle, struct lustre_capa *capa)
{
- int rc;
struct dt_object *next = mdd_object_child(pobj);
+ int rc;
ENTRY;
- if (dt_try_as_dir(env, next))
+ if (dt_try_as_dir(env, next)) {
rc = next->do_index_ops->dio_insert(env, next,
- (struct dt_rec *)lf,
- (struct dt_key *)name, th, capa);
- else
+ __mdd_fid_rec(env, lf),
+ (const struct dt_key *)name,
+ handle, capa);
+ } else {
rc = -ENOTDIR;
+ }
RETURN(rc);
}
struct md_object *src_obj, const char *name,
struct md_attr *ma)
{
+ struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
struct mdd_device *mdd = mdo2mdd(src_obj);
- struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
+ struct dynlock_handle *dlh;
struct thandle *handle;
int rc;
ENTRY;
if (IS_ERR(handle))
RETURN(PTR_ERR(handle));
- mdd_lock2(env, mdd_tobj, mdd_sobj);
+ dlh = mdd_pdo_write_lock(env, mdd_tobj, name);
+ if (dlh == NULL)
+ GOTO(out_trans, rc = -ENOMEM);
+ mdd_write_lock(env, mdd_sobj);
rc = mdd_link_sanity_check(env, mdd_tobj, mdd_sobj);
if (rc)
- GOTO(out, rc);
+ GOTO(out_unlock, rc);
rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
name, handle,
mdd_object_capa(env, mdd_tobj));
- if (rc == 0)
- mdd_ref_add_internal(env, mdd_sobj, handle);
-
- *la_copy = ma->ma_attr;
- la_copy->la_valid = LA_CTIME;
- rc = mdd_attr_set_internal(env, mdd_sobj, la_copy, handle, 0);
if (rc)
- GOTO(out, rc);
+ GOTO(out_unlock, rc);
- la_copy->la_valid = LA_CTIME | LA_MTIME;
- rc = mdd_attr_set_internal(env, mdd_tobj, la_copy, handle, 0);
+ mdd_ref_add_internal(env, mdd_sobj, handle);
-out:
- mdd_unlock2(env, mdd_tobj, mdd_sobj);
+ *la = ma->ma_attr;
+ la->la_valid = LA_CTIME | LA_MTIME;
+ rc = mdd_attr_set_internal_locked(env, mdd_tobj, la, handle, 0);
+ if (rc)
+ GOTO(out_unlock, rc);
+
+ la->la_valid = LA_CTIME;
+ rc = mdd_attr_set_internal(env, mdd_sobj, la, handle, 0);
+ EXIT;
+out_unlock:
+ mdd_write_unlock(env, mdd_sobj);
+ mdd_pdo_write_unlock(env, mdd_tobj, dlh);
+out_trans:
mdd_trans_stop(env, mdd, rc, handle);
- RETURN(rc);
+ return rc;
+}
+
+static inline void mdd_set_dead_obj(struct mdd_object *obj)
+{
+ if (obj)
+ obj->mod_flags |= DEAD_OBJ;
}
/* caller should take a lock before calling */
/* add new orphan and the object
* will be deleted during the object_put() */
if (__mdd_orphan_add(env, obj, th) == 0)
- set_bit(LU_OBJECT_ORPHAN,
- &mdd2lu_obj(obj)->lo_header->loh_flags);
+ obj->mod_flags |= ORPHAN_OBJ;
+ mdd_set_dead_obj(obj);
if (obj->mod_count == 0)
rc = mdd_object_kill(env, obj, ma);
- }
+ else
+ /* clear MA_LOV | MA_COOKIE, if we do not
+ * unlink it in case we get it somewhere */
+ ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
+ } else
+ ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
+
RETURN(rc);
}
obj = mdd_object_child(dir);
iops = &obj->do_index_ops->dio_it;
- it = iops->init(env, obj, 0);
+ it = iops->init(env, obj, 0, BYPASS_CAPA);
if (it != NULL) {
result = iops->get(env, it, (const void *)"");
if (result > 0) {
RETURN(rc);
}
-static int mdd_unlink(const struct lu_env *env,
- struct md_object *pobj, struct md_object *cobj,
- const char *name, struct md_attr *ma)
+static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
+ struct md_object *cobj, const char *name,
+ struct md_attr *ma)
{
- struct mdd_device *mdd = mdo2mdd(pobj);
+ struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
- struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
+ struct mdd_device *mdd = mdo2mdd(pobj);
+ struct dynlock_handle *dlh;
struct thandle *handle;
int rc, is_dir;
ENTRY;
- rc = mdd_log_txn_param_build(env, mdd_cobj, ma, MDD_TXN_UNLINK_OP);
+ /*
+ * Check -ENOENT early here because we need to get object type
+ * to calculate credits before transaction start
+ */
+ if (!lu_object_exists(&cobj->mo_lu)) {
+ LU_OBJECT_DEBUG(D_ERROR, env, &cobj->mo_lu,
+ "unlinking as `%s'", name);
+ RETURN(-ENOENT);
+ }
+
+ LASSERTF(lu_object_exists(&cobj->mo_lu) > 0, "FID is "DFID"\n",
+ PFID(lu_object_fid(&cobj->mo_lu)));
+
+ rc = mdd_log_txn_param_build(env, cobj, ma, MDD_TXN_UNLINK_OP);
if (rc)
RETURN(rc);
-
+
handle = mdd_trans_start(env, mdd);
if (IS_ERR(handle))
RETURN(PTR_ERR(handle));
- mdd_lock2(env, mdd_pobj, mdd_cobj);
+ dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
+ if (dlh == NULL)
+ GOTO(out_trans, rc = -ENOMEM);
+ mdd_write_lock(env, mdd_cobj);
rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, ma);
if (rc)
GOTO(cleanup, rc);
is_dir = S_ISDIR(lu_object_attr(&cobj->mo_lu));
+
+ current->debugging1 |= 0x1; /* XXX enable lvar_enoent_debug
+ * debugging */
rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
mdd_object_capa(env, mdd_pobj));
+ current->debugging1 &= ~0x1;
if (rc)
GOTO(cleanup, rc);
mdd_ref_del_internal(env, mdd_cobj, handle);
- *la_copy = ma->ma_attr;
if (is_dir) {
/* unlink dot */
mdd_ref_del_internal(env, mdd_cobj, handle);
- } else {
- la_copy->la_valid = LA_CTIME;
- rc = mdd_attr_set_internal(env, mdd_cobj, la_copy, handle, 0);
- if (rc)
- GOTO(cleanup, rc);
}
- la_copy->la_valid = LA_CTIME | LA_MTIME;
- rc = mdd_attr_set_internal(env, mdd_pobj, la_copy, handle, 0);
+ *la = ma->ma_attr;
+ la->la_valid = LA_CTIME | LA_MTIME;
+ rc = mdd_attr_set_internal_locked(env, mdd_pobj, la, handle, 0);
+ if (rc)
+ GOTO(cleanup, rc);
+
+ la->la_valid = LA_CTIME;
+ rc = mdd_attr_set_internal(env, mdd_cobj, la, handle, 0);
if (rc)
GOTO(cleanup, rc);
obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp,
strlen("unlinked"), "unlinked", 0,
NULL, NULL);
-
+ EXIT;
cleanup:
- mdd_unlock2(env, mdd_pobj, mdd_cobj);
+ mdd_write_unlock(env, mdd_cobj);
+ mdd_pdo_write_unlock(env, mdd_pobj, dlh);
+out_trans:
mdd_trans_stop(env, mdd, rc, handle);
- RETURN(rc);
+ return rc;
}
-/*
- * Partial operation. Be aware, this is called with write lock taken, so we use
- * locksless version of __mdd_lookup() here.
- */
static int mdd_ni_sanity_check(const struct lu_env *env,
struct md_object *pobj,
const char *name,
const struct lu_fid *fid)
{
- struct mdd_object *obj = md2mdd_obj(pobj);
-#if 0
- int rc;
-#endif
+ struct mdd_object *obj = md2mdd_obj(pobj);
ENTRY;
/* EEXIST check */
if (mdd_is_dead_obj(obj))
RETURN(-ENOENT);
- /* The exist of the name will be checked in _index_insert. */
-#if 0
- rc = __mdd_lookup(env, pobj, name, fid, MAY_WRITE | MAY_EXEC);
- if (rc != -ENOENT)
- RETURN(rc ? : -EEXIST);
- else
- RETURN(0);
-#endif
- RETURN(mdd_permission_internal(env, obj, MAY_WRITE | MAY_EXEC));
+ /* The existence of the name will be checked in _index_insert. */
+ RETURN(mdd_permission_internal_locked(env, obj, NULL,
+ MAY_WRITE | MAY_EXEC));
}
-static int mdd_name_insert(const struct lu_env *env,
- struct md_object *pobj,
+/*
+ * Partial operation.
+ */
+static int mdd_name_insert(const struct lu_env *env, struct md_object *pobj,
const char *name, const struct lu_fid *fid,
- int isdir)
+ int is_dir)
{
+ struct lu_attr *la = &mdd_env_info(env)->mti_la;
struct mdd_object *mdd_obj = md2mdd_obj(pobj);
struct mdd_device *mdd = mdo2mdd(pobj);
+ struct dynlock_handle *dlh;
struct thandle *handle;
int rc;
ENTRY;
if (IS_ERR(handle))
RETURN(PTR_ERR(handle));
- mdd_write_lock(env, mdd_obj);
+ dlh = mdd_pdo_write_lock(env, mdd_obj, name);
+ if (dlh == NULL)
+ GOTO(out_trans, rc = -ENOMEM);
rc = mdd_ni_sanity_check(env, pobj, name, fid);
if (rc)
GOTO(out_unlock, rc);
- rc = __mdd_index_insert(env, mdd_obj, fid, name, isdir, handle,
- BYPASS_CAPA);
-
+ rc = __mdd_index_insert(env, mdd_obj, fid, name, is_dir,
+ handle, BYPASS_CAPA);
+ if (rc == 0) {
+ la->la_ctime = la->la_atime = CURRENT_SECONDS;
+ la->la_valid = LA_ATIME | LA_CTIME;
+ rc = mdd_attr_set_internal_locked(env, mdd_obj, la, handle, 0);
+ }
+ EXIT;
out_unlock:
- mdd_write_unlock(env, mdd_obj);
-
+ mdd_pdo_write_unlock(env, mdd_obj, dlh);
+out_trans:
mdd_trans_stop(env, mdo2mdd(pobj), rc, handle);
- RETURN(rc);
+ return rc;
}
-/*
- * Be aware, this is called with write lock taken, so we use locksless version
- * of __mdd_lookup() here.
- */
static int mdd_nr_sanity_check(const struct lu_env *env,
struct md_object *pobj,
const char *name)
{
- struct mdd_object *obj = md2mdd_obj(pobj);
-#if 0
- struct mdd_thread_info *info = mdd_env_info(env);
- struct lu_fid *fid = &info->mti_fid;
- int rc;
-#endif
+ struct mdd_object *obj = md2mdd_obj(pobj);
ENTRY;
/* EEXIST check */
- if (mdd_is_dead_obj(obj))
+ if (mdd_is_dead_obj(obj)) {
+ CWARN("Dir "DFID" is dead?\n", PFID(mdo2fid(obj)));
RETURN(-ENOENT);
+ }
- /* The exist of the name will be checked in _index_delete. */
-#if 0
- rc = __mdd_lookup(env, pobj, name, fid, MAY_WRITE | MAY_EXEC);
- RETURN(rc);
-#endif
- RETURN(mdd_permission_internal(env, obj, MAY_WRITE | MAY_EXEC));
+ /* Name presence will be checked in _index_delete. */
+ RETURN(mdd_permission_internal_locked(env, obj, NULL,
+ MAY_WRITE | MAY_EXEC));
}
+/*
+ * Partial operation.
+ */
static int mdd_name_remove(const struct lu_env *env,
struct md_object *pobj,
const char *name, int is_dir)
{
- struct mdd_device *mdd = mdo2mdd(pobj);
+ struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
struct mdd_object *mdd_obj = md2mdd_obj(pobj);
+ struct mdd_device *mdd = mdo2mdd(pobj);
+ struct dynlock_handle *dlh;
struct thandle *handle;
int rc;
ENTRY;
if (IS_ERR(handle))
RETURN(PTR_ERR(handle));
- mdd_write_lock(env, mdd_obj);
+ dlh = mdd_pdo_write_lock(env, mdd_obj, name);
+ if (dlh == NULL)
+ GOTO(out_trans, rc = -ENOMEM);
rc = mdd_nr_sanity_check(env, pobj, name);
if (rc)
GOTO(out_unlock, rc);
- rc = __mdd_index_delete(env, mdd_obj, name, is_dir, handle,
- BYPASS_CAPA);
+ rc = __mdd_index_delete(env, mdd_obj, name, is_dir,
+ handle, BYPASS_CAPA);
+ if (rc)
+ GOTO(out_unlock, rc);
+ la->la_ctime = la->la_mtime = CURRENT_SECONDS;
+ la->la_valid = LA_CTIME | LA_MTIME;
+ rc = mdd_attr_set_internal_locked(env, mdd_obj, la, handle, 0);
+ EXIT;
out_unlock:
- mdd_write_unlock(env, mdd_obj);
-
+ mdd_pdo_write_unlock(env, mdd_obj, dlh);
+out_trans:
mdd_trans_stop(env, mdd, rc, handle);
- RETURN(rc);
+ return rc;
}
+
static int mdd_rt_sanity_check(const struct lu_env *env,
struct mdd_object *tgt_pobj,
struct mdd_object *tobj,
const struct lu_fid *lf, const char *name,
struct md_attr *ma)
{
- struct mdd_device *mdd = mdo2mdd(pobj);
+ struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
struct mdd_object *mdd_tobj = md2mdd_obj(tobj);
+ struct mdd_device *mdd = mdo2mdd(pobj);
+ struct dynlock_handle *dlh;
struct thandle *handle;
int rc;
ENTRY;
if (IS_ERR(handle))
RETURN(PTR_ERR(handle));
+ dlh = mdd_pdo_write_lock(env, mdd_tpobj, name);
+ if (dlh == NULL)
+ GOTO(out_trans, rc = -ENOMEM);
if (mdd_tobj)
- mdd_lock2(env, mdd_tpobj, mdd_tobj);
- else
- mdd_write_lock(env, mdd_tpobj);
+ mdd_write_lock(env, mdd_tobj);
- /*TODO rename sanity checking*/
+ /* XXX: Rename sanity checking. */
rc = mdd_rt_sanity_check(env, mdd_tpobj, mdd_tobj, lf, name, ma);
if (rc)
GOTO(cleanup, rc);
- /* if rename_tgt is called then we should just re-insert name with
- * correct fid, no need to dec/inc parent nlink if obj is dir */
+ /*
+ * If rename_tgt is called then we should just re-insert name with
+ * correct fid, no need to dec/inc parent nlink if obj is dir.
+ */
rc = __mdd_index_delete(env, mdd_tpobj, name, 0, handle, BYPASS_CAPA);
if (rc)
GOTO(cleanup, rc);
if (rc)
GOTO(cleanup, rc);
- if (tobj && lu_object_exists(&tobj->mo_lu))
+ *la = ma->ma_attr;
+ la->la_valid = LA_CTIME | LA_MTIME;
+ rc = mdd_attr_set_internal_locked(env, mdd_tpobj, la, handle, 0);
+ if (rc)
+ GOTO(cleanup, rc);
+
+ if (tobj && lu_object_exists(&tobj->mo_lu)) {
mdd_ref_del_internal(env, mdd_tobj, handle);
+ la->la_valid = LA_CTIME;
+ rc = mdd_attr_set_internal(env, mdd_tobj, la, handle, 0);
+ }
+ EXIT;
cleanup:
if (tobj)
- mdd_unlock2(env, mdd_tpobj, mdd_tobj);
- else
- mdd_write_unlock(env, mdd_tpobj);
+ mdd_write_unlock(env, mdd_tobj);
+ mdd_pdo_write_unlock(env, mdd_tpobj, dlh);
+out_trans:
mdd_trans_stop(env, mdd, rc, handle);
- RETURN(rc);
+ return rc;
}
/*
- * The permission has been checked when obj created,
- * no need check again.
+ * The permission has been checked when obj created, no need check again.
*/
static int mdd_cd_sanity_check(const struct lu_env *env,
struct mdd_object *obj)
{
- int rc = 0;
ENTRY;
/* EEXIST check */
if (!obj || mdd_is_dead_obj(obj))
RETURN(-ENOENT);
-#if 0
- mdd_read_lock(env, obj);
- rc = mdd_permission_internal(env, obj, MAY_WRITE);
- mdd_read_unlock(env, obj);
-#endif
-
- RETURN(rc);
+ RETURN(0);
}
-static int mdd_create_data(const struct lu_env *env,
- struct md_object *pobj, struct md_object *cobj,
- const struct md_create_spec *spec,
+static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
+ struct md_object *cobj, const struct md_op_spec *spec,
struct md_attr *ma)
{
struct mdd_device *mdd = mdo2mdd(cobj);
- struct mdd_object *mdd_pobj = md2mdd_obj(pobj);/* XXX maybe NULL */
+ struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
struct mdd_object *son = md2mdd_obj(cobj);
struct lu_attr *attr = &ma->ma_attr;
struct lov_mds_md *lmm = NULL;
RETURN(rc);
if (spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE ||
- !(spec->sp_cr_flags & FMODE_WRITE))
+ !(spec->sp_cr_flags & FMODE_WRITE))
RETURN(0);
- rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size, spec,
- attr);
+
+ rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
+ spec, attr);
if (rc)
RETURN(rc);
mdd_txn_param_build(env, mdd, MDD_TXN_CREATE_DATA_OP);
handle = mdd_trans_start(env, mdd);
if (IS_ERR(handle))
- RETURN(rc = PTR_ERR(handle));
+ GOTO(out_free, rc = PTR_ERR(handle));
/*
* XXX: Setting the lov ea is not locked but setting the attr is locked?
+ * Should this be fixed?
*/
/* Replay creates has objects already */
if (rc == 0)
rc = mdd_attr_get_internal_locked(env, son, ma);
+ mdd_trans_stop(env, mdd, rc, handle);
+out_free:
/* Finish mdd_lov_create() stuff. */
mdd_lov_create_finish(env, mdd, rc);
- mdd_trans_stop(env, mdd, rc, handle);
if (lmm)
OBD_FREE(lmm, lmm_size);
RETURN(rc);
static int
__mdd_lookup(const struct lu_env *env, struct md_object *pobj,
- const char *name, const struct lu_fid* fid, int mask)
+ const char *name, struct lu_fid* fid, int mask)
{
+ const struct dt_key *key = (const struct dt_key *)name;
struct mdd_object *mdd_obj = md2mdd_obj(pobj);
struct dt_object *dir = mdd_object_child(mdd_obj);
struct dt_rec *rec = (struct dt_rec *)fid;
- const struct dt_key *key = (const struct dt_key *)name;
+ struct timeval start;
int rc;
ENTRY;
+ mdd_lprocfs_time_start(mdo2mdd(pobj), &start, LPROC_MDD_LOOKUP);
if (mdd_is_dead_obj(mdd_obj))
RETURN(-ESTALE);
LBUG();
}
-#if 0
- if (mask == MAY_EXEC)
- rc = mdd_exec_permission_lite(env, mdd_obj);
- else
-#endif
- rc = mdd_permission_internal(env, mdd_obj, mask);
+ rc = mdd_permission_internal_locked(env, mdd_obj, NULL, mask);
if (rc)
RETURN(rc);
- if (S_ISDIR(mdd_object_type(mdd_obj)) && dt_try_as_dir(env, dir))
+ if (S_ISDIR(mdd_object_type(mdd_obj)) && dt_try_as_dir(env, dir)) {
rc = dir->do_index_ops->dio_lookup(env, dir, rec, key,
mdd_object_capa(env, mdd_obj));
- else
+ if (rc == 0)
+ fid_be_to_cpu(fid, fid);
+ } else
rc = -ENOTDIR;
+ mdd_lprocfs_time_end(mdo2mdd(pobj), &start, LPROC_MDD_LOOKUP);
RETURN(rc);
}
int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
- struct mdd_object *child, struct md_attr *ma,
+ struct mdd_object *child, struct md_attr *ma,
struct thandle *handle)
{
int rc;
ENTRY;
- /* update attributes for child.
+ /*
+ * Update attributes for child.
+ *
* FIXME:
* (1) the valid bits should be converted between Lustre and Linux;
* (2) maybe, the child attributes should be set in OSD when creation.
RETURN(rc);
if (S_ISDIR(ma->ma_attr.la_mode)) {
- /* add . and .. for newly created dir */
+ /* Add "." and ".." for newly created dir */
mdd_ref_add_internal(env, child, handle);
rc = __mdd_index_insert_only(env, child, mdo2fid(child),
dot, handle, BYPASS_CAPA);
static int mdd_create_sanity_check(const struct lu_env *env,
struct md_object *pobj,
- const char *name, struct md_attr *ma)
+ const char *name,
+ struct md_attr *ma,
+ int lookup)
{
struct mdd_thread_info *info = mdd_env_info(env);
struct lu_attr *la = &info->mti_la;
RETURN(-ENOENT);
/*
- * Check if the name already exist, though it will be checked
- * in _index_insert also, for avoiding rolling back if exists
- * _index_insert.
+ * In some cases this lookup is not needed - we know before if name
+ * exists or not because MDT performs lookup for it.
*/
- rc = __mdd_lookup_locked(env, pobj, name, fid,
- MAY_WRITE | MAY_EXEC);
- if (rc != -ENOENT)
- RETURN(rc ? : -EEXIST);
+ /* XXX disable that lookup temporarily */
+ if (0 && lookup) {
+ /*
+ * Check if the name already exist, though it will be checked in
+ * _index_insert also, for avoiding rolling back if exists
+ * _index_insert.
+ */
+ rc = __mdd_lookup_locked(env, pobj, name, fid,
+ MAY_WRITE | MAY_EXEC);
+ if (rc != -ENOENT)
+ RETURN(rc ? : -EEXIST);
+ } else {
+ /*
+ * Check if has WRITE permission for the parent.
+ */
+ rc = mdd_permission_internal_locked(env, obj, NULL, MAY_WRITE);
+ if (rc)
+ RETURN(rc);
+ }
/* sgid check */
- mdd_read_lock(env, obj);
rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
- mdd_read_unlock(env, obj);
if (rc != 0)
RETURN(rc);
}
switch (ma->ma_attr.la_mode & S_IFMT) {
+ case S_IFDIR: {
+ struct mdd_device *mdd = mdo2mdd(pobj);
+ if (la->la_nlink >= mdd->mdd_dt_conf.ddp_max_nlink)
+ RETURN(-EMLINK);
+ }
case S_IFREG:
- case S_IFDIR:
case S_IFLNK:
case S_IFCHR:
case S_IFBLK:
static int mdd_create(const struct lu_env *env,
struct md_object *pobj, const char *name,
struct md_object *child,
- struct md_create_spec *spec,
+ struct md_op_spec *spec,
struct md_attr* ma)
{
- struct mdd_device *mdd = mdo2mdd(pobj);
+ struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
struct mdd_object *son = md2mdd_obj(child);
- struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
+ struct mdd_device *mdd = mdo2mdd(pobj);
struct lu_attr *attr = &ma->ma_attr;
struct lov_mds_md *lmm = NULL;
struct thandle *handle;
int rc, created = 0, inserted = 0, lmm_size = 0;
+ struct dynlock_handle *dlh;
struct timeval start;
ENTRY;
- mdd_lproc_time_start(mdd, &start, LPROC_MDD_CREATE);
+ mdd_lprocfs_time_start(mdd, &start, LPROC_MDD_CREATE);
+
/*
* Two operations have to be performed:
*
* 2. insert (__mdd_index_insert(), lookup again)
*/
- /* sanity checks before big job */
- rc = mdd_create_sanity_check(env, pobj, name, ma);
+ /* Sanity checks before big job. */
+ rc = mdd_create_sanity_check(env, pobj, name, ma, spec->sp_cr_lookup);
if (rc)
RETURN(rc);
- /* no RPC inside the transaction, so OST objects should be created at
- * first */
+ /*
+ * No RPC inside the transaction, so OST objects should be created at
+ * first.
+ */
if (S_ISREG(attr->la_mode)) {
rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
spec, attr);
mdd_txn_param_build(env, mdd, MDD_TXN_MKDIR_OP);
handle = mdd_trans_start(env, mdd);
if (IS_ERR(handle))
- RETURN(PTR_ERR(handle));
+ GOTO(out_free, rc = PTR_ERR(handle));
- mdd_write_lock(env, mdd_pobj);
+ dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
+ if (dlh == NULL)
+ GOTO(out_trans, rc = -ENOMEM);
/*
- * XXX check that link can be added to the parent in mkdir case.
+ * XXX: Check that link can be added to the parent in mkdir case.
*/
mdd_write_lock(env, son);
created = 1;
#ifdef CONFIG_FS_POSIX_ACL
+ mdd_read_lock(env, mdd_pobj);
rc = mdd_acl_init(env, mdd_pobj, son, &ma->ma_attr.la_mode, handle);
+ mdd_read_unlock(env, mdd_pobj);
if (rc) {
mdd_write_unlock(env, son);
GOTO(cleanup, rc);
#endif
rc = mdd_object_initialize(env, mdo2fid(mdd_pobj),
- son, ma, handle);
+ son, ma, handle);
mdd_write_unlock(env, son);
if (rc)
/*
GOTO(cleanup, rc);
inserted = 1;
- /* replay creates has objects already */
+
+ /* Replay creates has objects already. */
if (spec->u.sp_ea.no_lov_create) {
CDEBUG(D_INFO, "we already have lov ea\n");
- rc = mdd_lov_set_md(env, mdd_pobj, son,
- (struct lov_mds_md *)spec->u.sp_ea.eadata,
- spec->u.sp_ea.eadatalen, handle, 0);
- } else
- rc = mdd_lov_set_md(env, mdd_pobj, son, lmm,
- lmm_size, handle, 0);
+ LASSERT(lmm == NULL);
+ lmm = (struct lov_mds_md *)spec->u.sp_ea.eadata;
+ lmm_size = spec->u.sp_ea.eadatalen;
+ }
+ rc = mdd_lov_set_md(env, mdd_pobj, son, lmm, lmm_size, handle, 0);
if (rc) {
CERROR("error on stripe info copy %d \n", rc);
GOTO(cleanup, rc);
}
+ if (lmm && lmm_size > 0) {
+ /* Set Lov here, do not get lmm again later */
+ memcpy(ma->ma_lmm, lmm, lmm_size);
+ ma->ma_lmm_size = lmm_size;
+ ma->ma_valid |= MA_LOV;
+ }
if (S_ISLNK(attr->la_mode)) {
struct dt_object *dt = mdd_object_child(son);
buf = mdd_buf_get_const(env, target_name, sym_len);
rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
mdd_object_capa(env, son));
+
if (rc == sym_len)
rc = 0;
else
- rc = -EFAULT;
+ GOTO(cleanup, rc = -EFAULT);
}
- *la_copy = ma->ma_attr;
- la_copy->la_valid = LA_CTIME | LA_MTIME;
- rc = mdd_attr_set_internal(env, mdd_pobj, la_copy, handle, 0);
+ *la = ma->ma_attr;
+ la->la_valid = LA_CTIME | LA_MTIME;
+ rc = mdd_attr_set_internal_locked(env, mdd_pobj, la, handle, 0);
if (rc)
GOTO(cleanup, rc);
- /* return attr back */
+ /* Return attr back. */
rc = mdd_attr_get_internal_locked(env, son, ma);
+ EXIT;
cleanup:
if (rc && created) {
int rc2 = 0;
mdd_write_unlock(env, son);
}
}
- /* finish mdd_lov_create() stuff */
- mdd_lov_create_finish(env, mdd, rc);
- if (lmm)
- OBD_FREE(lmm, lmm_size);
- mdd_write_unlock(env, mdd_pobj);
+
+ mdd_pdo_write_unlock(env, mdd_pobj, dlh);
+out_trans:
mdd_trans_stop(env, mdd, rc, handle);
- mdd_lproc_time_end(mdd, &start, LPROC_MDD_CREATE);
- RETURN(rc);
+out_free:
+ if (lmm && !spec->u.sp_ea.no_lov_create)
+ OBD_FREE(lmm, lmm_size);
+ /* Finish mdd_lov_create() stuff */
+ mdd_lov_create_finish(env, mdd, rc);
+ mdd_lprocfs_time_end(mdd, &start, LPROC_MDD_CREATE);
+ return rc;
}
+/*
+ * Get locks on parents in proper order
+ * RETURN: < 0 - error, rename_order if successful
+ */
+enum rename_order {
+ MDD_RN_SAME,
+ MDD_RN_SRCTGT,
+ MDD_RN_TGTSRC
+};
-static int mdd_rename_lock(const struct lu_env *env,
- struct mdd_device *mdd,
- struct mdd_object *src_pobj,
- struct mdd_object *tgt_pobj)
+static int mdd_rename_order(const struct lu_env *env,
+ struct mdd_device *mdd,
+ struct mdd_object *src_pobj,
+ struct mdd_object *tgt_pobj)
{
+ /* order of locking, 1 - tgt-src, 0 - src-tgt*/
int rc;
ENTRY;
- if (src_pobj == tgt_pobj) {
- mdd_write_lock(env, src_pobj);
- RETURN(0);
- }
+ if (src_pobj == tgt_pobj)
+ RETURN(MDD_RN_SAME);
/* compared the parent child relationship of src_p&tgt_p */
if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
- mdd_lock2(env, src_pobj, tgt_pobj);
- RETURN(0);
+ rc = MDD_RN_SRCTGT;
} else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
- mdd_lock2(env, tgt_pobj, src_pobj);
- RETURN(0);
- }
-
- rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
- if (rc < 0)
- RETURN(rc);
+ rc = MDD_RN_TGTSRC;
+ } else {
+ rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
+ if (rc == -EREMOTE)
+ rc = 0;
- if (rc == 1) {
- mdd_lock2(env, tgt_pobj, src_pobj);
- RETURN(0);
+ if (rc == 1)
+ rc = MDD_RN_TGTSRC;
+ else if (rc == 0)
+ rc = MDD_RN_SRCTGT;
}
- mdd_lock2(env, src_pobj, tgt_pobj);
- RETURN(0);
-}
-
-static void mdd_rename_unlock(const struct lu_env *env,
- struct mdd_object *src_pobj,
- struct mdd_object *tgt_pobj)
-{
- mdd_write_unlock(env, src_pobj);
- if (src_pobj != tgt_pobj)
- mdd_write_unlock(env, tgt_pobj);
+ RETURN(rc);
}
static int mdd_rename_sanity_check(const struct lu_env *env,
RETURN(-ENOENT);
/* The sobj maybe on the remote, check parent permission only here */
- rc = mdd_permission_internal(env, src_pobj, MAY_WRITE | MAY_EXEC);
+ rc = mdd_permission_internal_locked(env, src_pobj, NULL,
+ MAY_WRITE | MAY_EXEC);
if (rc)
RETURN(rc);
rc = mdd_may_create(env, tgt_pobj, NULL,
(src_pobj != tgt_pobj));
} else {
- mdd_read_lock(env, tobj);
rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir,
(src_pobj != tgt_pobj));
if (rc == 0)
if (S_ISDIR(mdd_object_type(tobj))
&& mdd_dir_is_empty(env, tobj))
rc = -ENOTEMPTY;
- mdd_read_unlock(env, tobj);
}
RETURN(rc);
struct md_object *tobj, const char *tname,
struct md_attr *ma)
{
- struct mdd_device *mdd = mdo2mdd(src_pobj);
+ struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj);
struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
+ struct mdd_device *mdd = mdo2mdd(src_pobj);
struct mdd_object *mdd_sobj = NULL;
struct mdd_object *mdd_tobj = NULL;
- struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
+ struct dynlock_handle *sdlh, *tdlh;
struct thandle *handle;
int is_dir;
int rc;
RETURN(PTR_ERR(handle));
/* FIXME: Should consider tobj and sobj too in rename_lock. */
- rc = mdd_rename_lock(env, mdd, mdd_spobj, mdd_tpobj);
- if (rc)
+ rc = mdd_rename_order(env, mdd, mdd_spobj, mdd_tpobj);
+ if (rc < 0)
GOTO(cleanup_unlocked, rc);
+ /* Get locks in determined order */
+ if (rc == MDD_RN_SAME) {
+ sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
+ /* check hashes to determine whether we need one lock or two */
+ if (mdd_name2hash(sname) != mdd_name2hash(tname))
+ tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
+ else
+ tdlh = sdlh;
+ } else if (rc == MDD_RN_SRCTGT) {
+ sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
+ tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
+ } else {
+ tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
+ sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
+ }
+ if (sdlh == NULL || tdlh == NULL)
+ GOTO(cleanup, rc = -ENOMEM);
+
rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj,
lf, is_dir, mdd_tobj);
if (rc)
if (rc)
GOTO(cleanup, rc);
+ *la = ma->ma_attr;
mdd_sobj = mdd_object_find(env, mdd, lf);
- *la_copy = ma->ma_attr;
- la_copy->la_valid = LA_CTIME;
if (mdd_sobj) {
- /*XXX: how to update ctime for remote sobj? */
- rc = mdd_attr_set_internal_locked(env, mdd_sobj, la_copy, handle);
+ la->la_valid = LA_CTIME;
+
+ /* XXX: How to update ctime for remote sobj? */
+ rc = mdd_attr_set_internal_locked(env, mdd_sobj, la, handle, 1);
if (rc)
GOTO(cleanup, rc);
}
if (tobj && lu_object_exists(&tobj->mo_lu)) {
mdd_write_lock(env, mdd_tobj);
mdd_ref_del_internal(env, mdd_tobj, handle);
- /* remove dot reference */
+
+ /* Remove dot reference. */
if (is_dir)
mdd_ref_del_internal(env, mdd_tobj, handle);
- la_copy->la_valid = LA_CTIME;
- rc = mdd_attr_set_internal(env, mdd_tobj, la_copy, handle, 0);
+ la->la_valid = LA_CTIME;
+ rc = mdd_attr_set_internal(env, mdd_tobj, la, handle, 0);
if (rc)
GOTO(cleanup, rc);
GOTO(cleanup, rc);
}
- la_copy->la_valid = LA_CTIME | LA_MTIME;
- rc = mdd_attr_set_internal(env, mdd_spobj, la_copy, handle, 0);
+ la->la_valid = LA_CTIME | LA_MTIME;
+ rc = mdd_attr_set_internal_locked(env, mdd_spobj, la, handle, 0);
if (rc)
GOTO(cleanup, rc);
if (mdd_spobj != mdd_tpobj) {
- la_copy->la_valid = LA_CTIME | LA_MTIME;
- rc = mdd_attr_set_internal(env, mdd_tpobj, la_copy, handle, 0);
+ la->la_valid = LA_CTIME | LA_MTIME;
+ rc = mdd_attr_set_internal_locked(env, mdd_tpobj, la,
+ handle, 0);
}
+ EXIT;
cleanup:
- mdd_rename_unlock(env, mdd_spobj, mdd_tpobj);
+ if (likely(tdlh) && sdlh != tdlh)
+ mdd_pdo_write_unlock(env, mdd_tpobj, tdlh);
+ if (likely(sdlh))
+ mdd_pdo_write_unlock(env, mdd_spobj, sdlh);
cleanup_unlocked:
mdd_trans_stop(env, mdd, rc, handle);
if (mdd_sobj)
mdd_object_put(env, mdd_sobj);
- RETURN(rc);
+ return rc;
}
struct md_dir_operations mdd_dir_ops = {