struct md_device_operations {
int (*mdo_root_get)(struct md_device *m, struct ll_fid *f);
- int (*mdo_mkdir)(struct md_object *o, const char *name);
+ int (*mdo_mkdir)(struct md_object *o, const char *name,
+ struct md_object *child);
};
struct mdt_device {
struct mdt_object {
struct lu_object_header mot_header;
struct md_object mot_obj;
- /*
- * lock handle for dlm lock.
- */
- struct lustre_handle mot_lh;
};
+struct mdt_lock_handle {
+ struct lustre_handle mlh_lh;
+ ldlm_mode_t mlh_mode;
+};
+
+void mdt_lock_handle_init(struct mdt_lock_handle *lh);
+void mdt_lock_handle_fini(struct mdt_lock_handle *lh);
+
struct mdd_object {
struct md_object mod_obj;
};
MDT_REP_BUF_NR_MAX = 8
};
+enum {
+ MDT_LH_PARENT,
+ MDT_LH_CHILD,
+ MDT_LH_NR
+};
+
/*
* Common data shared by mdt-level handlers. This is allocated per-thread to
* reduce stack consumption.
*/
struct mdt_thread_info {
- struct mdt_device *mti_mdt;
+ struct mdt_device *mti_mdt;
/*
* number of buffers in reply message.
*/
- int mti_rep_buf_nr;
+ int mti_rep_buf_nr;
/*
* sizes of reply buffers.
*/
- int mti_rep_buf_size[MDT_REP_BUF_NR_MAX];
+ int mti_rep_buf_size[MDT_REP_BUF_NR_MAX];
/*
* Body for "habeo corpus" operations.
*/
- struct mds_body *mti_body;
+ struct mds_body *mti_body;
/*
* Host object. This is released at the end of mdt_handler().
*/
- struct mdt_object *mti_object;
+ struct mdt_object *mti_object;
/*
* Additional fail id that can be set by handler. Passed to
* target_send_reply().
*/
- int mti_fail_id;
- /*
- * Offset of incoming buffers. 0 for top-level request processing. +ve
- * for intent handling.
- */
- int mti_offset;
+ int mti_fail_id;
+ /*
+ * A couple of lock handles.
+ */
+ struct mdt_lock_handle mti_lh[MDT_LH_NR];
+
};
int fid_lock(const struct ll_fid *, struct lustre_handle *, ldlm_mode_t);
lu_object_put(&o->mot_obj.mo_lu);
}
+static struct ll_fid *mdt_object_fid(struct mdt_object *o)
+{
+ return lu_object_fid(&o->mot_obj.mo_lu);
+}
+
+static int mdt_object_lock(struct mdt_object *o, struct mdt_lock_handle *lh)
+{
+ LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
+ LASSERT(lh->mlh_mode != LCK_MINMODE);
+
+ return fid_lock(mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode);
+}
+
+static void mdt_object_unlock(struct mdt_object *o, struct mdt_lock_handle *lh)
+{
+ if (lustre_handle_is_used(&lh->mlh_lh)) {
+ fid_unlock(mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode);
+ lh->mlh_lh.cookie = 0;
+ }
+}
+
+struct mdt_object *mdt_object_find_lock(struct mdt_device *d, struct ll_fid *f,
+ struct mdt_lock_handle *lh)
+{
+ struct mdt_object *o;
+
+ o = mdt_object_find(d, f);
+ if (!IS_ERR(o)) {
+ int result;
+
+ result = mdt_object_lock(o, lh);
+ if (result != 0) {
+ mdt_object_put(o);
+ o = ERR_PTR(result);
+ }
+ }
+ return o;
+}
+
struct mdt_handler {
const char *mh_name;
int mh_fail_id;
RETURN(result);
}
+void mdt_lock_handle_init(struct mdt_lock_handle *lh)
+{
+ lh->mlh_lh.cookie = 0ull;
+ lh->mlh_mode = LCK_MINMODE;
+}
+
+void mdt_lock_handle_fini(struct mdt_lock_handle *lh)
+{
+ LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
+}
+
static void mdt_thread_info_init(struct mdt_thread_info *info)
{
+ int i;
+
memset(info, 0, sizeof *info);
info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
/*
* Poison size array.
*/
- for (info->mti_rep_buf_nr = 0;
- info->mti_rep_buf_nr < MDT_REP_BUF_NR_MAX; info->mti_rep_buf_nr++)
- info->mti_rep_buf_size[info->mti_rep_buf_nr] = ~0;
+ for (i = 0; i < ARRAY_SIZE(info->mti_rep_buf_size); i++)
+ info->mti_rep_buf_size[i] = ~0;
+ info->mti_rep_buf_nr = i;
+ for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
+ mdt_lock_handle_init(&info->mti_lh[i]);
}
static void mdt_thread_info_fini(struct mdt_thread_info *info)
{
+ int i;
+
if (info->mti_object != NULL) {
mdt_object_put(info->mti_object);
info->mti_object = NULL;
}
+ for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
+ mdt_lock_handle_fini(&info->mti_lh[i]);
}
static int mds_msg_check_version(struct lustre_msg *msg)
.ldo_object_print = mdt_object_print
};
-static struct ll_fid *mdt_object_fid(struct mdt_object *o)
-{
- return lu_object_fid(&o->mot_obj.mo_lu);
-}
-
-static int mdt_object_lock(struct mdt_object *o, ldlm_mode_t mode)
+struct md_object *mdt_object_child(struct mdt_object *o)
{
- return fid_lock(mdt_object_fid(o), &o->mot_lh, mode);
+ return lu2md(lu_object_next(&o->mot_obj.mo_lu));
}
-static void mdt_object_unlock(struct mdt_object *o, ldlm_mode_t mode)
+static inline struct md_device_operations *mdt_child_ops(struct mdt_device *d)
{
- fid_unlock(mdt_object_fid(o), &o->mot_lh, mode);
+ return d->mdt_child->md_ops;
}
-struct md_object *mdt_object_child(struct mdt_object *o)
+int mdt_mkdir(struct mdt_thread_info *info, struct mdt_device *d,
+ struct ll_fid *pfid, const char *name, struct ll_fid *cfid)
{
- return lu2md(lu_object_next(&o->mot_obj.mo_lu));
-}
+ struct mdt_object *o;
+ struct mdt_object *child;
+ struct mdt_lock_handle *lh;
-int mdt_mkdir(struct mdt_device *d, struct ll_fid *pfid, const char *name)
-{
- struct mdt_object *o;
int result;
- o = mdt_object_find(d, pfid);
+ (lh = &info->mti_lh[MDT_LH_PARENT])->mlh_mode = LCK_PW;
+
+ o = mdt_object_find_lock(d, pfid, lh);
if (IS_ERR(o))
- return PTR_ERR(o);
- result = mdt_object_lock(o, LCK_PW);
- if (result == 0) {
- result = d->mdt_child->md_ops->mdo_mkdir(mdt_object_child(o),
- name);
- mdt_object_unlock(o, LCK_PW);
- }
- mdt_object_put(o);
+ return PTR_ERR(o);
+
+ child = mdt_object_find(d, cfid);
+ if (!IS_ERR(child)) {
+ result = mdt_child_ops(d)->mdo_mkdir(mdt_object_child(o), name,
+ mdt_object_child(child));
+ mdt_object_put(child);
+ } else
+ result = PTR_ERR(child);
+ mdt_object_unlock(o, lh);
+ mdt_object_put(o);
return result;
}