Whamcloud - gitweb
- merge b_intent changes back to b_md. Two profound metadata changes
authorbraam <braam>
Mon, 3 Feb 2003 08:07:56 +0000 (08:07 +0000)
committerbraam <braam>
Mon, 3 Feb 2003 08:07:56 +0000 (08:07 +0000)
  have eliminated over 600 lines of code:
  - the lock server on the MDS passes its locks back to the client
    instead of giving the client new locks.  This eliminates numerous
    races (for which we were implementing complicated fixes earlier).

  - the lock ordering model was changed slightly to introduce a parent
    child order and only then fid order.  This eliminates numerous
    double lock acquisitions from the MDS.

- OST semaphores restored

lustre/mds/mds_open.c

index f4bac4a..330fd26 100644 (file)
@@ -50,17 +50,22 @@ extern inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req);
 extern void mds_start_transno(struct mds_obd *mds);
 extern int mds_finish_transno(struct mds_obd *mds, void *handle,
                               struct ptlrpc_request *req, int rc);
+extern int enqueue_ordered_locks(int lock_mode, struct obd_device *obd,
+                                 struct ldlm_res_id *p1_res_id,
+                                 struct ldlm_res_id *p2_res_id,
+                                 struct ldlm_res_id *c1_res_id,
+                                 struct ldlm_res_id *c2_res_id,
+                                 struct lustre_handle *p1_lockh,
+                                 struct lustre_handle *p2_lockh,
+                                 struct lustre_handle *c1_lockh,
+                                 struct lustre_handle *c2_lockh);
 
 int mds_open(struct mds_update_record *rec, int offset,
-             struct ptlrpc_request *req)
+             struct ptlrpc_request *req, struct lustre_handle *child_lockh)
 {
         struct mds_obd *mds = mds_req2mds(req);
         struct obd_device *obd = req->rq_export->exp_obd;
         struct ldlm_reply *rep = lustre_msg_buf(req->rq_repmsg, 0);
-        struct obd_ucred uc;
-        struct obd_run_ctxt saved;
-        struct lustre_handle lockh;
-        int lock_mode;
         struct file *file;
         struct mds_body *body = lustre_msg_buf(req->rq_repmsg, 1);
         struct dentry *dchild, *parent;
@@ -68,9 +73,10 @@ int mds_open(struct mds_update_record *rec, int offset,
         struct mds_export_data *med;
         struct mds_file_data *mfd = NULL;
         struct vfsmount *mnt = mds->mds_vfsmnt;
+        struct ldlm_res_id child_res_id = { .name = {0} };
+        struct lustre_handle parent_lockh;
         __u32 flags;
-        struct list_head *tmp;
-        int rc = 0;
+        int rc = 0, parent_mode, child_mode = LCK_PR, lock_flags, created = 0;
         ENTRY;
 
 #warning replay of open needs to be redone
@@ -99,9 +105,10 @@ int mds_open(struct mds_update_record *rec, int offset,
                 RETURN(-ENOMEM);
         }
 
-        lock_mode = (rec->ur_flags & O_CREAT) ? LCK_PW : LCK_PR;
-        parent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, lock_mode,
-                                       &lockh);
+        /* Step 1: Find and lock the parent */
+        parent_mode = (rec->ur_flags & O_CREAT) ? LCK_PW : LCK_PR;
+        parent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, parent_mode,
+                                       &parent_lockh);
         if (IS_ERR(parent)) {
                 rc = PTR_ERR(parent);
                 CERROR("parent lookup error %d\n", rc);
@@ -109,34 +116,23 @@ int mds_open(struct mds_update_record *rec, int offset,
                 RETURN(rc);
         }
         dir = parent->d_inode;
+        if (dir == NULL)
+                GOTO(out_step_1, rc = -ENOENT);
 
-        down(&dir->i_sem);
+        /* Step 2: Lookup the child */
         dchild = lookup_one_len(lustre_msg_buf(req->rq_reqmsg, 3),
                                 parent, req->rq_reqmsg->buflens[3] - 1);
-        if (IS_ERR(dchild)) {
-                up(&dir->i_sem);
-                GOTO(out_unlock, rc = PTR_ERR(dchild));
-        }
+        if (IS_ERR(dchild))
+                GOTO(out_step_2, rc = PTR_ERR(dchild));
 
         if (dchild->d_inode)
                 rep->lock_policy_res1 |= IT_OPEN_POS;
         else
                 rep->lock_policy_res1 |= IT_OPEN_NEG;
 
-        /* Negative dentry, just create the file */
-        if (dchild->d_inode) { 
-                up(&dir->i_sem);
-               if ((rec->ur_flags & (O_CREAT|O_EXCL)) == (O_CREAT|O_EXCL)) { 
-                        mds_pack_inode2fid(&body->fid1, dchild->d_inode);
-                        mds_pack_inode2body(body, dchild->d_inode);
-                        if (S_ISREG(dchild->d_inode->i_mode))
-                                rc = mds_pack_md(obd, req->rq_repmsg, 3, body,
-                                                 dchild->d_inode);
-                        if (rc == 0)
-                                rc = -EEXIST;
-                       GOTO(out_ldput, rc);
-                }
-        } else if ((rec->ur_flags & O_CREAT) && !dchild->d_inode) {
+        /* Step 3: If the child was negative, and we're supposed to,
+         * create it. */
+        if ((rec->ur_flags & O_CREAT) && !dchild->d_inode) {
                 int err;
                 void *handle;
                 mds_start_transno(mds);
@@ -145,44 +141,70 @@ int mds_open(struct mds_update_record *rec, int offset,
                 if (IS_ERR(handle)) {
                         rc = PTR_ERR(handle);
                         mds_finish_transno(mds, handle, req, rc);
-                        up(&dir->i_sem);
-                        GOTO(out_ldput, rc);
+                        GOTO(out_step_3, rc);
                 }
                 rc = vfs_create(dir, dchild, rec->ur_mode);
-                up(&dir->i_sem);
                 rc = mds_finish_transno(mds, handle, req, rc);
                 err = fsfilt_commit(obd, dir, handle);
                 if (rc || err) {
                         CERROR("error on commit: err = %d\n", err);
                         if (!rc)
                                 rc = err;
-                        GOTO(out_ldput, rc);
+                        GOTO(out_step_3, rc);
                 }
+                created = 1;
+                child_mode = LCK_PW;
         } else if (!dchild->d_inode) {
-                up(&dir->i_sem);
-                GOTO(out_ldput, rc = 0);
-        } 
+                /* It's negative and we weren't supposed to create it */
+                GOTO(out_step_3, rc = -ENOENT);
+        }
+
+        /* Step 4: It's positive, so lock the child */
+        child_res_id.name[0] = dchild->d_inode->i_ino;
+        child_res_id.name[1] = dchild->d_inode->i_generation;
+ reacquire:
+        lock_flags = 0;
+        rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
+                              child_res_id, LDLM_PLAIN, NULL, 0, child_mode,
+                              &lock_flags, ldlm_completion_ast,
+                              mds_blocking_ast, NULL, NULL, child_lockh);
+        if (rc != ELDLM_OK) {
+                CERROR("ldlm_cli_enqueue: %d\n", rc);
+                GOTO(out_step_3, rc = -EIO);
+        }
 
-        /*
-         * It already exists.
-         */
         mds_pack_inode2fid(&body->fid1, dchild->d_inode);
         mds_pack_inode2body(body, dchild->d_inode);
+        if (S_ISREG(dchild->d_inode->i_mode)) {
+                rc = mds_pack_md(obd, req->rq_repmsg, 2, body, dchild->d_inode);
+                if (rc)
+                        GOTO(out_step_4, rc);
+        } else {
+                /* If this isn't a regular file, we can't open it. */
+                GOTO(out_step_3, rc = 0); /* returns the lock to the client */
+        }
 
-        if (!S_ISREG(dchild->d_inode->i_mode))
-                GOTO(out_ldput, rc = 0);
+        if (!created && (rec->ur_flags & O_CREAT) && (rec->ur_flags & O_EXCL)) {
+                /* File already exists, we didn't just create it, and we
+                 * were passed O_EXCL; err-or. */
+                GOTO(out_step_3, rc = -EEXIST); // returns a lock to the client
+        }
 
-        rc = mds_pack_md(obd, req->rq_repmsg, 3, body, dchild->d_inode);
-        if (rc) {
-                CERROR("failure to get EA for %ld\n", dchild->d_inode->i_ino);
-                GOTO(out_ldput, req->rq_status = rc);
+        /* If we're opening a file without an EA, the client needs a write
+         * lock. */
+        if (child_mode != LCK_PW && S_ISREG(dchild->d_inode->i_mode) &&
+            !(body->valid & OBD_MD_FLEASIZE)) {
+                ldlm_lock_decref(child_lockh, child_mode);
+                child_mode = LCK_PW;
+                goto reacquire;
         }
 
+        /* Step 5: Open it */
         rep->lock_policy_res1 |= IT_OPEN_OPEN;
         mfd = kmem_cache_alloc(mds_file_cache, GFP_KERNEL);
         if (!mfd) {
                 CERROR("mds: out of memory\n");
-                GOTO(out_ldput, req->rq_status = -ENOMEM);
+                GOTO(out_step_4, req->rq_status = -ENOMEM);
         }
 
         flags = rec->ur_flags;
@@ -190,7 +212,7 @@ int mds_open(struct mds_update_record *rec, int offset,
         mntget(mnt);
         file = dentry_open(dchild, mnt, flags & ~O_DIRECT & ~O_TRUNC);
         if (IS_ERR(file))
-                GOTO(out_unlock, req->rq_status = PTR_ERR(file));
+                GOTO(out_step_5, rc = PTR_ERR(file));
 
         file->private_data = mfd;
         mfd->mfd_file = file;
@@ -199,26 +221,24 @@ int mds_open(struct mds_update_record *rec, int offset,
         list_add(&mfd->mfd_list, &med->med_open_head);
         spin_unlock(&med->med_open_lock);
 
- out_unlock:
-        l_dput(parent);
-        ldlm_lock_decref(&lockh, lock_mode);
-        if (rc && rc != -EEXIST && mfd != NULL) {
+        body->handle.addr = (__u64)(unsigned long)mfd;
+        body->handle.cookie = mfd->mfd_servercookie;
+        CDEBUG(D_INODE, "file %p: mfd %p, cookie "LPX64"\n",
+               mfd->mfd_file, mfd, mfd->mfd_servercookie);
+        GOTO(out_step_2, rc = 0); /* returns a lock to the client */
+
+ out_step_5:
+        if (mfd != NULL) {
                 kmem_cache_free(mds_file_cache, mfd);
                 mfd = NULL;
         }
-        if (rc)
-                RETURN(rc);
-
- out_pack:
-        if (mfd) {
-                body->handle.addr = (__u64)(unsigned long)mfd;
-                body->handle.cookie = mfd->mfd_servercookie;
-                CDEBUG(D_INODE, "file %p: mfd %p, cookie "LPX64"\n",
-                       mfd->mfd_file, mfd, mfd->mfd_servercookie);
-        }
-        RETURN(0);
-
- out_ldput:
+ out_step_4:
+        ldlm_lock_decref(child_lockh, child_mode);
+ out_step_3:
         l_dput(dchild);
-        goto out_unlock;
+ out_step_2:
+        l_dput(parent);
+        ldlm_lock_decref(&parent_lockh, parent_mode);
+ out_step_1:
+        RETURN(rc);
 }