const char *name, const struct lu_fid* fid, int mask)
{
struct mdd_object *mdd_obj = md2mdd_obj(pobj);
+ struct dynlock_handle *dlh;
int rc;
- mdd_read_lock(env, mdd_obj);
+ dlh = mdd_pdo_read_lock(env, mdd_obj, name);
rc = __mdd_lookup(env, pobj, name, fid, mask);
- mdd_read_unlock(env, mdd_obj);
+ mdd_pdo_read_unlock(env, mdd_obj, dlh);
return rc;
}
if (parent == NULL) {
if (pf != NULL)
*pf = *pfid;
- GOTO(out, rc = EREMOTE);
+ GOTO(out, rc = -EREMOTE);
} else if (IS_ERR(parent))
GOTO(out, rc = PTR_ERR(parent));
p1 = parent;
/*check pobj may create or not*/
if (need_check)
- rc = mdd_permission_internal(env, pobj,
+ rc = mdd_permission_internal_locked(env, pobj,
MAY_WRITE | MAY_EXEC);
RETURN(rc);
} else if (tmp_la->la_uid == uc->mu_fsuid) {
return 0;
} else {
+ mdd_read_lock(env, pobj);
rc = mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA);
+ mdd_read_unlock(env, pobj);
if (rc)
return rc;
else if (!(tmp_la->la_mode & S_ISVTX))
RETURN(-EPERM);
if (need_check)
- rc = mdd_permission_internal(env, pobj,
- MAY_WRITE | MAY_EXEC);
+ rc = mdd_permission_internal_locked(env, pobj,
+ MAY_WRITE |
+ MAY_EXEC);
}
RETURN(rc);
}
RETURN(rc);
}
-static void mdd_lock2(const struct lu_env *env,
- struct mdd_object *o0, struct mdd_object *o1)
-{
- mdd_write_lock(env, o0);
- mdd_write_lock(env, o1);
-}
-
-static void mdd_unlock2(const struct lu_env *env,
- struct mdd_object *o0, struct mdd_object *o1)
-{
- mdd_write_unlock(env, o1);
- mdd_write_unlock(env, o0);
-}
-
/* insert new index, add reference if isdir, update times */
static int __mdd_index_insert(const struct lu_env *env,
struct mdd_object *pobj, const struct lu_fid *lf,
rc = -ENOTDIR;
if (rc == 0) {
- if (isdir)
+ if (isdir) {
+ mdd_write_lock(env, pobj);
mdd_ref_add_internal(env, pobj, th);
+ mdd_write_unlock(env, pobj);
+ }
#if 0
la->la_valid = LA_MTIME|LA_CTIME;
la->la_atime = ma->ma_attr.la_atime;
rc = next->do_index_ops->dio_delete(env, next,
(struct dt_key *)name,
handle, capa);
- if (rc == 0 && is_dir)
+ if (rc == 0 && is_dir) {
+ mdd_write_lock(env, pobj);
mdd_ref_del_internal(env, pobj, handle);
+ mdd_write_unlock(env, pobj);
+ }
} else
rc = -ENOTDIR;
mdd_lproc_time_end(mdo2mdd(&pobj->mod_obj), &start,
struct mdd_device *mdd = mdo2mdd(src_obj);
struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
struct thandle *handle;
+ struct dynlock_handle *dlh;
int rc;
ENTRY;
if (IS_ERR(handle))
RETURN(PTR_ERR(handle));
- mdd_lock2(env, mdd_tobj, mdd_sobj);
+ dlh = mdd_pdo_write_lock(env, mdd_tobj, name);
+ mdd_write_lock(env, mdd_sobj);
rc = mdd_link_sanity_check(env, mdd_tobj, mdd_sobj);
if (rc)
GOTO(out, rc);
la_copy->la_valid = LA_CTIME | LA_MTIME;
- rc = mdd_attr_set_internal(env, mdd_tobj, la_copy, handle, 0);
+ rc = mdd_attr_set_internal_locked(env, mdd_tobj, la_copy, handle, 0);
out:
- mdd_unlock2(env, mdd_tobj, mdd_sobj);
+ mdd_write_unlock(env, mdd_sobj);
+ mdd_pdo_write_unlock(env, mdd_tobj, dlh);
mdd_trans_stop(env, mdd, rc, handle);
RETURN(rc);
}
struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
struct thandle *handle;
+ struct dynlock_handle *dlh;
int rc, is_dir;
ENTRY;
if (IS_ERR(handle))
RETURN(PTR_ERR(handle));
- mdd_lock2(env, mdd_pobj, mdd_cobj);
+ dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
+ mdd_write_lock(env, mdd_cobj);
rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, ma);
if (rc)
}
la_copy->la_valid = LA_CTIME | LA_MTIME;
- rc = mdd_attr_set_internal(env, mdd_pobj, la_copy, handle, 0);
+ rc = mdd_attr_set_internal_locked(env, mdd_pobj, la_copy, handle, 0);
if (rc)
GOTO(cleanup, rc);
strlen("unlinked"), "unlinked", 0,
NULL, NULL);
cleanup:
- mdd_unlock2(env, mdd_pobj, mdd_cobj);
+ mdd_write_unlock(env, mdd_cobj);
+ mdd_pdo_write_unlock(env, mdd_pobj, dlh);
mdd_trans_stop(env, mdd, rc, handle);
RETURN(rc);
}
else
RETURN(0);
#endif
- RETURN(mdd_permission_internal(env, obj, MAY_WRITE | MAY_EXEC));
+ RETURN(mdd_permission_internal_locked(env, obj,
+ MAY_WRITE | MAY_EXEC));
}
static int mdd_name_insert(const struct lu_env *env,
struct mdd_object *mdd_obj = md2mdd_obj(pobj);
struct mdd_device *mdd = mdo2mdd(pobj);
struct thandle *handle;
+ struct dynlock_handle *dlh;
int rc;
ENTRY;
if (IS_ERR(handle))
RETURN(PTR_ERR(handle));
- mdd_write_lock(env, mdd_obj);
+ dlh = mdd_pdo_write_lock(env, mdd_obj, name);
rc = mdd_ni_sanity_check(env, pobj, name, fid);
if (rc)
GOTO(out_unlock, rc);
BYPASS_CAPA);
out_unlock:
- mdd_write_unlock(env, mdd_obj);
-
+ mdd_pdo_write_unlock(env, mdd_obj, dlh);
mdd_trans_stop(env, mdo2mdd(pobj), rc, handle);
RETURN(rc);
}
rc = __mdd_lookup(env, pobj, name, fid, MAY_WRITE | MAY_EXEC);
RETURN(rc);
#endif
- RETURN(mdd_permission_internal(env, obj, MAY_WRITE | MAY_EXEC));
+ RETURN(mdd_permission_internal_locked(env, obj,
+ MAY_WRITE | MAY_EXEC));
}
static int mdd_name_remove(const struct lu_env *env,
struct mdd_device *mdd = mdo2mdd(pobj);
struct mdd_object *mdd_obj = md2mdd_obj(pobj);
struct thandle *handle;
+ struct dynlock_handle *dlh;
int rc;
ENTRY;
if (IS_ERR(handle))
RETURN(PTR_ERR(handle));
- mdd_write_lock(env, mdd_obj);
+ dlh = mdd_pdo_write_lock(env, mdd_obj, name);
rc = mdd_nr_sanity_check(env, pobj, name);
if (rc)
GOTO(out_unlock, rc);
BYPASS_CAPA);
out_unlock:
- mdd_write_unlock(env, mdd_obj);
-
+ mdd_pdo_write_unlock(env, mdd_obj, dlh);
mdd_trans_stop(env, mdd, rc, handle);
RETURN(rc);
}
struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
struct mdd_object *mdd_tobj = md2mdd_obj(tobj);
struct thandle *handle;
+ struct dynlock_handle *dlh;
int rc;
ENTRY;
if (IS_ERR(handle))
RETURN(PTR_ERR(handle));
+ dlh = mdd_pdo_write_lock(env, mdd_tpobj, name);
if (mdd_tobj)
- mdd_lock2(env, mdd_tpobj, mdd_tobj);
- else
- mdd_write_lock(env, mdd_tpobj);
+ mdd_write_lock(env, mdd_tobj);
/*TODO rename sanity checking*/
rc = mdd_rt_sanity_check(env, mdd_tpobj, mdd_tobj, lf, name, ma);
mdd_ref_del_internal(env, mdd_tobj, handle);
cleanup:
if (tobj)
- mdd_unlock2(env, mdd_tpobj, mdd_tobj);
- else
- mdd_write_unlock(env, mdd_tpobj);
+ mdd_write_unlock(env, mdd_tobj);
+ mdd_pdo_write_unlock(env, mdd_tpobj, dlh);
mdd_trans_stop(env, mdd, rc, handle);
RETURN(rc);
}
rc = mdd_exec_permission_lite(env, mdd_obj);
else
#endif
- rc = mdd_permission_internal(env, mdd_obj, mask);
+ rc = mdd_permission_internal_locked(env, mdd_obj, mask);
if (rc)
RETURN(rc);
struct lov_mds_md *lmm = NULL;
struct thandle *handle;
int rc, created = 0, inserted = 0, lmm_size = 0;
+ struct dynlock_handle *dlh;
struct timeval start;
ENTRY;
if (IS_ERR(handle))
RETURN(PTR_ERR(handle));
- mdd_write_lock(env, mdd_pobj);
+ dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
/*
* XXX check that link can be added to the parent in mkdir case.
created = 1;
#ifdef CONFIG_FS_POSIX_ACL
+ mdd_read_lock(env, mdd_pobj);
rc = mdd_acl_init(env, mdd_pobj, son, &ma->ma_attr.la_mode, handle);
+ mdd_read_unlock(env, mdd_pobj);
if (rc) {
mdd_write_unlock(env, son);
GOTO(cleanup, rc);
*la_copy = ma->ma_attr;
la_copy->la_valid = LA_CTIME | LA_MTIME;
- rc = mdd_attr_set_internal(env, mdd_pobj, la_copy, handle, 0);
+ rc = mdd_attr_set_internal_locked(env, mdd_pobj, la_copy, handle, 0);
if (rc)
GOTO(cleanup, rc);
mdd_lov_create_finish(env, mdd, rc);
if (lmm)
OBD_FREE(lmm, lmm_size);
- mdd_write_unlock(env, mdd_pobj);
+ mdd_pdo_write_unlock(env, mdd_pobj, dlh);
mdd_trans_stop(env, mdd, rc, handle);
mdd_lproc_time_end(mdd, &start, LPROC_MDD_CREATE);
RETURN(rc);
}
-static int mdd_rename_lock(const struct lu_env *env,
- struct mdd_device *mdd,
- struct mdd_object *src_pobj,
- struct mdd_object *tgt_pobj)
+/*
+ * Get locks on parents in proper order
+ * RETURN: < 0 - error, rename_order if successful
+ */
+enum rename_order {
+ MDD_RN_SAME,
+ MDD_RN_SRCTGT,
+ MDD_RN_TGTSRC
+};
+
+static int mdd_rename_order(const struct lu_env *env, struct mdd_device *mdd,
+ struct mdd_object *src_pobj,
+ struct mdd_object *tgt_pobj)
{
+ /* order of locking, 1 - tgt-src, 0 - src-tgt*/
int rc;
ENTRY;
- if (src_pobj == tgt_pobj) {
- mdd_write_lock(env, src_pobj);
- RETURN(0);
- }
+ if (src_pobj == tgt_pobj)
+ RETURN(MDD_RN_SAME);
/* compared the parent child relationship of src_p&tgt_p */
if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
- mdd_lock2(env, src_pobj, tgt_pobj);
- RETURN(0);
+ rc = MDD_RN_SRCTGT;
} else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
- mdd_lock2(env, tgt_pobj, src_pobj);
- RETURN(0);
- }
-
- rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
- if (rc < 0)
- RETURN(rc);
-
- if (rc == 1) {
- mdd_lock2(env, tgt_pobj, src_pobj);
- RETURN(0);
+ rc = MDD_RN_TGTSRC;
+ } else {
+ rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
+ if (rc == -EREMOTE)
+ rc == 0;
+
+ if (rc == 1)
+ rc = MDD_RN_TGTSRC;
+ else if (rc == 0)
+ rc = MDD_RN_SRCTGT;
}
- mdd_lock2(env, src_pobj, tgt_pobj);
- RETURN(0);
-}
-
-static void mdd_rename_unlock(const struct lu_env *env,
- struct mdd_object *src_pobj,
- struct mdd_object *tgt_pobj)
-{
- mdd_write_unlock(env, src_pobj);
- if (src_pobj != tgt_pobj)
- mdd_write_unlock(env, tgt_pobj);
+ RETURN(rc);
}
static int mdd_rename_sanity_check(const struct lu_env *env,
RETURN(-ENOENT);
/* The sobj maybe on the remote, check parent permission only here */
- rc = mdd_permission_internal(env, src_pobj, MAY_WRITE | MAY_EXEC);
+ rc = mdd_permission_internal_locked(env, src_pobj,
+ MAY_WRITE | MAY_EXEC);
if (rc)
RETURN(rc);
struct mdd_object *mdd_sobj = NULL;
struct mdd_object *mdd_tobj = NULL;
struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
+ struct dynlock_handle *sdlh, *tdlh;
struct thandle *handle;
int is_dir;
- int rc;
+ int rc, one_lock;
ENTRY;
LASSERT(ma->ma_attr.la_mode & S_IFMT);
RETURN(PTR_ERR(handle));
/* FIXME: Should consider tobj and sobj too in rename_lock. */
- rc = mdd_rename_lock(env, mdd, mdd_spobj, mdd_tpobj);
- if (rc)
+ rc = mdd_rename_order(env, mdd, mdd_spobj, mdd_tpobj);
+ if (rc < 0)
GOTO(cleanup_unlocked, rc);
+ /* get locks in determined order */
+ if (rc == MDD_RN_SAME) {
+ sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
+ /* check hashes to determine do we need one lock or two */
+ if (mdd_name2hash(sname) != mdd_name2hash(tname))
+ tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
+ else
+ tdlh = NULL;
+ } else if (rc == MDD_RN_SRCTGT) {
+ sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
+ tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
+ } else {
+ tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
+ sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
+ }
+
rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj,
lf, is_dir, mdd_tobj);
if (rc)
la_copy->la_valid = LA_CTIME;
if (mdd_sobj) {
/*XXX: how to update ctime for remote sobj? */
- rc = mdd_attr_set_internal_locked(env, mdd_sobj, la_copy, handle);
+ rc = mdd_attr_set_internal_locked(env, mdd_sobj, la_copy,
+ handle, 1);
if (rc)
GOTO(cleanup, rc);
}
}
la_copy->la_valid = LA_CTIME | LA_MTIME;
- rc = mdd_attr_set_internal(env, mdd_spobj, la_copy, handle, 0);
+ rc = mdd_attr_set_internal_locked(env, mdd_spobj, la_copy, handle, 0);
if (rc)
GOTO(cleanup, rc);
if (mdd_spobj != mdd_tpobj) {
la_copy->la_valid = LA_CTIME | LA_MTIME;
- rc = mdd_attr_set_internal(env, mdd_tpobj, la_copy, handle, 0);
+ rc = mdd_attr_set_internal_locked(env, mdd_tpobj, la_copy,
+ handle, 0);
}
cleanup:
- mdd_rename_unlock(env, mdd_spobj, mdd_tpobj);
+ mdd_pdo_write_unlock(env, mdd_spobj, sdlh);
+ if (tdlh)
+ mdd_pdo_write_unlock(env, mdd_tpobj, tdlh);
cleanup_unlocked:
mdd_trans_stop(env, mdd, rc, handle);
if (mdd_sobj)
--- /dev/null
+/* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * mdd/mdd_handler.c
+ * Lustre Metadata Server (mdd) routines
+ *
+ * Copyright (C) 2006 Cluster File Systems, Inc.
+ * Author: Mike Pershin <tappro@clusterfs.com>
+ *
+ * This file is part of the Lustre file system, http://www.lustre.org
+ * Lustre is a trademark of Cluster File Systems, Inc.
+ *
+ * You may have signed or agreed to another license before downloading
+ * this software. If so, you are bound by the terms and conditions
+ * of that agreement, and the following does not apply to you. See the
+ * LICENSE file included with this distribution for more information.
+ *
+ * If you did not agree to a different license, then this copy of Lustre
+ * is open source software; you can redistribute it and/or modify it
+ * under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In either case, Lustre is distributed in the hope that it will be
+ * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * license text for more details.
+ */
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+#define DEBUG_SUBSYSTEM S_MDS
+
+#include <linux/module.h>
+#include <lustre_ver.h>
+#include "mdd_internal.h"
+
+void mdd_write_lock(const struct lu_env *env, struct mdd_object *obj)
+{
+ struct dt_object *next = mdd_object_child(obj);
+
+ next->do_ops->do_write_lock(env, next);
+}
+
+void mdd_read_lock(const struct lu_env *env, struct mdd_object *obj)
+{
+ struct dt_object *next = mdd_object_child(obj);
+
+ next->do_ops->do_read_lock(env, next);
+}
+
+void mdd_write_unlock(const struct lu_env *env, struct mdd_object *obj)
+{
+ struct dt_object *next = mdd_object_child(obj);
+
+ next->do_ops->do_write_unlock(env, next);
+}
+
+void mdd_read_unlock(const struct lu_env *env, struct mdd_object *obj)
+{
+ struct dt_object *next = mdd_object_child(obj);
+
+ next->do_ops->do_read_unlock(env, next);
+}
+
+
+/* Methods for parallel directory locking */
+
+void mdd_pdlock_init(struct mdd_object *obj)
+{
+ dynlock_init(&obj->mod_pdlock);
+
+}
+
+unsigned long mdd_name2hash(const char *name)
+{
+ unsigned long value = 0;
+ int namelen = strlen(name);
+ int i = 0;
+ while (namelen > i) {
+ value += name[i] * (i << 7);
+ i++;
+ }
+ return value;
+}
+
+struct dynlock_handle *mdd_pdo_write_lock(const struct lu_env *env,
+ struct mdd_object *obj,
+ const char *name)
+{
+ unsigned long value = mdd_name2hash(name);
+ return dynlock_lock(&obj->mod_pdlock, value, DLT_WRITE, GFP_NOFS);
+}
+
+struct dynlock_handle *mdd_pdo_read_lock(const struct lu_env *env,
+ struct mdd_object *obj,
+ const char *name)
+{
+ unsigned long value = mdd_name2hash(name);
+ return dynlock_lock(&obj->mod_pdlock, value, DLT_READ, GFP_NOFS);
+}
+
+void mdd_pdo_write_unlock(const struct lu_env *env, struct mdd_object *obj,
+ struct dynlock_handle *dlh)
+{
+ return dynlock_unlock(&obj->mod_pdlock, dlh);
+}
+
+void mdd_pdo_read_unlock(const struct lu_env *env, struct mdd_object *obj,
+ struct dynlock_handle *dlh)
+{
+ return dynlock_unlock(&obj->mod_pdlock, dlh);
+}
+