Whamcloud - gitweb
b=256
authorpschwan <pschwan>
Tue, 29 Oct 2002 05:27:13 +0000 (05:27 +0000)
committerpschwan <pschwan>
Tue, 29 Oct 2002 05:27:13 +0000 (05:27 +0000)
abstract ll_lookup2; make ll_lookup2 and ll_revalidate2 both use this
function.  For the first time, we might almost be properly cache coherant.

lustre/ChangeLog
lustre/include/linux/lustre_lite.h
lustre/llite/dcache.c
lustre/llite/dir.c
lustre/llite/namei.c
lustre/tests/lov.xml

index 53f9548..bae803f 100644 (file)
@@ -2,6 +2,7 @@ TBA
        * version v0_5_16
        * bug fixes:
          - limit Lustre IOVs to PTL_MD_MAX_IOV (611336)
+         - abstract ll_lookup2, fix ll_revalidate2 to use abstraction (256)
         * protocol change to lustre_msg: move |version| and add |flags|
         * added replay of create, unlink, link and rename operations during
           MDS failover; recovery should be much more robust now
index 825b53d..13c1e5d 100644 (file)
@@ -167,6 +167,8 @@ int ll_save_intent(struct dentry * de, struct lookup_intent * it);
 struct lookup_intent * ll_get_intent(struct dentry * de);
 ****/
 
+#define IT_RELEASED_MAGIC 0xDEADCAFE
+
 #define LL_SAVE_INTENT(de, it)                                                 \
 do {                                                                           \
         LASSERT(ll_d2d(de) != NULL);                                           \
@@ -183,10 +185,12 @@ do {                                                                           \
         it = de->d_it;                                                         \
                                                                                \
         LASSERT(ll_d2d(de) != NULL);                                           \
+        LASSERT(it->it_op != IT_RELEASED_MAGIC);                               \
                                                                                \
         CDEBUG(D_DENTRY, "D_IT UP dentry %p fsdata %p intent: %s\n",           \
                de, ll_d2d(de), ldlm_it2str(de->d_it->it_op));                  \
         de->d_it = NULL;                                                       \
+        it->it_op = IT_RELEASED_MAGIC;                                         \
         up(&ll_d2d(de)->lld_it_sem);                                           \
 } while(0)
 
index 02e78ae..0990504 100644 (file)
@@ -63,12 +63,37 @@ void ll_intent_release(struct dentry *de, struct lookup_intent *it)
                 } else
                         ldlm_lock_decref(handle, it->it_lock_mode);
         }
-        //up(&ll_d2d(de)->lld_it_sem);
+
+        if (it->it_op != IT_RELEASED_MAGIC) {
+                up(&ll_d2d(de)->lld_it_sem);
+                it->it_op = IT_RELEASED_MAGIC;
+        }
         EXIT;
 }
 
 int ll_revalidate2(struct dentry *de, int flags, struct lookup_intent *it)
 {
+        int ll_intent_lock(struct inode *parent, struct dentry *dentry,
+                           struct lookup_intent *it, void *);
+        int rc;
+        ENTRY;
+
+        rc = ll_intent_lock(de->d_parent->d_inode, de, it, NULL);
+        if (rc < 0) {
+                /* Something bad happened; overwrite it_status? */
+        }
+
+        if (it != NULL && it->it_status == 0) {
+                LL_SAVE_INTENT(de, it);
+        } else if (it != NULL) {
+                de->d_it = NULL;
+                CDEBUG(D_DENTRY,
+                       "D_IT dentry %p fsdata %p intent: %s status %d\n",
+                       de, ll_d2d(de), ldlm_it2str(it->it_op), it->it_status);
+        }
+
+        RETURN(1);
+#if 0
         struct ll_sb_info *sbi = ll_s2sbi(de->d_sb);
         struct lustre_handle lockh;
         __u64 res_id[RES_NAME_SIZE] = {0};
@@ -125,6 +150,7 @@ int ll_revalidate2(struct dentry *de, int flags, struct lookup_intent *it)
         de->d_it = it;
 
         RETURN(rc);
+#endif
 }
 
 int ll_set_dd(struct dentry *de)
index d824376..32a4e32 100644 (file)
@@ -71,7 +71,7 @@ static int ll_dir_readpage(struct file *file, struct page *page)
         struct ptlrpc_request *request;
         struct lustre_handle lockh;
         struct mds_body *body;
-        struct lookup_intent it = {IT_READDIR};
+        struct lookup_intent it = { .it_op = IT_READDIR };
 
         ENTRY;
 
@@ -81,9 +81,11 @@ static int ll_dir_readpage(struct file *file, struct page *page)
                 GOTO(readpage_out, rc);
         }
 
-        rc = ll_lock(inode, NULL, &it, &lockh);
+        rc = mdc_enqueue(&sbi->ll_mdc_conn, LDLM_MDSINTENT, &it, LCK_PR,
+                         inode, NULL, &lockh, NULL, 0, inode, sizeof(*inode));
         request = (struct ptlrpc_request *)it.it_data;
-        ptlrpc_req_finished(request);
+        if (request)
+                ptlrpc_req_finished(request);
         if (rc != ELDLM_OK) {
                 CERROR("lock enqueue: err: %d\n", rc);
                 unlock_page(page);
index 1dcc8f1..4ff4c4e 100644 (file)
@@ -109,38 +109,6 @@ static int ll_test_inode(struct inode *inode, void *opaque)
 
 extern struct dentry_operations ll_d_ops;
 
-int ll_lock(struct inode *dir, struct dentry *dentry,
-            struct lookup_intent *it, struct lustre_handle *lockh)
-{
-        struct ll_sb_info *sbi = ll_i2sbi(dir);
-        char *tgt = NULL;
-        int tgtlen = 0;
-        int err, lock_mode;
-
-        /* CREAT needs to be tested before open (both could be set) */
-        if ((it->it_op & (IT_CREAT | IT_MKDIR | IT_SETATTR | IT_MKNOD))) {
-                lock_mode = LCK_PW;
-        } else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_UNLINK |
-                                IT_RMDIR | IT_RENAME | IT_RENAME2 | IT_READLINK|
-                                IT_LINK | IT_LINK2 | IT_LOOKUP)) {
-                /* XXXphil PW for LINK2/RENAME2? */
-                lock_mode = LCK_PR;
-        } else if (it->it_op & IT_SYMLINK) {
-                lock_mode = LCK_PW;
-                tgt = it->it_data;
-                tgtlen = strlen(tgt);
-                it->it_data = NULL;
-        } else {
-                LBUG();
-                RETURN(-EINVAL);
-        }
-
-        err = mdc_enqueue(&sbi->ll_mdc_conn, LDLM_MDSINTENT, it, lock_mode,
-                          dir, dentry, lockh, tgt, tgtlen, dir, sizeof(*dir));
-
-        RETURN(err);
-}
-
 int ll_unlock(__u32 mode, struct lustre_handle *lockh)
 {
         ENTRY;
@@ -180,16 +148,38 @@ struct inode *ll_iget(struct super_block *sb, ino_t hash,
 }
 #endif
 
-static struct dentry *ll_lookup2(struct inode *dir, struct dentry *dentry,
-                                 struct lookup_intent *it)
+static int ll_intent_to_lock_mode(struct lookup_intent *it)
 {
-        struct ptlrpc_request *request = NULL;
-        struct inode * inode = NULL;
-        struct ll_sb_info *sbi = ll_i2sbi(dir);
-        struct ll_read_inode2_cookie lic;
+        /* CREAT needs to be tested before open (both could be set) */
+        if ((it->it_op & (IT_CREAT | IT_MKDIR | IT_SETATTR | IT_MKNOD))) {
+                return LCK_PW;
+        } else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_UNLINK |
+                                IT_RMDIR | IT_RENAME | IT_RENAME2 | IT_READLINK|
+                                IT_LINK | IT_LINK2 | IT_LOOKUP | IT_SYMLINK)) {
+                return LCK_PW;
+        }
+
+        LBUG();
+        RETURN(-EINVAL);
+}
+
+#define LL_LOOKUP_POSITIVE 1
+#define LL_LOOKUP_NEGATIVE 2
+
+typedef int (*intent_finish_cb)(int flag, struct ptlrpc_request *,
+                                struct dentry *, struct lookup_intent *,
+                                int offset, obd_id ino);
+
+int ll_intent_lock(struct inode *parent, struct dentry *dentry,
+                   struct lookup_intent *it,
+                   intent_finish_cb intent_finish)
+{
+        struct ll_sb_info *sbi = ll_i2sbi(parent);
         struct lustre_handle lockh;
-        struct lookup_intent lookup_it = { IT_LOOKUP };
-        int rc, offset;
+        struct lookup_intent lookup_it = { .it_op = IT_LOOKUP };
+        struct ptlrpc_request *request = NULL;
+        char *tgt = NULL;
+        int rc, lock_mode, tgtlen = 0, offset, flag = LL_LOOKUP_POSITIVE;
         obd_id ino = 0;
 
         ENTRY;
@@ -201,35 +191,69 @@ static struct dentry *ll_lookup2(struct inode *dir, struct dentry *dentry,
                dentry->d_name.name, ldlm_it2str(it->it_op));
 
         if (dentry->d_name.len > EXT2_NAME_LEN)
-                RETURN(ERR_PTR(-ENAMETOOLONG));
+                RETURN(-ENAMETOOLONG);
 
-        rc = ll_lock(dir, dentry, it, &lockh);
+        lock_mode = ll_intent_to_lock_mode(it);
+        if (it->it_op & IT_SYMLINK) {
+                tgt = it->it_data;
+                tgtlen = strlen(tgt);
+                it->it_data = NULL;
+        }
+
+        rc = mdc_enqueue(&sbi->ll_mdc_conn, LDLM_MDSINTENT, it, lock_mode,
+                         parent, dentry, &lockh, tgt, tgtlen, parent,
+                         sizeof(*parent));
         if (rc < 0)
-                RETURN(ERR_PTR(rc));
+                RETURN(rc);
         memcpy(it->it_lock_handle, &lockh, sizeof(lockh));
 
         request = (struct ptlrpc_request *)it->it_data;
         if (it->it_disposition) {
+                struct mds_body *mds_body;
                 int mode, symlen = 0;
                 obd_flag valid;
 
+                /* it_disposition == 1 indicates that the server performed the
+                 * intent on our behalf.  This long block is all about fixing up
+                 * the local state so that it is correct as of the moment
+                 * _before_ the operation was applied; that way, the VFS will
+                 * think that everything is normal and call Lustre's regular
+                 * FS function.
+                 *
+                 * If we're performing a creation, that means that unless the
+                 * creation failed with EEXIST, we should fake up a negative
+                 * dentry.  Likewise for the target of a hard link.
+                 *
+                 * For everything else, we want to lookup to succeed. */
+
+                /* One additional note: we add an extra reference to the request
+                 * because we need to keep it around until ll_create gets
+                 * called.  For anything else which results in
+                 * LL_LOOKUP_POSITIVE, we can do the iget() immediately with the
+                 * contents of the reply (in the intent_finish callback).  In
+                 * the create case, however, we need to wait until
+                 * ll_create_node to do the iget() or the VFS will abort with
+                 * -EEXISTS. */
+
                 offset = 1;
-                lic.lic_body = lustre_msg_buf(request->rq_repmsg, offset);
-                ino = lic.lic_body->fid1.id;
-                mode = lic.lic_body->mode;
+                mds_body = lustre_msg_buf(request->rq_repmsg, offset);
+                ino = mds_body->fid1.id;
+                mode = mds_body->mode;
                 if (it->it_op & (IT_CREAT | IT_MKDIR | IT_SYMLINK | IT_MKNOD)) {
-                        mdc_store_create_replay_data(request, dir->i_sb);
+                        mdc_store_create_replay_data(request, parent->i_sb);
                         /* For create ops, we want the lookup to be negative,
                          * unless the create failed in a way that indicates
                          * that the file is already there */
-                        if (it->it_status != -EEXIST)
-                                GOTO(negative, NULL);
+                        if (it->it_status != -EEXIST) {
+                                atomic_inc(&request->rq_refcount);
+                                GOTO(out, flag = LL_LOOKUP_NEGATIVE);
+                        }
                 } else if (it->it_op & (IT_GETATTR | IT_SETATTR | IT_LOOKUP |
                                         IT_READLINK)) {
                         /* For check ops, we want the lookup to succeed */
                         it->it_data = NULL;
                         if (it->it_status)
-                                GOTO(neg_req, NULL);
+                                GOTO(out, flag = LL_LOOKUP_NEGATIVE);
                 } else if (it->it_op & (IT_RENAME | IT_LINK)) {
                         /* For rename, we want the source lookup to succeed */
                         if (it->it_status) {
@@ -242,25 +266,27 @@ static struct dentry *ll_lookup2(struct inode *dir, struct dentry *dentry,
                          * the file truly doesn't exist */
                         it->it_data = NULL;
                         if (it->it_status == -ENOENT)
-                                GOTO(neg_req, NULL);
-                        goto iget;
+                                GOTO(out, flag = LL_LOOKUP_NEGATIVE);
+                        GOTO(out, flag = LL_LOOKUP_POSITIVE);
                 } else if (it->it_op == IT_OPEN) {
                         it->it_data = NULL;
                         if (it->it_status && it->it_status != -EEXIST)
-                                GOTO(neg_req, NULL);
+                                GOTO(out, flag = LL_LOOKUP_NEGATIVE);
                 } else if (it->it_op & (IT_RENAME2 | IT_LINK2)) {
+                        struct mds_body *body =
+                                lustre_msg_buf(request->rq_repmsg, offset);
                         it->it_data = NULL;
                         /* This means the target lookup is negative */
-                        if (lic.lic_body->valid == 0)
-                                GOTO(neg_req, NULL);
-                        goto iget;
+                        if (body->valid == 0)
+                                GOTO(out, flag = LL_LOOKUP_NEGATIVE);
+                        GOTO(out, flag = LL_LOOKUP_POSITIVE);
                 }
 
                 /* Do a getattr now that we have the lock */
                 valid = OBD_MD_FLNOTOBD | OBD_MD_FLEASIZE;
                 if (it->it_op == IT_READLINK) {
                         valid |= OBD_MD_LINKNAME;
-                        symlen = lic.lic_body->size;
+                        symlen = mds_body->size;
                 }
                 ptlrpc_req_finished(request);
                 request = NULL;
@@ -272,13 +298,17 @@ static struct dentry *ll_lookup2(struct inode *dir, struct dentry *dentry,
                 }
                 offset = 0;
         } else {
-                struct ll_inode_info *lli = ll_i2info(dir);
+                struct ll_inode_info *lli = ll_i2info(parent);
                 int mode;
 
+                /* it_disposition == 0 indicates that it just did a simple lock
+                 * request, for which we are very thankful.  move along with
+                 * the local lookup then. */
+
                 memcpy(&lli->lli_intent_lock_handle, &lockh, sizeof(lockh));
                 offset = 0;
 
-                ino = ll_inode_by_name(dir, dentry, &mode);
+                ino = ll_inode_by_name(parent, dentry, &mode);
                 if (!ino) {
                         CERROR("inode %*s not found by name\n",
                                dentry->d_name.len, dentry->d_name.name);
@@ -293,28 +323,60 @@ static struct dentry *ll_lookup2(struct inode *dir, struct dentry *dentry,
                 }
         }
 
- iget:
-        lic.lic_body = lustre_msg_buf(request->rq_repmsg, offset);
-        if (S_ISREG(lic.lic_body->mode) &&
-            lic.lic_body->valid & OBD_MD_FLEASIZE) {
-                LASSERT(request->rq_repmsg->bufcount > offset);
-                lic.lic_lmm = lustre_msg_buf(request->rq_repmsg, offset + 1);
-        } else
-                lic.lic_lmm = NULL;
+        EXIT;
+ out:
+        if (intent_finish != NULL)
+                rc = intent_finish(flag, request, dentry, it, offset, ino);
+        else
+                ptlrpc_req_finished(request);
 
-        /* No rpc's happen during iget4, -ENOMEM's are possible */
-        LASSERT(ino != 0);
-        inode = ll_iget(dir->i_sb, ino, &lic);
-        if (!inode) {
-                ptlrpc_free_req(request);
+        if (it->it_op == IT_LOOKUP || rc < 0)
                 ll_intent_release(dentry, it);
-                RETURN(ERR_PTR(-ENOMEM));
+
+        return rc;
+
+ drop_req:
+        ptlrpc_free_req(request);
+ drop_lock:
+#warning FIXME: must release lock here
+        return rc;
+}
+
+static int lookup2_finish(int flag, struct ptlrpc_request *request,
+                          struct dentry *dentry, struct lookup_intent *it,
+                          int offset, obd_id ino)
+{
+        struct inode *inode = NULL;
+        struct ll_read_inode2_cookie lic;
+
+        if (flag == LL_LOOKUP_POSITIVE) {
+                ENTRY;
+                lic.lic_body = lustre_msg_buf(request->rq_repmsg, offset);
+
+                if (S_ISREG(lic.lic_body->mode) &&
+                    lic.lic_body->valid & OBD_MD_FLEASIZE) {
+                        LASSERT(request->rq_repmsg->bufcount > offset);
+                        lic.lic_lmm = lustre_msg_buf(request->rq_repmsg,
+                                                     offset + 1);
+                } else {
+                        lic.lic_lmm = NULL;
+                }
+
+                /* No rpc's happen during iget4, -ENOMEM's are possible */
+                LASSERT(ino != 0);
+                inode = ll_iget(dentry->d_sb, ino, &lic);
+                if (!inode) {
+                        /* XXX make sure that request is freed in this case;
+                         * I think it is, but double-check refcounts. -phil */
+                        RETURN(-ENOMEM);
+                }
+                EXIT;
+        } else {
+                ENTRY;
         }
 
-        EXIT;
- neg_req:
         ptlrpc_req_finished(request);
- negative:
+
         dentry->d_op = &ll_d_ops;
         if (ll_d2d(dentry) == NULL) {
                 ll_set_dd(dentry);
@@ -333,16 +395,21 @@ static struct dentry *ll_lookup2(struct inode *dir, struct dentry *dentry,
                        it->it_status);
         }
 
-        if (it->it_op == IT_LOOKUP)
-                ll_intent_release(dentry, it);
+        RETURN(0);
+}
 
-        return NULL;
+static struct dentry *ll_lookup2(struct inode *parent, struct dentry *dentry,
+                                 struct lookup_intent *it)
+{
+        int rc;
 
- drop_req:
-        ptlrpc_free_req(request);
- drop_lock:
-#warning FIXME: must release lock here
-        return ERR_PTR(rc);
+        rc = ll_intent_lock(parent, dentry, it, lookup2_finish);
+        if (rc < 0) {
+                CERROR("ll_intent_lock: %d\n", rc);
+                return ERR_PTR(rc);
+        }
+
+        return 0;
 }
 
 static struct inode *ll_create_node(struct inode *dir, const char *name,
index b768f5d..532c1ec 100644 (file)
@@ -2,7 +2,7 @@
 <!DOCTYPE lustre>
 <lustre>
   <ldlm name='ldlm' uuid='ldlm_UUID'/>
-  <node uuid='localhost_UUID' name='localhost'>
+  <node name='localhost' uuid='localhost_UUID'>
     <profile>
       <ldlm_ref uuidref='ldlm_UUID'/>
       <network_ref uuidref='NET_localhost_tcp_UUID'/>
@@ -14,7 +14,7 @@
       <ost_ref uuidref='OST_localhost_2_UUID'/>
       <mountpoint_ref uuidref='MNT_localhost_UUID'/>
     </profile>
-    <network type='tcp' uuid='NET_localhost_tcp_UUID' name='NET_localhost_tcp'>
+    <network name='NET_localhost_tcp' uuid='NET_localhost_tcp_UUID' type='tcp'>
       <server>localhost</server>
       <port>988</port>
     </network>
     <network_ref uuidref='NET_localhost_tcp_UUID'/>
     <node_ref uuidref='localhost_UUID'/>
   </mds>
-  <lov uuid='lov1_UUID' name='lov1'>
+  <lov name='lov1' uuid='lov1_UUID'>
     <mds_ref uuidref='mds1_UUID'/>
-    <devices pattern='0' stripesize='65536' stripecount='0'>
+    <devices stripecount='0' stripesize='65536' pattern='0'>
       <osc_ref uuidref='OSC_localhost_UUID'/>
       <osc_ref uuidref='OSC_localhost_2_UUID'/>
     </devices>
   </lov>
-  <lovconfig uuid='LVCFG_lov1_UUID' name='LVCFG_lov1'>
+  <lovconfig name='LVCFG_lov1' uuid='LVCFG_lov1_UUID'>
     <lov_ref uuidref='lov1_UUID'/>
   </lovconfig>
-  <obd type='obdfilter' uuid='OBD_localhost_UUID' name='OBD_localhost'>
+  <obd uuid='OBD_localhost_UUID' name='OBD_localhost' type='obdfilter'>
     <fstype>extN</fstype>
     <device size='100000'>/tmp/ost1</device>
     <autoformat>no</autoformat>
     <network_ref uuidref='NET_localhost_tcp_UUID'/>
     <obd_ref uuidref='OBD_localhost_UUID'/>
   </ost>
-  <obd type='obdfilter' uuid='OBD_localhost_2_UUID' name='OBD_localhost_2'>
+  <obd name='OBD_localhost_2' uuid='OBD_localhost_2_UUID' type='obdfilter'>
     <fstype>extN</fstype>
     <device size='100000'>/tmp/ost2</device>
     <autoformat>no</autoformat>
   </obd>
-  <osc uuid='OSC_localhost_2_UUID' name='OSC_localhost_2'>
+  <osc name='OSC_localhost_2' uuid='OSC_localhost_2_UUID'>
     <ost_ref uuidref='OST_localhost_2_UUID'/>
     <obd_ref uuidref='OBD_localhost_2_UUID'/>
   </osc>
-  <ost uuid='OST_localhost_2_UUID' name='OST_localhost_2'>
+  <ost name='OST_localhost_2' uuid='OST_localhost_2_UUID'>
     <network_ref uuidref='NET_localhost_tcp_UUID'/>
     <obd_ref uuidref='OBD_localhost_2_UUID'/>
   </ost>