struct lustre_handle *c1_lockh,
struct lustre_handle *c2_lockh);
-void reconstruct_open(struct mds_update_record *rec, struct ptlrpc_request *req)
+void reconstruct_open(struct mds_update_record *rec, struct ptlrpc_request *req,
+ struct lustre_handle *child_lockh)
{
struct mds_export_data *med = &req->rq_export->exp_mds_data;
struct mds_client_data *mcd = med->med_mcd;
struct dentry *parent, *child;
struct ldlm_reply *rep = lustre_msg_buf(req->rq_repmsg, 0);
struct mds_body *body = lustre_msg_buf(req->rq_repmsg, 1);
- int disp;
+ int disp, rc;
ENTRY;
- /* copy rc, transno and disp; steal locks */
+ ENTRY;
+ /* copy rc, transno and disp; steal locks */
req->rq_transno = mcd->mcd_last_transno;
req->rq_status = mcd->mcd_last_result;
disp = rep->lock_policy_res1 = mcd->mcd_last_data;
/* We never care about these. */
disp &= ~(IT_OPEN_LOOKUP | IT_OPEN_POS | IT_OPEN_NEG);
if (!disp) {
+ EXIT;
return; /* error looking up parent or child */
}
GOTO(out_dput, 0);
}
+ if (!med->med_outstanding_reply) {
+ LBUG(); /* XXX need to get enqueue client lock */
+ }
+
+ /* get lock (write for O_CREAT, read otherwise) */
+
mds_pack_inode2fid(&body->fid1, child->d_inode);
mds_pack_inode2body(body, child->d_inode);
+ if (S_ISREG(child->d_inode->i_mode)) {
+ rc = mds_pack_md(obd, req->rq_repmsg, 2, body,
+ child->d_inode);
+ if (rc)
+ LASSERT(rc == req->rq_status);
+ } else {
+ /* XXX need to check this case */
+ }
+ /* If we're opening a file without an EA, change to a write
+ lock (unless we already have one). */
+
/* If we have -EEXIST as the status, and we were asked to create
* exclusively, we can tell we failed because the file already existed.
- *
- * We can also tell that Phil had scallops for dinner.
*/
if (req->rq_status == -EEXIST &&
((rec->ur_flags & (O_CREAT | O_EXCL)) == (O_CREAT | O_EXCL))) {
GOTO(out_dput, 0);
}
- if ((obd->obd_flags & OBD_RECOVERING) == 0) {
+ if (med->med_outstanding_reply) {
struct list_head *t;
mfd = NULL;
+ /* XXX can we just look in the old reply to find the handle in
+ * XXX O(1) here? */
list_for_each(t, &med->med_open_head) {
mfd = list_entry(t, struct mds_file_data, mfd_list);
if (mfd->mfd_xid == req->rq_xid)
out_dput:
l_dput(child);
l_dput(parent);
+ EXIT;
}
int mds_open(struct mds_update_record *rec, int offset,
struct ldlm_res_id child_res_id = { .name = {0} };
struct lustre_handle parent_lockh;
int rc = 0, parent_mode, child_mode = LCK_PR, lock_flags, created = 0;
+ int cleanup_phase = 0;
+ void *handle = NULL;
ENTRY;
- /* XXX macroize with reint_create, etc. */
- if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
- struct mds_client_data *mcd =
- req->rq_export->exp_mds_data.med_mcd;
- if (mcd->mcd_last_xid == req->rq_xid) {
- reconstruct_open(rec, req);
- RETURN(0);
- }
- DEBUG_REQ(D_HA, req, "no reply for resent request");
- }
+ MDS_CHECK_RESENT(req, reconstruct_open(rec, req, child_lockh));
med = &req->rq_export->exp_mds_data;
rep->lock_policy_res1 |= IT_OPEN_LOOKUP;
if (IS_ERR(parent)) {
rc = PTR_ERR(parent);
CERROR("parent lookup error %d\n", rc);
- RETURN(rc);
+ GOTO(cleanup, rc);
}
LASSERT(parent->d_inode);
+ cleanup_phase = 1; /* parent dentry and lock */
+
/* Step 2: Lookup the child */
dchild = lookup_one_len(lustre_msg_buf(req->rq_reqmsg, 3),
parent, req->rq_reqmsg->buflens[3] - 1);
if (IS_ERR(dchild))
- GOTO(out_step_2, rc = PTR_ERR(dchild));
+ GOTO(cleanup, rc = PTR_ERR(dchild));
+
+ cleanup_phase = 2; /* child dentry */
if (dchild->d_inode)
rep->lock_policy_res1 |= IT_OPEN_POS;
/* Step 3: If the child was negative, and we're supposed to,
* create it. */
- if ((rec->ur_flags & O_CREAT) && !dchild->d_inode) {
- int err;
- void *handle;
+ if (!dchild->d_inode) {
+ if (!(rec->ur_flags & O_CREAT)) {
+ /* It's negative and we weren't supposed to create it */
+ GOTO(cleanup, rc = -ENOENT);
+ }
+
rep->lock_policy_res1 |= IT_OPEN_CREATE;
handle = fsfilt_start(obd, parent->d_inode, FSFILT_OP_CREATE);
if (IS_ERR(handle)) {
rc = PTR_ERR(handle);
- mds_finish_transno(mds, parent->d_inode, handle, req,
- rc, rep->lock_policy_res1);
- GOTO(out_step_3, rc);
+ handle = NULL;
+ GOTO(cleanup, rc);
}
rc = vfs_create(parent->d_inode, dchild, rec->ur_mode);
- err = mds_finish_transno(mds, parent->d_inode, handle, req, rc,
- rep->lock_policy_res1);
- if (err) {
- CERROR("error on commit: err = %d\n", err);
- if (!rc)
- rc = err;
- GOTO(out_step_3, rc);
- }
+ if (rc)
+ GOTO(cleanup, rc);
created = 1;
child_mode = LCK_PW;
- } else if (!dchild->d_inode) {
- /* It's negative and we weren't supposed to create it */
- GOTO(out_step_3, rc = -ENOENT);
}
/* Step 4: It's positive, so lock the child */
mds_blocking_ast, NULL, NULL, child_lockh);
if (rc != ELDLM_OK) {
CERROR("ldlm_cli_enqueue: %d\n", rc);
- GOTO(out_step_3, rc = -EIO);
+ GOTO(cleanup, rc = -EIO);
}
+ cleanup_phase = 3; /* child lock */
+
mds_pack_inode2fid(&body->fid1, dchild->d_inode);
mds_pack_inode2body(body, dchild->d_inode);
if (S_ISREG(dchild->d_inode->i_mode)) {
rc = mds_pack_md(obd, req->rq_repmsg, 2, body, dchild->d_inode);
if (rc)
- GOTO(out_step_4, rc);
+ GOTO(cleanup, rc);
} else {
/* If this isn't a regular file, we can't open it. */
- GOTO(out_step_3, rc = 0); /* returns the lock to the client */
+
+ /* We want to drop the child dentry, because we're not returning
+ * failure (which would do this for us in step 2), and we're not
+ * handing it off to the open file in dentry_open. */
+ l_dput(dchild);
+ GOTO(cleanup, rc = 0); /* returns the lock to the client */
}
if (!created && (rec->ur_flags & O_CREAT) && (rec->ur_flags & O_EXCL)) {
/* File already exists, we didn't just create it, and we
* were passed O_EXCL; err-or. */
- GOTO(out_step_3, rc = -EEXIST); // returns a lock to the client
+ GOTO(cleanup, rc = -EEXIST); // returns a lock to the client
}
/* If we're opening a file without an EA, the client needs a write
* lock. */
- if (child_mode != LCK_PW && S_ISREG(dchild->d_inode->i_mode) &&
- !(body->valid & OBD_MD_FLEASIZE)) {
+ if (child_mode != LCK_PW && !(body->valid & OBD_MD_FLEASIZE)) {
ldlm_lock_decref(child_lockh, child_mode);
child_mode = LCK_PW;
goto reacquire;
mfd = kmem_cache_alloc(mds_file_cache, GFP_KERNEL);
if (!mfd) {
CERROR("mds: out of memory\n");
- GOTO(out_step_4, rc = -ENOMEM);
+ GOTO(cleanup, rc = -ENOMEM);
}
+ cleanup_phase = 4; /* mfd allocated */
+
/* dentry_open does a dput(de) and mntput(mds->mds_vfsmnt) on error */
mntget(mds->mds_vfsmnt);
file = dentry_open(dchild, mds->mds_vfsmnt,
rec->ur_flags & ~(O_DIRECT | O_TRUNC));
if (IS_ERR(file)) {
- dchild = NULL; /* prevent a double dput in step 3 */
- GOTO(out_step_5, rc = PTR_ERR(file));
+ dchild = NULL; /* prevent a double dput in step 2 */
+ GOTO(cleanup, rc = PTR_ERR(file));
}
file->private_data = mfd;
body->handle.cookie = mfd->mfd_servercookie;
CDEBUG(D_INODE, "file %p: mfd %p, cookie "LPX64"\n",
mfd->mfd_file, mfd, mfd->mfd_servercookie);
- GOTO(out_step_2, rc = 0); /* returns a lock to the client */
+ GOTO(cleanup, rc = 0); /* returns a lock to the client */
- out_step_5:
- if (mfd != NULL) {
- kmem_cache_free(mds_file_cache, mfd);
- mfd = NULL;
- }
- out_step_4:
- ldlm_lock_decref(child_lockh, child_mode);
- out_step_3:
- if (dchild)
- l_dput(dchild);
- out_step_2:
- l_dput(parent);
- if (rc) {
- ldlm_lock_decref(&parent_lockh, parent_mode);
- } else {
- memcpy(&req->rq_ack_locks[0].lock, &parent_lockh,
- sizeof(parent_lockh));
- req->rq_ack_locks[0].mode = parent_mode;
+ cleanup:
+ rc = mds_finish_transno(mds, dchild ? dchild->d_inode : NULL, handle,
+ req, rc, rep->lock_policy_res1);
+ switch (cleanup_phase) {
+ case 4:
+ if (rc)
+ kmem_cache_free(mds_file_cache, mfd);
+ case 3:
+ /* This is the same logic as in the IT_OPEN part of
+ * ldlm_intent_policy: if we found the dentry, or we tried to
+ * open it (meaning that we created, if it wasn't found), then
+ * we return the lock to the caller and client. */
+ if (!(rep->lock_policy_res1 & (IT_OPEN_OPEN | IT_OPEN_POS)))
+ ldlm_lock_decref(child_lockh, child_mode);
+ case 2:
+ if (rc)
+ l_dput(dchild);
+ case 1:
+ l_dput(parent);
+ if (rc) {
+ ldlm_lock_decref(&parent_lockh, parent_mode);
+ } else {
+ memcpy(&req->rq_ack_locks[0].lock, &parent_lockh,
+ sizeof(parent_lockh));
+ req->rq_ack_locks[0].mode = parent_mode;
+ }
}
RETURN(rc);
}