extern void mds_start_transno(struct mds_obd *mds);
extern int mds_finish_transno(struct mds_obd *mds, void *handle,
struct ptlrpc_request *req, int rc);
+extern int enqueue_ordered_locks(int lock_mode, struct obd_device *obd,
+ struct ldlm_res_id *p1_res_id,
+ struct ldlm_res_id *p2_res_id,
+ struct ldlm_res_id *c1_res_id,
+ struct ldlm_res_id *c2_res_id,
+ struct lustre_handle *p1_lockh,
+ struct lustre_handle *p2_lockh,
+ struct lustre_handle *c1_lockh,
+ struct lustre_handle *c2_lockh);
int mds_open(struct mds_update_record *rec, int offset,
- struct ptlrpc_request *req)
+ struct ptlrpc_request *req, struct lustre_handle *child_lockh)
{
struct mds_obd *mds = mds_req2mds(req);
struct obd_device *obd = req->rq_export->exp_obd;
struct ldlm_reply *rep = lustre_msg_buf(req->rq_repmsg, 0);
- struct obd_ucred uc;
- struct obd_run_ctxt saved;
- struct lustre_handle lockh;
- int lock_mode;
struct file *file;
struct mds_body *body = lustre_msg_buf(req->rq_repmsg, 1);
struct dentry *dchild, *parent;
struct mds_export_data *med;
struct mds_file_data *mfd = NULL;
struct vfsmount *mnt = mds->mds_vfsmnt;
+ struct ldlm_res_id child_res_id = { .name = {0} };
+ struct lustre_handle parent_lockh;
__u32 flags;
- struct list_head *tmp;
- int rc = 0;
+ int rc = 0, parent_mode, child_mode = LCK_PR, lock_flags, created = 0;
ENTRY;
#warning replay of open needs to be redone
RETURN(-ENOMEM);
}
- lock_mode = (rec->ur_flags & O_CREAT) ? LCK_PW : LCK_PR;
- parent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, lock_mode,
- &lockh);
+ /* Step 1: Find and lock the parent */
+ parent_mode = (rec->ur_flags & O_CREAT) ? LCK_PW : LCK_PR;
+ parent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, parent_mode,
+ &parent_lockh);
if (IS_ERR(parent)) {
rc = PTR_ERR(parent);
CERROR("parent lookup error %d\n", rc);
RETURN(rc);
}
dir = parent->d_inode;
+ if (dir == NULL)
+ GOTO(out_step_1, rc = -ENOENT);
- down(&dir->i_sem);
+ /* Step 2: Lookup the child */
dchild = lookup_one_len(lustre_msg_buf(req->rq_reqmsg, 3),
parent, req->rq_reqmsg->buflens[3] - 1);
- if (IS_ERR(dchild)) {
- up(&dir->i_sem);
- GOTO(out_unlock, rc = PTR_ERR(dchild));
- }
+ if (IS_ERR(dchild))
+ GOTO(out_step_2, rc = PTR_ERR(dchild));
if (dchild->d_inode)
rep->lock_policy_res1 |= IT_OPEN_POS;
else
rep->lock_policy_res1 |= IT_OPEN_NEG;
- /* Negative dentry, just create the file */
- if (dchild->d_inode) {
- up(&dir->i_sem);
- if ((rec->ur_flags & (O_CREAT|O_EXCL)) == (O_CREAT|O_EXCL)) {
- mds_pack_inode2fid(&body->fid1, dchild->d_inode);
- mds_pack_inode2body(body, dchild->d_inode);
- if (S_ISREG(dchild->d_inode->i_mode))
- rc = mds_pack_md(obd, req->rq_repmsg, 3, body,
- dchild->d_inode);
- if (rc == 0)
- rc = -EEXIST;
- GOTO(out_ldput, rc);
- }
- } else if ((rec->ur_flags & O_CREAT) && !dchild->d_inode) {
+ /* Step 3: If the child was negative, and we're supposed to,
+ * create it. */
+ if ((rec->ur_flags & O_CREAT) && !dchild->d_inode) {
int err;
void *handle;
mds_start_transno(mds);
if (IS_ERR(handle)) {
rc = PTR_ERR(handle);
mds_finish_transno(mds, handle, req, rc);
- up(&dir->i_sem);
- GOTO(out_ldput, rc);
+ GOTO(out_step_3, rc);
}
rc = vfs_create(dir, dchild, rec->ur_mode);
- up(&dir->i_sem);
rc = mds_finish_transno(mds, handle, req, rc);
err = fsfilt_commit(obd, dir, handle);
if (rc || err) {
CERROR("error on commit: err = %d\n", err);
if (!rc)
rc = err;
- GOTO(out_ldput, rc);
+ GOTO(out_step_3, rc);
}
+ created = 1;
+ child_mode = LCK_PW;
} else if (!dchild->d_inode) {
- up(&dir->i_sem);
- GOTO(out_ldput, rc = 0);
- }
+ /* It's negative and we weren't supposed to create it */
+ GOTO(out_step_3, rc = -ENOENT);
+ }
+
+ /* Step 4: It's positive, so lock the child */
+ child_res_id.name[0] = dchild->d_inode->i_ino;
+ child_res_id.name[1] = dchild->d_inode->i_generation;
+ reacquire:
+ lock_flags = 0;
+ rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
+ child_res_id, LDLM_PLAIN, NULL, 0, child_mode,
+ &lock_flags, ldlm_completion_ast,
+ mds_blocking_ast, NULL, NULL, child_lockh);
+ if (rc != ELDLM_OK) {
+ CERROR("ldlm_cli_enqueue: %d\n", rc);
+ GOTO(out_step_3, rc = -EIO);
+ }
- /*
- * It already exists.
- */
mds_pack_inode2fid(&body->fid1, dchild->d_inode);
mds_pack_inode2body(body, dchild->d_inode);
+ if (S_ISREG(dchild->d_inode->i_mode)) {
+ rc = mds_pack_md(obd, req->rq_repmsg, 2, body, dchild->d_inode);
+ if (rc)
+ GOTO(out_step_4, rc);
+ } else {
+ /* If this isn't a regular file, we can't open it. */
+ GOTO(out_step_3, rc = 0); /* returns the lock to the client */
+ }
- if (!S_ISREG(dchild->d_inode->i_mode))
- GOTO(out_ldput, rc = 0);
+ if (!created && (rec->ur_flags & O_CREAT) && (rec->ur_flags & O_EXCL)) {
+ /* File already exists, we didn't just create it, and we
+ * were passed O_EXCL; err-or. */
+ GOTO(out_step_3, rc = -EEXIST); // returns a lock to the client
+ }
- rc = mds_pack_md(obd, req->rq_repmsg, 3, body, dchild->d_inode);
- if (rc) {
- CERROR("failure to get EA for %ld\n", dchild->d_inode->i_ino);
- GOTO(out_ldput, req->rq_status = rc);
+ /* If we're opening a file without an EA, the client needs a write
+ * lock. */
+ if (child_mode != LCK_PW && S_ISREG(dchild->d_inode->i_mode) &&
+ !(body->valid & OBD_MD_FLEASIZE)) {
+ ldlm_lock_decref(child_lockh, child_mode);
+ child_mode = LCK_PW;
+ goto reacquire;
}
+ /* Step 5: Open it */
rep->lock_policy_res1 |= IT_OPEN_OPEN;
mfd = kmem_cache_alloc(mds_file_cache, GFP_KERNEL);
if (!mfd) {
CERROR("mds: out of memory\n");
- GOTO(out_ldput, req->rq_status = -ENOMEM);
+ GOTO(out_step_4, req->rq_status = -ENOMEM);
}
flags = rec->ur_flags;
mntget(mnt);
file = dentry_open(dchild, mnt, flags & ~O_DIRECT & ~O_TRUNC);
if (IS_ERR(file))
- GOTO(out_unlock, req->rq_status = PTR_ERR(file));
+ GOTO(out_step_5, rc = PTR_ERR(file));
file->private_data = mfd;
mfd->mfd_file = file;
list_add(&mfd->mfd_list, &med->med_open_head);
spin_unlock(&med->med_open_lock);
- out_unlock:
- l_dput(parent);
- ldlm_lock_decref(&lockh, lock_mode);
- if (rc && rc != -EEXIST && mfd != NULL) {
+ body->handle.addr = (__u64)(unsigned long)mfd;
+ body->handle.cookie = mfd->mfd_servercookie;
+ CDEBUG(D_INODE, "file %p: mfd %p, cookie "LPX64"\n",
+ mfd->mfd_file, mfd, mfd->mfd_servercookie);
+ GOTO(out_step_2, rc = 0); /* returns a lock to the client */
+
+ out_step_5:
+ if (mfd != NULL) {
kmem_cache_free(mds_file_cache, mfd);
mfd = NULL;
}
- if (rc)
- RETURN(rc);
-
- out_pack:
- if (mfd) {
- body->handle.addr = (__u64)(unsigned long)mfd;
- body->handle.cookie = mfd->mfd_servercookie;
- CDEBUG(D_INODE, "file %p: mfd %p, cookie "LPX64"\n",
- mfd->mfd_file, mfd, mfd->mfd_servercookie);
- }
- RETURN(0);
-
- out_ldput:
+ out_step_4:
+ ldlm_lock_decref(child_lockh, child_mode);
+ out_step_3:
l_dput(dchild);
- goto out_unlock;
+ out_step_2:
+ l_dput(parent);
+ ldlm_lock_decref(&parent_lockh, parent_mode);
+ out_step_1:
+ RETURN(rc);
}