Whamcloud - gitweb
- Added match_or_enqueue helper function
authorpschwan <pschwan>
Sat, 13 Jul 2002 19:03:23 +0000 (19:03 +0000)
committerpschwan <pschwan>
Sat, 13 Jul 2002 19:03:23 +0000 (19:03 +0000)
- fixed userspace build problem in lustre_lib.h
- added intent-based lookup code
- fixed intent-based setattr
- added mds_fid2locked_dentry and mds_name2locked_dentry helpers
- don't crash in ptlrpc_reply, just warn of API violation
- update create.pl to open instead of mcreate

17 files changed:
lustre/include/linux/lustre_dlm.h
lustre/include/linux/lustre_lib.h
lustre/include/linux/lustre_lite.h
lustre/include/linux/lustre_mds.h
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/ldlm/ldlm_resource.c
lustre/llite/dcache.c
lustre/llite/file.c
lustre/llite/namei.c
lustre/llite/super.c
lustre/mdc/mdc_request.c
lustre/mds/handler.c
lustre/mds/mds_reint.c
lustre/ptlrpc/niobuf.c
lustre/tests/create.pl

index cb446e2..f4d7e3b 100644 (file)
@@ -305,11 +305,26 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl,
                      void *data,
                      __u32 data_len,
                      struct lustre_handle *lockh);
+int ldlm_match_or_enqueue(struct ptlrpc_client *cl,
+                          struct ptlrpc_connection *conn,
+                          struct lustre_handle *connh, 
+                          struct ptlrpc_request *req,
+                          struct ldlm_namespace *ns,
+                          struct lustre_handle *parent_lock_handle,
+                          __u64 *res_id,
+                          __u32 type,
+                          void *cookie, int cookielen,
+                          ldlm_mode_t mode,
+                          int *flags,
+                          ldlm_lock_callback callback,
+                          void *data,
+                          __u32 data_len,
+                          struct lustre_handle *lockh);
 int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *new,
                     void *data, __u32 data_len);
-int ldlm_cli_convert(struct ptlrpc_client *, struct lustre_handle *, struct lustre_handle *connh, 
-                     int new_mode, int *flags);
-int ldlm_cli_cancel(struct lustre_handle *lockh, struct lustre_handle *connh);
+int ldlm_cli_convert(struct ptlrpc_client *, struct lustre_handle *,
+                     struct lustre_handle *connh, int new_mode, int *flags);
+int ldlm_cli_cancel(struct lustre_handle *lockh);
 
 #endif /* __KERNEL__ */
 
index 535861d..251e5b2 100644 (file)
 
 #ifndef __KERNEL__
 # include <string.h>
+#else
+# include <asm/semaphore.h>
 #endif
 
 #include <linux/portals_lib.h>
-#include <asm/semaphore.h>
 #include <linux/lustre_idl.h>
 
 #ifdef __KERNEL__
index b63dfc9..07c37dc 100644 (file)
@@ -127,7 +127,8 @@ int ll_lock(struct inode *dir, struct dentry *dentry,
             struct lookup_intent *it, struct lustre_handle *lockh);
 int ll_unlock(__u32 mode, struct lustre_handle *lockh);
 
-
+/* dcache.c */
+void ll_intent_release(struct dentry *de);
 
 /* dir.c */
 extern struct file_operations ll_dir_operations;
index 8e8a59f..e5fbfca 100644 (file)
@@ -131,7 +131,15 @@ void mds_pack_inode2fid(struct ll_fid *fid, struct inode *inode);
 void mds_pack_inode2body(struct mds_body *body, struct inode *inode);
 
 /* mds/handler.c */
-struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, struct vfsmount **mnt);
+struct dentry *mds_name2locked_dentry(struct mds_obd *mds, struct dentry *dir,
+                                      struct vfsmount **mnt, char *name,
+                                      int namelen, int lock_mode,
+                                      struct lustre_handle *lockh);
+struct dentry *mds_fid2locked_dentry(struct mds_obd *mds, struct ll_fid *fid,
+                                     struct vfsmount **mnt, int lock_mode,
+                                     struct lustre_handle *lockh);
+struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
+                              struct vfsmount **mnt);
 int mds_lock_callback(struct lustre_handle *lockh, struct ldlm_lock_desc *desc,
                       void *data, int data_len, struct ptlrpc_request **req);
 int mds_reint(int offset, struct ptlrpc_request *req);
index dbb7b71..9861e15 100644 (file)
@@ -288,6 +288,8 @@ static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
                 case IT_LINK:
                 case IT_OPEN:
                 case IT_RENAME:
+                case IT_SETATTR:
+                case IT_LOOKUP:
                         bufcount = 3;
                         break;
                 case IT_UNLINK:
@@ -317,23 +319,12 @@ static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
                 case IT_CREAT:
                 case IT_CREAT|IT_OPEN:
                 case IT_MKDIR:
-                case IT_SETATTR:
                 case IT_SYMLINK:
                 case IT_MKNOD:
                 case IT_LINK:
                 case IT_UNLINK:
                 case IT_RMDIR:
                 case IT_RENAME2:
-                        if (mds_reint_p == NULL)
-                                mds_reint_p =
-                                        inter_module_get_request
-                                        ("mds_reint", "mds");
-                        if (IS_ERR(mds_reint_p)) {
-                                CERROR("MDSINTENT locks require the MDS "
-                                       "module.\n");
-                                LBUG();
-                                RETURN(-EINVAL);
-                        }
                         rc = mds_reint_p(2, req);
                         if (rc || req->rq_status != 0) {
                                 rep->lock_policy_res2 = req->rq_status;
@@ -344,16 +335,8 @@ static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
                 case IT_READDIR:
                 case IT_RENAME:
                 case IT_OPEN:
-                        if (mds_getattr_name_p == NULL)
-                                mds_getattr_name_p =
-                                        inter_module_get_request
-                                        ("mds_getattr_name", "mds");
-                        if (IS_ERR(mds_getattr_name_p)) {
-                                CERROR("MDSINTENT locks require the MDS "
-                                       "module.\n");
-                                LBUG();
-                                RETURN(-EINVAL);
-                        }
+                case IT_SETATTR:
+                case IT_LOOKUP:
                         rc = mds_getattr_name_p(2, req);
                         /* FIXME: we need to sit down and decide on who should
                          * set req->rq_status, who should return negative and
index d69128c..e5c35f0 100644 (file)
@@ -391,6 +391,20 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
                 GOTO(out_dec, rc = -ENOMEM);
         }
 
+        if (mds_reint_p == NULL)
+                mds_reint_p = inter_module_get_request("mds_reint", "mds");
+        if (IS_ERR(mds_reint_p)) {
+                CERROR("MDSINTENT locks require the MDS module.\n");
+                GOTO(out_dec, rc = -EINVAL);
+        }
+        if (mds_getattr_name_p == NULL)
+                mds_getattr_name_p = inter_module_get_request
+                        ("mds_getattr_name", "mds");
+        if (IS_ERR(mds_getattr_name_p)) {
+                CERROR("MDSINTENT locks require the MDS module.\n");
+                GOTO(out_dec, rc = -EINVAL);
+        }
+
         for (i = 0; i < LDLM_NUM_THREADS; i++) {
                 rc = ptlrpc_start_thread(obddev, ldlm->ldlm_service,
                                          "lustre_dlm");
@@ -407,6 +421,10 @@ out_thread:
         ptlrpc_stop_all_threads(ldlm->ldlm_service);
         ptlrpc_unregister_service(ldlm->ldlm_service);
 out_dec:
+        if (mds_reint_p != NULL)
+                inter_module_put("mds_reint");
+        if (mds_getattr_name_p != NULL)
+                inter_module_put("mds_getattr_name");
         MOD_DEC_USE_COUNT;
         return rc;
 }
@@ -478,6 +496,7 @@ EXPORT_SYMBOL(ldlm_lock_decref);
 EXPORT_SYMBOL(ldlm_cli_convert);
 EXPORT_SYMBOL(ldlm_cli_enqueue);
 EXPORT_SYMBOL(ldlm_cli_cancel);
+EXPORT_SYMBOL(ldlm_match_or_enqueue);
 EXPORT_SYMBOL(ldlm_test);
 EXPORT_SYMBOL(ldlm_lock_dump);
 EXPORT_SYMBOL(ldlm_namespace_new);
index 7e47486..f632587 100644 (file)
@@ -145,6 +145,37 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
         return rc;
 }
 
+int ldlm_match_or_enqueue(struct ptlrpc_client *cl,
+                          struct ptlrpc_connection *conn,
+                          struct lustre_handle *connh, 
+                          struct ptlrpc_request *req,
+                          struct ldlm_namespace *ns,
+                          struct lustre_handle *parent_lock_handle,
+                          __u64 *res_id,
+                          __u32 type,
+                          void *cookie, int cookielen,
+                          ldlm_mode_t mode,
+                          int *flags,
+                          ldlm_lock_callback callback,
+                          void *data,
+                          __u32 data_len,
+                          struct lustre_handle *lockh)
+{
+        int rc;
+        ENTRY;
+        rc = ldlm_lock_match(ns, res_id, type, cookie, cookielen, mode, lockh);
+        if (rc == 0) {
+                rc = ldlm_cli_enqueue(cl, conn, connh, req, ns,
+                                      parent_lock_handle, res_id, type, cookie,
+                                      cookielen, mode, flags, callback, data,
+                                      data_len, lockh);
+                if (rc != ELDLM_OK)
+                        CERROR("ldlm_cli_enqueue: err: %d\n", rc);
+                RETURN(rc);
+        } else
+                RETURN(0);
+}
+
 int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *desc,
                     void *data, __u32 data_len)
 {
@@ -254,8 +285,7 @@ int ldlm_cli_convert(struct ptlrpc_client *cl, struct lustre_handle *lockh,
         return rc;
 }
 
-int ldlm_cli_cancel(struct lustre_handle *lockh, 
-                    struct lustre_handle *connh)
+int ldlm_cli_cancel(struct lustre_handle *lockh)
 {
         struct ptlrpc_request *req;
         struct ldlm_lock *lock;
index 374b10c..5a4743e 100644 (file)
@@ -79,7 +79,7 @@ static void cleanup_resource(struct ldlm_resource *res, struct list_head *q)
                         struct lustre_handle lockh;
                         ldlm_lock2handle(lock, &lockh);
                         /* can we get away without a connh here? */
-                        rc = ldlm_cli_cancel(&lockh, NULL);
+                        rc = ldlm_cli_cancel(&lockh);
                         if (rc < 0) {
                                 CERROR("ldlm_cli_cancel: %d\n", rc);
                                 LBUG();
index 5d1c29d..41cb398 100644 (file)
@@ -28,10 +28,16 @@ void ll_intent_release(struct dentry *de)
                 EXIT;
                 return;
         }
-
         if (de->d_it->it_lock_mode) {
                 handle = (struct lustre_handle *)de->d_it->it_lock_handle;
-                ldlm_lock_decref(handle, de->d_it->it_lock_mode);
+                if (de->d_it->it_op == IT_SETATTR) {
+                        int rc;
+                        ldlm_lock_decref(handle, de->d_it->it_lock_mode);
+                        rc = ldlm_cli_cancel(handle);
+                        if (rc < 0)
+                                CERROR("ldlm_cli_cancel: %d\n", rc);
+                } else
+                        ldlm_lock_decref(handle, de->d_it->it_lock_mode);
         }
         de->d_it = NULL;
         EXIT;
index 52db328..81f879d 100644 (file)
@@ -220,7 +220,7 @@ static int ll_lock_callback(struct lustre_handle *lockh,
         invalidate_inode_pages(inode);
         up(&inode->i_sem);
 
-        if (ldlm_cli_cancel(lockh, NULL) < 0)
+        if (ldlm_cli_cancel(lockh) < 0)
                 LBUG();
         RETURN(0);
 }
index ab9108f..e752e67 100644 (file)
@@ -98,23 +98,24 @@ int ll_lock(struct inode *dir, struct dentry *dentry,
             struct lookup_intent *it, struct lustre_handle *lockh)
 {
         struct ll_sb_info *sbi = ll_i2sbi(dir);
-        int err;
+        int err, lock_mode;
 
         if ((it->it_op & (IT_CREAT | IT_MKDIR | IT_SYMLINK | IT_SETATTR |
-                          IT_MKNOD)) )
-                err = mdc_enqueue(&sbi->ll_mdc_conn, LDLM_MDSINTENT,
-                                  it, LCK_PW, dir, dentry, lockh, 0, NULL, 0,
-                                  dir, sizeof(*dir));
+                          IT_MKNOD)))
+                lock_mode = LCK_PW;
         else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_UNLINK |
                               IT_RMDIR | IT_RENAME | IT_RENAME2))
-                err = mdc_enqueue(&sbi->ll_mdc_conn, LDLM_MDSINTENT,
-                                  it, LCK_PR, dir, dentry, lockh, 0, NULL, 0,
-                                  dir, sizeof(*dir));
+                lock_mode = LCK_PR;
+        else if (it->it_op & IT_LOOKUP)
+                lock_mode = LCK_CR;
         else {
                 LBUG();
                 RETURN(-1);
         }
 
+        err = mdc_enqueue(&sbi->ll_mdc_conn, LDLM_MDSINTENT, it, lock_mode, dir,
+                          dentry, lockh, 0, NULL, 0, dir, sizeof(*dir));
+
         RETURN(err);
 }
 
@@ -136,13 +137,14 @@ static struct dentry *ll_lookup2(struct inode * dir, struct dentry *dentry,
         struct ll_inode_md md;
         struct lustre_handle lockh;
         int err, type, offset;
+        struct lookup_intent lookup_it = { IT_LOOKUP };
         obd_id ino;
 
         ENTRY;
 
         if (it == NULL) {
-                LBUG();
-                RETURN(NULL);
+                it = &lookup_it;
+                dentry->d_it = it;
         }
 
         CDEBUG(D_INFO, "name: %*s, intent op: %d\n", dentry->d_name.len,
@@ -160,23 +162,13 @@ static struct dentry *ll_lookup2(struct inode * dir, struct dentry *dentry,
         memcpy(it->it_lock_handle, &lockh, sizeof(lockh));
 
         if ((it->it_op & (IT_CREAT | IT_MKDIR | IT_SYMLINK | IT_MKNOD)) &&
-            it->it_disposition && !it->it_status) {
-#if 0
-                if (it->it_data)
-                        CERROR("leaking request %p\n", it->it_data);
-#endif
+            it->it_disposition && !it->it_status)
                 GOTO(negative, NULL);
-        }
 
-        if ((it->it_op & (IT_RENAME | IT_GETATTR | IT_UNLINK | IT_RMDIR)) &&
-            it->it_disposition && it->it_status) {
-#if 0
-                if (it->it_data)
-                        CERROR("request: %p, status: %d\n", it->it_data,
-                               it->it_status);
-#endif
+        if ((it->it_op & (IT_RENAME | IT_GETATTR | IT_UNLINK | IT_RMDIR |
+                          IT_SETATTR | IT_LOOKUP)) && it->it_disposition &&
+            it->it_status)
                 GOTO(negative, NULL);
-        }
 
         request = (struct ptlrpc_request *)it->it_data;
         if (!it->it_disposition) {
@@ -215,7 +207,9 @@ static struct dentry *ll_lookup2(struct inode * dir, struct dentry *dentry,
                 inode->i_mode = S_IFDIR;
                 inode->i_nlink = 1;
                 GOTO(out_req, 0);
-        } else if (it->it_op != IT_RENAME2) {
+        } else if (it->it_op == IT_RENAME2) {
+                LBUG();
+        } else {
                 struct mds_body *body;
 
                 offset = 1;
@@ -251,6 +245,9 @@ static struct dentry *ll_lookup2(struct inode * dir, struct dentry *dentry,
  negative:
         dentry->d_op = &ll_d_ops;
         d_add(dentry, inode);
+        if (it->it_op == IT_LOOKUP)
+                ll_intent_release(dentry);
+
         return NULL;
 }
 
index b0596e3..4db2384 100644 (file)
@@ -320,8 +320,7 @@ int ll_inode_setattr(struct inode *inode, struct iattr *attr, int do_trunc)
         /* change incore inode */
         ll_attr2inode(inode, attr, do_trunc);
 
-        err = mdc_setattr(&sbi->ll_mdc_conn, inode, attr,
-                          &request);
+        err = mdc_setattr(&sbi->ll_mdc_conn, inode, attr, &request);
         if (err)
                 CERROR("mdc_setattr fails (%d)\n", err);
 
index a806055..19f3ef8 100644 (file)
@@ -150,7 +150,7 @@ static int mdc_lock_callback(struct lustre_handle *lockh,
                 invalidate_inode_pages(inode);
         }
 
-        rc = ldlm_cli_cancel(lockh, NULL);
+        rc = ldlm_cli_cancel(lockh);
         if (rc < 0) {
                 CERROR("ldlm_cli_cancel: %d\n", rc);
                 LBUG();
@@ -179,9 +179,6 @@ int mdc_enqueue(struct lustre_handle *conn, int lock_type,
         case IT_MKDIR:
                 it->it_mode = (it->it_mode | S_IFDIR) & ~current->fs->umask; 
                 break;
-        case IT_SETATTR:
-                it->it_op = IT_GETATTR;
-                break;
         case (IT_CREAT|IT_OPEN):
         case IT_CREAT:
         case IT_MKNOD:
@@ -275,7 +272,8 @@ int mdc_enqueue(struct lustre_handle *conn, int lock_type,
                 size[0] = sizeof(struct ldlm_reply);
                 req->rq_replen = lustre_msg_size(1, size);
         } else if (it->it_op == IT_GETATTR || it->it_op == IT_RENAME ||
-                   it->it_op == IT_OPEN) {
+                   it->it_op == IT_OPEN || it->it_op == IT_SETATTR ||
+                   it->it_op == IT_LOOKUP) {
                 size[2] = sizeof(struct mds_body);
                 size[3] = de->d_name.len + 1;
 
@@ -297,27 +295,6 @@ int mdc_enqueue(struct lustre_handle *conn, int lock_type,
                 size[1] = sizeof(struct mds_body);
                 size[2] = sizeof(struct obdo);
                 req->rq_replen = lustre_msg_size(3, size);
-        } else if (it->it_op == IT_SETATTR) {
-                size[2] = sizeof(struct mds_rec_setattr);
-                size[3] = de->d_name.len + 1;
-
-                req = ptlrpc_prep_req2(mdc->mdc_ldlm_client, mdc->mdc_conn,
-                                       &mdc->mdc_connh, LDLM_ENQUEUE, 4, size,
-                                       NULL);
-                if (!req)
-                        RETURN(-ENOMEM);
-
-                lit = lustre_msg_buf(req->rq_reqmsg, 1);
-                lit->opc = NTOH__u64((__u64)it->it_op);
-                
-                if (!it->it_iattr) 
-                        LBUG();
-
-                mds_setattr_pack(req, 2, dir, it->it_iattr,
-                                de->d_name.name, de->d_name.len);
-                size[0] = sizeof(struct ldlm_reply);
-                size[1] = sizeof(struct mds_body);
-                req->rq_replen = lustre_msg_size(2, size);
         } else if (it->it_op == IT_READDIR) {
                 req = ptlrpc_prep_req2(mdc->mdc_ldlm_client, mdc->mdc_conn,
                                 &mdc->mdc_connh, LDLM_ENQUEUE, 1, size, NULL);
index 18f342b..cf32175 100644 (file)
@@ -91,6 +91,71 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
         return rc;
 }
 
+/* 'dir' is a inode for which a lock has already been taken */
+struct dentry *mds_name2locked_dentry(struct mds_obd *mds, struct dentry *dir,
+                                      struct vfsmount **mnt, char *name,
+                                      int namelen, int lock_mode,
+                                      struct lustre_handle *lockh)
+{
+        struct dentry *dchild;
+        int flags, rc;
+        __u64 res_id[3] = {0};
+        ENTRY;
+
+        down(&dir->d_inode->i_sem);
+        dchild = lookup_one_len(name, dir, namelen);
+        if (IS_ERR(dchild)) {
+                CERROR("child lookup error %ld\n", PTR_ERR(dchild));
+                up(&dir->d_inode->i_sem);
+                LBUG();
+        }
+        up(&dir->d_inode->i_sem);
+
+        if (lock_mode == 0)
+                RETURN(dchild);
+
+        res_id[0] = dchild->d_inode->i_ino;
+        rc = ldlm_match_or_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn,
+                                   (struct lustre_handle *)&mds->mds_connh,
+                                   NULL, mds->mds_local_namespace, NULL,
+                                   res_id, LDLM_PLAIN, NULL, 0, lock_mode,
+                                   &flags, (void *)mds_lock_callback, NULL,
+                                   0, lockh);
+        if (rc != ELDLM_OK) {
+                l_dput(dchild);
+                RETURN(NULL);
+        }
+
+        RETURN(dchild);
+}
+
+struct dentry *mds_fid2locked_dentry(struct mds_obd *mds, struct ll_fid *fid,
+                                     struct vfsmount **mnt, int lock_mode,
+                                     struct lustre_handle *lockh)
+{
+        struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de;
+        int flags, rc;
+        __u64 res_id[3] = {0};
+        ENTRY;
+
+        if (IS_ERR(de))
+                RETURN(de);
+
+        res_id[0] = de->d_inode->i_ino;
+        rc = ldlm_match_or_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn,
+                                   (struct lustre_handle *)&mds->mds_connh,
+                                   NULL, mds->mds_local_namespace, NULL,
+                                   res_id, LDLM_PLAIN, NULL, 0, lock_mode,
+                                   &flags, (void *)mds_lock_callback, NULL,
+                                   0, lockh);
+        if (rc != ELDLM_OK) {
+                l_dput(de);
+                retval = NULL;
+        }
+
+        RETURN(retval);
+}
+
 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
                               struct vfsmount **mnt)
 {
@@ -287,7 +352,7 @@ int mds_lock_callback(struct lustre_handle *lockh, struct ldlm_lock_desc *desc,
                 RETURN(0);
         }
 
-        if (ldlm_cli_cancel(lockh, NULL) < 0)
+        if (ldlm_cli_cancel(lockh) < 0)
                 LBUG();
         RETURN(0);
 }
@@ -375,9 +440,8 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req)
                 }
                 /* now a normal case for intent locking */
                 rc = 0;
-        } else {
+        } else
                 rc = -ENOENT;
-        }
 
         EXIT;
 out_create_dchild:
index a25d074..5cc9ac5 100644 (file)
@@ -94,28 +94,58 @@ int mds_update_last_rcvd(struct mds_obd *mds, void *handle,
         return rc;
 }
 
+/* In the write-back case, the client holds a lock on a subtree.
+ * In the intent case, the client holds a lock on the child inode.
+ * In the pathname case, the client (may) hold a lock on the child inode. */
 static int mds_reint_setattr(struct mds_update_record *rec, int offset,
                              struct ptlrpc_request *req)
 {
         struct mds_obd *mds = mds_req2mds(req);
         struct dentry *de;
         void *handle;
-        int rc = 0;
-        int err;
+        struct lustre_handle child_lockh;
+        int rc = 0, err;
+
+        if (req->rq_reqmsg->bufcount > offset + 1) {
+                struct dentry *dir;
+                struct lustre_handle dir_lockh;
+                char *name;
+                int namelen;
+                /* a name was supplied by the client; fid1 is the directory */
+
+                name = lustre_msg_buf(req->rq_reqmsg, offset + 1);
+                namelen = req->rq_reqmsg->buflens[offset + 1] - 1;
+                dir = mds_fid2locked_dentry(mds, rec->ur_fid1, NULL, LCK_PR,
+                                            &dir_lockh);
+                if (!dir || IS_ERR(dir)) {
+                        l_dput(dir);
+                        LBUG();
+                        GOTO(out_setattr, rc = -ESTALE);
+                }
 
-        de = mds_fid2dentry(mds, rec->ur_fid1, NULL);
-        if (IS_ERR(de) || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_SETATTR)) {
-                GOTO(out_setattr, rc = -ESTALE);
+                de = mds_name2locked_dentry(mds, dir, NULL, name, namelen,
+                                            0, &child_lockh);
+                l_dput(dir);
+                if (!de || IS_ERR(de)) {
+                        LBUG();
+                        GOTO(out_setattr_de, rc = -ESTALE);
+                }
+        } else {
+                de = mds_fid2dentry(mds, rec->ur_fid1, NULL);
+                if (!de || IS_ERR(de)) {
+                        LBUG();
+                        GOTO(out_setattr_de, rc = -ESTALE);
+                }
         }
-
         CDEBUG(D_INODE, "ino %ld\n", de->d_inode->i_ino);
 
         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_SETATTR_WRITE,
                        de->d_inode->i_sb->s_dev);
 
+        lock_kernel();
         handle = mds_fs_start(mds, de->d_inode, MDS_FSOP_SETATTR);
         if (!handle)
-                GOTO(out_setattr_de, rc = PTR_ERR(handle));
+                GOTO(out_unlock, rc = PTR_ERR(handle));
         rc = mds_fs_setattr(mds, de, handle, &rec->ur_iattr);
 
         if (!rc)
@@ -127,10 +157,13 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
                 if (!rc)
                         rc = err;
         }
+
         EXIT;
-out_setattr_de:
+ out_unlock:
+        unlock_kernel();
+ out_setattr_de:
         l_dput(de);
-out_setattr:
+ out_setattr:
         req->rq_status = rc;
         return(0);
 }
@@ -533,7 +566,7 @@ out_unlink_de:
 
         if (!rc) { 
                 ldlm_lock_decref(&lockh, LCK_EX);
-                rc = ldlm_cli_cancel(&lockh, NULL);
+                rc = ldlm_cli_cancel(&lockh);
                 if (rc < 0)
                         CERROR("failed to cancel child inode lock ino "
                                "%Ld: %d\n", res_id[0], rc);
@@ -737,7 +770,7 @@ out_rename_deold:
 
         if (!rc) { 
                 ldlm_lock_decref(&oldhandle, LCK_EX);
-                rc = ldlm_cli_cancel(&oldhandle, NULL);
+                rc = ldlm_cli_cancel(&oldhandle);
                 if (rc < 0)
                         CERROR("failed to cancel child inode lock ino "
                                "%Ld: %d\n", res_id[0], rc);
index 201b4d8..a72d14e 100644 (file)
@@ -215,9 +215,11 @@ int ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *desc)
 
 int ptlrpc_reply(struct ptlrpc_service *svc, struct ptlrpc_request *req)
 {
-        if (req->rq_repmsg == NULL)
+        if (req->rq_repmsg == NULL) {
                 CERROR("bad: someone called ptlrpc_reply when they meant "
                        "ptlrpc_error\n");
+                return -EINVAL;
+        }
 
         /* FIXME: we need to increment the count of handled events */
         req->rq_type = PTL_RPC_TYPE_REPLY;
index f5e6d9b..debdcd1 100644 (file)
@@ -7,6 +7,16 @@ sub usage () {
     exit;
 }
 
+sub unused() {
+    my $tmp = `./mcreate $path`;
+    if ($tmp) {
+        $tmp =~ /.*error: (.*)\n/;
+        print "Created  $path: $1\n";
+    } else {
+        print "Created  $path: Success\n";
+    }
+}
+
 my $mtpt = shift || usage();
 my $mount_count = shift || usage();
 my $i = shift || usage();
@@ -16,13 +26,9 @@ while ($i--) {
     $which = int(rand() * $mount_count) + 1;
     $d = int(rand() * $files);
     $path = "$mtpt$which/$d";
-    my $tmp = `./mcreate $path`;
-    if ($tmp) {
-        $tmp =~ /.*error: (.*)\n/;
-        print "Created  $path: $1\n";
-    } else {
-        print "Created  $path: Success\n";
-    }
+    open(FH, ">$path") || die "open($PATH): $!";
+    print "Opened   $path: Success\n";
+    close(FH) || die;
 
     $which = int(rand() * $mount_count) + 1;
     $d = int(rand() * $files);