- lots of cleanups and fixes in comments.
struct lustre_handle *lh, ldlm_mode_t mode)
{
{
- /* FIXME: this is debug stuff, remove it later. */
+ /* XXX: this is debug stuff, remove it later. */
struct ldlm_lock *lock = ldlm_handle2lock(lh);
if (!lock) {
CERROR("Invalid lock handle "LPX64"\n",
/* As not all attributes are kept under update lock, e.g.
owner/group/acls are under lookup lock, we need both
ibits for GETATTR. */
- /* For CMD, UPDATA lock and LOOKUP lock can not be got
+
+ /* For CMD, UPDATE lock and LOOKUP lock can not be got
* at the same for cross-object, so we can not match
* the 2 lock at the same time FIXME: but how to handle
* the above situation */
mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
child = parent;
CDEBUG(D_INODE, "partial getattr_name child_fid = "DFID
- ", ldlm_rep=%p\n",
- PFID(mdt_object_fid(child)), ldlm_rep);
+ ", ldlm_rep=%p\n", PFID(mdt_object_fid(child)), ldlm_rep);
if (is_resent) {
/* Do not take lock for resent case. */
} else {
mdt_lock_handle_init(lhc);
lhc->mlh_mode = LCK_CR;
- /* object's name is on another MDS,
- * no lookup lock is needed here but update is */
+
+ /*
+ * Object's name is on another MDS, no lookup lock is
+ * needed here but update is.
+ */
child_bits &= ~MDS_INODELOCK_LOOKUP;
//child_bits |= MDS_INODELOCK_UPDATE;
rc = mdt_object_lock(info, child, lhc, child_bits);
}
if (rc == 0) {
- /* finally, we can get attr for child. */
+ /* Finally, we can get attr for child. */
rc = mdt_getattr_internal(info, child);
if (rc != 0)
mdt_object_unlock(info, child, lhc, 1);
return rc ? rc : rc1;
}
-static int mdt_reint_internal(struct mdt_thread_info *info, __u32 op)
+static int mdt_reint_internal(struct mdt_thread_info *info,
+ struct mdt_lock_handle *lhc,
+ __u32 op)
{
struct req_capsule *pill = &info->mti_pill;
struct mdt_device *mdt = info->mti_mdt;
* allocating response, caller anyway may want to get ldlm_reply from it
* and will get oops.
*/
- if (MDT_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
+ if (MDT_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK))
RETURN(-EFAULT);
- }
if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
struct mdt_client_data *mcd;
mcd = req->rq_export->exp_mdt_data.med_mcd;
if (mcd->mcd_last_xid == req->rq_xid) {
- mdt_reconstruct(info);
+ mdt_reconstruct(info, lhc);
RETURN(lustre_msg_get_status(req->rq_repmsg));
}
DEBUG_REQ(D_HA, req, "no reply for RESENT (xid "LPD64")",
mcd->mcd_last_xid);
}
- rc = mdt_reint_rec(info);
+ rc = mdt_reint_rec(info, lhc);
RETURN(rc);
}
opc = mdt_reint_opcode(info, reint_fmts);
if (opc >= 0) {
- rc = mdt_reint_internal(info, opc);
+ /*
+ * No lock possible here from client to pass it to reint code
+ * path.
+ */
+ rc = mdt_reint_internal(info, NULL, opc);
} else
rc = opc;
ENTRY;
if (lustre_handle_is_used(handle)) {
-
if (decref)
fid_unlock(mdt_object_fid(o), handle, mode);
else
struct ldlm_lock **lockp,
int flags)
{
- struct mdt_lock_handle tmp_lock = { {0}, };
- struct mdt_lock_handle *lhc = &tmp_lock;
+ struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT];
struct ldlm_lock *new_lock = NULL;
__u64 child_bits;
struct ldlm_reply *ldlm_rep;
struct ldlm_lock **lockp,
int flags)
{
- long opc;
- int rc;
- struct ldlm_reply *rep;
+ struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT];
+ struct ldlm_reply *rep;
+ long opc;
+ int rc;
static const struct req_format *intent_fmts[REINT_MAX] = {
[REINT_CREATE] = &RQF_LDLM_INTENT_CREATE,
RETURN(-EPROTO);
}
- rc = mdt_reint_internal(info, opc);
+ /* Get lock from request for possible resent case. */
+ mdt_fixup_resent(&info->mti_pill, *lockp, NULL, lhc);
+
+ rc = mdt_reint_internal(info, lhc, opc);
rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
if (rep == NULL)
RETURN(-EFAULT);
- /* mdc expect this in any case */
+
+ /* MDC expects this in any case */
if (rc != 0)
mdt_set_disposition(info, rep, DISP_LOOKUP_EXECD);
/* cross-ref case, the lock should be returned to the client */
if (rc == -EREMOTE) {
- struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_NEW];
- LASSERT(lhc->mlh_lh.cookie != 0);
- rep->lock_policy_res2 = 0;
- return mdt_intent_lock_replace(info, lockp, NULL, lhc, flags);
+ LASSERT(lustre_handle_is_used(&lhc->mlh_lh));
+ rep->lock_policy_res2 = 0;
+ return mdt_intent_lock_replace(info, lockp, NULL, lhc, flags);
}
rep->lock_policy_res2 = rc;
};
enum {
- MDT_LH_PARENT,
- MDT_LH_CHILD,
- MDT_LH_OLD,
- MDT_LH_NEW,
+ MDT_LH_PARENT, /* parent lockh */
+ MDT_LH_CHILD, /* child lockh */
+ MDT_LH_OLD, /* old lockh for rename */
+ MDT_LH_NEW, /* new lockh for rename */
+ MDT_LH_RMT, /* used for return lh to caller */
MDT_LH_NR
};
__u32 mti_has_trans:1, /* has txn already? */
mti_no_need_trans:1;
- /* opdata for mdt_open(), has the same as ldlm_reply:lock_policy_res1.
- * mdt_update_last_rcvd() stores this value onto disk for recovery
- * when mdt_trans_stop_cb() is called.
+ /* opdata for mdt_reint_open(), has the same as
+ * ldlm_reply:lock_policy_res1. mdt_update_last_rcvd() stores this
+ * value onto disk for recovery when mdt_trans_stop_cb() is called.
*/
__u64 mti_opdata;
- /* temporary stuff used by thread to save stack consumption.
- * if something is in a union, make sure they do not conflict */
+ /* temporary stuff used by thread to save stack consumption. if
+ * something is in a union, make sure they do not conflict */
struct lu_fid mti_tmp_fid1;
struct lu_fid mti_tmp_fid2;
int mdt_close_unpack(struct mdt_thread_info *info);
int mdt_reint_unpack(struct mdt_thread_info *info, __u32 op);
-int mdt_reint_rec(struct mdt_thread_info *);
+int mdt_reint_rec(struct mdt_thread_info *, struct mdt_lock_handle *);
void mdt_pack_size2body(struct mdt_body *b, const struct lu_attr *attr,
struct mdt_object *o);
void mdt_pack_attr2body(struct mdt_body *b, const struct lu_attr *attr,
void mdt_lock_handle_init(struct mdt_lock_handle *lh);
void mdt_lock_handle_fini(struct mdt_lock_handle *lh);
-void mdt_reconstruct(struct mdt_thread_info *);
+void mdt_reconstruct(struct mdt_thread_info *, struct mdt_lock_handle *);
int mdt_fs_setup(const struct lu_context *, struct mdt_device *);
void mdt_fs_cleanup(const struct lu_context *, struct mdt_device *);
struct mdt_object *o,
struct mdt_lock_handle *child_lockh);
-int mdt_open(struct mdt_thread_info *info);
+int mdt_reint_open(struct mdt_thread_info *info,
+ struct mdt_lock_handle *lhc);
struct mdt_file_data *mdt_handle2mfd(const struct lustre_handle *handle);
int mdt_epoch_open(struct mdt_thread_info *info, struct mdt_object *o,
void mdt_shrink_reply(struct mdt_thread_info *info, int offset);
int mdt_handle_last_unlink(struct mdt_thread_info *, struct mdt_object *,
const struct md_attr *);
-void mdt_reconstruct_open(struct mdt_thread_info *);
+void mdt_reconstruct_open(struct mdt_thread_info *, struct mdt_lock_handle *);
void mdt_dump_lmm(int level, const struct lov_mds_md *lmm);
struct mdt_client_data *mcd);
-/* TODO: much of the code here can be shared with mdt_open()
+/* TODO: much of the code here can be shared with mdt_reint_open()
* and mdt_mfd_open(). This will be done later. :-)
*/
-void mdt_reconstruct_open(struct mdt_thread_info *info)
+void mdt_reconstruct_open(struct mdt_thread_info *info,
+ struct mdt_lock_handle *lhc)
{
struct req_capsule *pill = &info->mti_pill;
struct ptlrpc_request *req = mdt_info_req(info);
mdt_req_from_mcd(req, med->med_mcd);
mdt_set_disposition(info, ldlm_rep, mcd->mcd_last_data);
- CERROR("This is re-CCCCCCCCCCConstruct open: disp="LPX64", result=%d\n",
- ldlm_rep->lock_policy_res1,
- req->rq_status);
+ CERROR("This is reconstruct open: disp="LPX64", result=%d\n",
+ ldlm_rep->lock_policy_res1, req->rq_status);
if (mdt_get_disposition(ldlm_rep, DISP_OPEN_CREATE) && req->rq_status) {
- /* we did not create successfully, return error to client. */
+ /* We did not create successfully, return error to client. */
mdt_shrink_reply(info, DLM_REPLY_REC_OFF + 1);
GOTO(out, 0);
}
-
- lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
+
+ /*
+ * XXX: is this correct to set here transno and status to zero? This is
+ * especially suspicious after calling mdt_req_from_mcd() above, which
+ * set these fields already.
+ */
lustre_msg_set_transno(req->rq_repmsg, 0);
lustre_msg_set_status(req->rq_repmsg, 0);
+
ldlm_rep->lock_policy_res1 = 0;
ldlm_rep->lock_policy_res2 = 0;
- result = mdt_open(info);
+ result = mdt_reint_open(info, lhc);
req->rq_status = result;
EXIT;
out:
RETURN(rc);
}
-/* cross-ref request. Currently it can only be a pure open (w/o create) */
-int mdt_cross_open(struct mdt_thread_info* info, const struct lu_fid *fid,
- struct ldlm_reply *rep, __u32 flags)
+int mdt_pin(struct mdt_thread_info* info)
+{
+ ENTRY;
+ RETURN(-EOPNOTSUPP);
+}
+
+/* Cross-ref request. Currently it can only be a pure open (w/o create) */
+static int mdt_cross_open(struct mdt_thread_info* info,
+ const struct lu_fid *fid,
+ struct ldlm_reply *rep, __u32 flags)
{
struct md_attr *ma = &info->mti_attr;
struct mdt_object *o;
if (rc == 0)
rc = mdt_mfd_open(info, NULL, o, flags, 0, rep);
} else if (rc == 0) {
- /* FIXME: something wrong here
- * lookup was positive but there
- * is no object! */
+ /*
+ * FIXME: something wrong here lookup was positive but there is
+ * no object!
+ */
CERROR("Cross-ref object doesn't exists!\n");
rc = -EFAULT;
} else {
- /* FIXME: something wrong here
- * the object is on another MDS! */
+ /* FIXME: something wrong here the object is on another MDS! */
CERROR("The object isn't on this server! FLD error?\n");
rc = -EFAULT;
}
RETURN(rc);
}
-int mdt_pin(struct mdt_thread_info* info)
-{
- ENTRY;
- RETURN(-EOPNOTSUPP);
-}
-
-int mdt_open(struct mdt_thread_info *info)
+int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
{
struct mdt_device *mdt = info->mti_mdt;
struct ptlrpc_request *req = mdt_info_req(info);
lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY);
if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
- /* this is a replay request. */
+ /* This is a replay request. */
result = mdt_open_by_fid(info, ldlm_rep);
if (result != -ENOENT)
GOTO(out, result);
- /* We didn't find the correct object, so we
- * need to re-create it via a regular replay. */
+ /*
+ * We didn't find the correct object, so we need to re-create it
+ * via a regular replay.
+ */
if (!(create_flags & MDS_OPEN_CREAT)) {
DEBUG_REQ(D_ERROR, req,"OPEN_CREAT not in open replay");
GOTO(out, result = -EFAULT);
if (result == -ENOENT || result == -ESTALE) {
mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
if (result == -ESTALE) {
- /*ESTALE means the parent is a dead(unlinked) dir,
- *so it should return -ENOENT to in accordance
- *with the original mds implementaion.*/
+ /*
+ * ESTALE means the parent is a dead(unlinked) dir, so
+ * it should return -ENOENT to in accordance with the
+ * original mds implementaion.
+ */
GOTO(out_parent, result = -ENOENT);
}
if (!(create_flags & MDS_OPEN_CREAT))
*child_fid = *info->mti_rr.rr_fid2;
/* new object will be created. see the following */
} else {
+ /*
+ * Check for O_EXCL is moved to the mdt_mfd_open, we need to
+ * return FID back in that case.
+ */
mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
- /* check for O_EXCL is moved to the mdt_mfd_open, we need to
- * return FID back in that case */
}
child = mdt_object_find(info->mti_ctxt, mdt, child_fid);
GOTO(out_parent, result = PTR_ERR(child));
if (result == -ENOENT) {
- /* not found and with MDS_OPEN_CREAT: let's create it */
+ /* Not found and with MDS_OPEN_CREAT: let's create it. */
mdt_set_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
result = mdo_create(info->mti_ctxt,
mdt_object_child(parent),
}
created = 1;
} else {
- /* we have to get attr & lov ea for this object*/
+ /* We have to get attr & lov ea for this object */
result = mo_attr_get(info->mti_ctxt,
mdt_object_child(child), ma);
+ /*
+ * The object is on remote node, return its FID for remote open.
+ */
if (result == -EREMOTE) {
- /* the object is on remote node
- * return its FID for remote open */
- struct mdt_lock_handle *lhc;
int rc;
- lhc = &info->mti_lh[MDT_LH_NEW];
- mdt_lock_handle_init(lhc);
- lhc->mlh_mode = LCK_CR;
- rc = mdt_object_lock(info, child, lhc,
- MDS_INODELOCK_LOOKUP);
+
+ /*
+ * Check if this lock already was sent to client and
+ * this is resent case. For resent case do not take lock
+ * again, use what is already granted.
+ */
+ LASSERT(lhc != NULL);
+
+ if (lustre_handle_is_used(&lhc->mlh_lh)) {
+ LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
+ MSG_RESENT);
+
+ struct ldlm_lock *lock = ldlm_handle2lock(&lhc->mlh_lh);
+ if (!lock) {
+ CERROR("Invalid lock handle "LPX64"\n",
+ lhc->mlh_lh.cookie);
+ LBUG();
+ }
+ LASSERT(fid_res_name_eq(mdt_object_fid(child),
+ &lock->l_resource->lr_name));
+ LDLM_LOCK_PUT(lock);
+ rc = 0;
+ } else {
+ mdt_lock_handle_init(lhc);
+ lhc->mlh_mode = LCK_CR;
+
+ rc = mdt_object_lock(info, child, lhc,
+ MDS_INODELOCK_LOOKUP);
+ }
repbody->fid1 = *mdt_object_fid(child);
repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
if (rc != 0)
GOTO(out_child, result);
}
}
+
/* Try to open it now. */
result = mdt_mfd_open(info, parent, child, create_flags,
created, ldlm_rep);
//mds_steal_ack_locks(req);
}
-static void mdt_reconstruct_generic(struct mdt_thread_info *mti)
+static void mdt_reconstruct_generic(struct mdt_thread_info *mti,
+ struct mdt_lock_handle *lhc)
{
struct ptlrpc_request *req = mdt_info_req(mti);
struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
return mdt_req_from_mcd(req, med->med_mcd);
}
-static void mdt_reconstruct_create(struct mdt_thread_info *mti)
+static void mdt_reconstruct_create(struct mdt_thread_info *mti,
+ struct mdt_lock_handle *lhc)
{
struct ptlrpc_request *req = mdt_info_req(mti);
struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
mdt_object_put(mti->mti_ctxt, child);
}
-static void mdt_reconstruct_setattr(struct mdt_thread_info *mti)
+static void mdt_reconstruct_setattr(struct mdt_thread_info *mti,
+ struct mdt_lock_handle *lhc)
{
struct ptlrpc_request *req = mdt_info_req(mti);
struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
mdt_object_put(mti->mti_ctxt, obj);
}
-static void mdt_reconstruct_with_shrink(struct mdt_thread_info *mti)
+static void mdt_reconstruct_with_shrink(struct mdt_thread_info *mti,
+ struct mdt_lock_handle *lhc)
{
- mdt_reconstruct_generic(mti);
+ mdt_reconstruct_generic(mti, lhc);
mdt_shrink_reply(mti, REPLY_REC_OFF + 1);
}
-typedef void (*mdt_reconstructor)(struct mdt_thread_info *mti);
+typedef void (*mdt_reconstructor)(struct mdt_thread_info *mti,
+ struct mdt_lock_handle *lhc);
static mdt_reconstructor reconstructors[REINT_MAX] = {
[REINT_SETATTR] = mdt_reconstruct_setattr,
[REINT_OPEN] = mdt_reconstruct_open
};
-void mdt_reconstruct(struct mdt_thread_info *mti)
+void mdt_reconstruct(struct mdt_thread_info *mti,
+ struct mdt_lock_handle *lhc)
{
ENTRY;
- reconstructors[mti->mti_rr.rr_opcode](mti);
+ reconstructors[mti->mti_rr.rr_opcode](mti, lhc);
EXIT;
}
return(rc);
}
-static int mdt_reint_setattr(struct mdt_thread_info *info)
+static int mdt_reint_setattr(struct mdt_thread_info *info,
+ struct mdt_lock_handle *lhc)
{
struct md_attr *ma = &info->mti_attr;
struct mdt_reint_record *rr = &info->mti_rr;
return rc;
}
-static int mdt_reint_create(struct mdt_thread_info *info)
+static int mdt_reint_create(struct mdt_thread_info *info,
+ struct mdt_lock_handle *lhc)
{
int rc;
ENTRY;
}
-static int mdt_reint_unlink(struct mdt_thread_info *info)
+static int mdt_reint_unlink(struct mdt_thread_info *info,
+ struct mdt_lock_handle *lhc)
{
struct mdt_reint_record *rr = &info->mti_rr;
struct ptlrpc_request *req = mdt_info_req(info);
struct lu_fid *child_fid = &info->mti_tmp_fid1;
struct mdt_object *mp;
struct mdt_object *mc;
- struct mdt_lock_handle *lhp;
- struct mdt_lock_handle *lhc;
+ struct mdt_lock_handle *parent_lh;
+ struct mdt_lock_handle *child_lh;
int rc;
ENTRY;
GOTO(out, rc = -ENOENT);
/* step 1: lock the parent */
- lhp = &info->mti_lh[MDT_LH_PARENT];
- lhp->mlh_mode = LCK_EX;
- mp = mdt_object_find_lock(info, rr->rr_fid1, lhp,
+ parent_lh = &info->mti_lh[MDT_LH_PARENT];
+ parent_lh->mlh_mode = LCK_EX;
+ mp = mdt_object_find_lock(info, rr->rr_fid1, parent_lh,
MDS_INODELOCK_UPDATE);
if (IS_ERR(mp))
GOTO(out, rc = PTR_ERR(mp));
GOTO(out_unlock_parent, rc);
}
- /*step 2: find & lock the child */
- lhc = &info->mti_lh[MDT_LH_CHILD];
- lhc->mlh_mode = LCK_EX;
+ /* step 2: find & lock the child */
+ child_lh = &info->mti_lh[MDT_LH_CHILD];
+ child_lh->mlh_mode = LCK_EX;
rc = mdo_lookup(info->mti_ctxt, mdt_object_child(mp),
rr->rr_name, child_fid);
if (rc != 0)
GOTO(out_unlock_parent, rc);
/* we will lock the child regardless it is local or remote. No harm. */
- mc = mdt_object_find_lock(info, child_fid, lhc, MDS_INODELOCK_FULL);
+ mc = mdt_object_find_lock(info, child_fid, child_lh, MDS_INODELOCK_FULL);
if (IS_ERR(mc))
GOTO(out_unlock_parent, rc = PTR_ERR(mc));
mdt_fail_write(info->mti_ctxt, info->mti_mdt->mdt_bottom,
OBD_FAIL_MDS_REINT_UNLINK_WRITE);
- /*Now we can only make sure we need MA_INODE, in mdd layer,
- *will check whether need MA_LOV and MA_COOKIE*/
+ /*
+ * Now we can only make sure we need MA_INODE, in mdd layer, will check
+ * whether need MA_LOV and MA_COOKIE.
+ */
ma->ma_need = MA_INODE;
rc = mdo_unlink(info->mti_ctxt, mdt_object_child(mp),
mdt_object_child(mc), rr->rr_name, ma);
GOTO(out_unlock_child, rc);
out_unlock_child:
- mdt_object_unlock_put(info, mc, lhc, rc);
+ mdt_object_unlock_put(info, mc, child_lh, rc);
out_unlock_parent:
- mdt_object_unlock_put(info, mp, lhp, rc);
+ mdt_object_unlock_put(info, mp, parent_lh, rc);
out:
mdt_shrink_reply(info, REPLY_REC_OFF + 1);
return rc;
}
-static int mdt_reint_link(struct mdt_thread_info *info)
+static int mdt_reint_link(struct mdt_thread_info *info,
+ struct mdt_lock_handle *lhc)
{
struct mdt_reint_record *rr = &info->mti_rr;
struct ptlrpc_request *req = mdt_info_req(info);
RETURN(rc);
}
-static int mdt_reint_rename(struct mdt_thread_info *info)
+static int mdt_reint_rename(struct mdt_thread_info *info,
+ struct mdt_lock_handle *lhc)
{
struct mdt_reint_record *rr = &info->mti_rr;
struct req_capsule *pill = &info->mti_pill;
return rc;
}
-
-typedef int (*mdt_reinter)(struct mdt_thread_info *info);
+typedef int (*mdt_reinter)(struct mdt_thread_info *info,
+ struct mdt_lock_handle *lhc);
static mdt_reinter reinters[REINT_MAX] = {
[REINT_SETATTR] = mdt_reint_setattr,
[REINT_LINK] = mdt_reint_link,
[REINT_UNLINK] = mdt_reint_unlink,
[REINT_RENAME] = mdt_reint_rename,
- [REINT_OPEN] = mdt_open
+ [REINT_OPEN] = mdt_reint_open
};
-int mdt_reint_rec(struct mdt_thread_info *info)
+int mdt_reint_rec(struct mdt_thread_info *info,
+ struct mdt_lock_handle *lhc)
{
int rc;
ENTRY;
- rc = reinters[info->mti_rr.rr_opcode](info);
+ rc = reinters[info->mti_rr.rr_opcode](info, lhc);
RETURN(rc);
}