Whamcloud - gitweb
Branch: b1_4
authorgreen <green>
Tue, 13 Sep 2005 16:53:20 +0000 (16:53 +0000)
committergreen <green>
Tue, 13 Sep 2005 16:53:20 +0000 (16:53 +0000)
b=7313
r=adilger

Allow locks flagged in a certain way (used by liblustre) to be cancelled without
waiting for reply from client.
This is a prototype version of code. We land it so that it will get into next
code drop to Cray.

19 files changed:
lustre/ChangeLog
lustre/include/linux/lustre_dlm.h
lustre/include/linux/lustre_mds.h
lustre/include/linux/lustre_net.h
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/liblustre/dir.c
lustre/liblustre/namei.c
lustre/llite/dcache.c
lustre/llite/dir.c
lustre/llite/file.c
lustre/llite/namei.c
lustre/mdc/mdc_locks.c
lustre/mds/handler.c
lustre/mds/mds_internal.h
lustre/mds/mds_reint.c
lustre/ptlrpc/niobuf.c
lustre/ptlrpc/ptlrpc_module.c

index 9cc52cb..f1f2dcc 100644 (file)
@@ -76,6 +76,17 @@ Details    : the Linux NFS server relies on an MDS_OPEN_OWNEROVERRIDE hack to
             Lustre now respects this flag to disable mode checks when
             truncating a file owned by the user
 
+Severity   : minor
+Frequency  : liblustre-only, when liblustre client dies unexpectedly or becomes
+             busy
+Bugzilla   : 7313
+Description: Revoking locks from clients that went dead or catatonic might take
+             a lot of time.
+Details    : New lock flags FL_CANCEL_ON_BLOCK used by liblustre makes
+             cancellation of such locks instant on servers without waiting for
+             any reply from clients. Clients drops these locks when cancel
+             notification from server is received without replying.
+
 ------------------------------------------------------------------------------
 
 08-26-2005  Cluster File Systems, Inc. <info@clusterfs.com>
index 5fb0019..01b72a9 100644 (file)
@@ -101,6 +101,13 @@ typedef enum {
 /* Flags sent in AST lock_flags to be mapped into the receiving lock. */
 #define LDLM_AST_FLAGS         (LDLM_FL_DISCARD_DATA)
 
+/* Immediatelly cancel such locks when they block some other locks. Send
+   cancel notification to original lock holder, but expect no reply. */
+#define LDLM_FL_CANCEL_ON_BLOCK 0x800000
+
+/* Flags flags inherited from parent lock when doing intents. */
+#define LDLM_INHERIT_FLAGS         (LDLM_FL_CANCEL_ON_BLOCK)
+
 /* The blocking callback is overloaded to perform two functions.  These flags
  * indicate which operation should be performed. */
 #define LDLM_CB_BLOCKING    1
index 66d1b2b..d3aae2c 100644 (file)
@@ -159,7 +159,7 @@ int mdc_intent_lock(struct obd_export *exp,
                     void *lmm, int lmmsize,
                     struct lookup_intent *, int,
                     struct ptlrpc_request **reqp,
-                    ldlm_blocking_callback cb_blocking);
+                    ldlm_blocking_callback cb_blocking, int extra_lock_flags);
 int mdc_enqueue(struct obd_export *exp,
                 int lock_type,
                 struct lookup_intent *it,
@@ -170,7 +170,7 @@ int mdc_enqueue(struct obd_export *exp,
                 int lmmlen,
                 ldlm_completion_callback cb_completion,
                 ldlm_blocking_callback cb_blocking,
-                void *cb_data);
+                void *cb_data, int extra_lock_flags);
 
 /* mdc/mdc_request.c */
 int mdc_init_ea_size(struct obd_export *mdc_exp, struct obd_export *lov_exp);
index e0470f4..eaa71ed 100644 (file)
@@ -648,6 +648,7 @@ int ptlrpc_reply(struct ptlrpc_request *req);
 int ptlrpc_error(struct ptlrpc_request *req);
 void ptlrpc_resend_req(struct ptlrpc_request *request);
 int ptl_send_rpc(struct ptlrpc_request *request);
+int ptl_send_rpc_nowait(struct ptlrpc_request *request);
 int ptlrpc_register_rqbd (struct ptlrpc_request_buffer_desc *rqbd);
 
 /* ptlrpc/client.c */
index cda70ee..1975d79 100644 (file)
@@ -835,7 +835,7 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
 
         /* Some flags from the enqueue want to make it into the AST, via the
          * lock's l_flags. */
-        lock->l_flags |= (*flags & LDLM_AST_DISCARD_DATA);
+        lock->l_flags |= (*flags & (LDLM_AST_DISCARD_DATA|LDLM_INHERIT_FLAGS));
 
         /* This distinction between local lock trees is very important; a client
          * namespace only has information about locks taken by that client, and
@@ -941,6 +941,7 @@ int ldlm_run_ast_work(struct ldlm_namespace *ns, struct list_head *rpc_list)
                 } else {
                         rc = 0;
                 }
+
                 if (rc == -ERESTART)
                         retval = rc;
                 else if (rc)
index 1adbba3..ee26f2e 100644 (file)
@@ -429,6 +429,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
         struct ldlm_request *body;
         struct ptlrpc_request *req;
         int rc = 0, size = sizeof(*body);
+        int instant_cancel = 0;
         ENTRY;
 
         if (flag == LDLM_CB_CANCELING) {
@@ -461,6 +462,9 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
         }
 #endif
 
+        if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)
+                instant_cancel = 1;
+
         req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
                               LDLM_BL_CALLBACK, 1, &size, NULL);
         if (req == NULL) {
@@ -476,14 +480,20 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
 
         LDLM_DEBUG(lock, "server preparing blocking AST");
         req->rq_replen = lustre_msg_size(0, NULL);
-
-        if (lock->l_granted_mode == lock->l_req_mode)
+        if (instant_cancel) {
+                ldlm_lock_cancel(lock);
+//                ldlm_reprocess_all(lock->l_resource);
+        } else if (lock->l_granted_mode == lock->l_req_mode)
                 ldlm_add_waiting_lock(lock);
         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 
         req->rq_send_state = LUSTRE_IMP_FULL;
         req->rq_timeout = ldlm_timeout; /* timeout for initial AST reply */
-        rc = ptlrpc_queue_wait(req);
+        if (unlikely(instant_cancel)) {
+                rc = ptl_send_rpc_nowait(req);
+        } else {
+                rc = ptlrpc_queue_wait(req);
+        }
         if (rc != 0)
                 rc = ldlm_handle_ast_error(lock, req, rc, "blocking");
 
@@ -951,6 +961,10 @@ void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
         LDLM_DEBUG(lock, "client blocking AST callback handler START");
 
         lock->l_flags |= LDLM_FL_CBPENDING;
+
+        if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)
+                lock->l_flags |= LDLM_FL_CANCEL;
+
         do_ast = (!lock->l_readers && !lock->l_writers);
 
         if (do_ast) {
@@ -1237,13 +1251,15 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
         switch (req->rq_reqmsg->opc) {
         case LDLM_BL_CALLBACK:
                 CDEBUG(D_INODE, "blocking ast\n");
-                ldlm_callback_reply(req, 0);
+                if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK))
+                        ldlm_callback_reply(req, 0);
                 if (ldlm_bl_to_thread(ns, &dlm_req->lock_desc, lock))
                         ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
                 break;
         case LDLM_CP_CALLBACK:
                 CDEBUG(D_INODE, "completion ast\n");
-                ldlm_callback_reply(req, 0);
+                if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK))
+                        ldlm_callback_reply(req, 0);
                 ldlm_handle_cp_callback(req, ns, dlm_req, lock);
                 break;
         case LDLM_GL_CALLBACK:
index c43bd4b..8e7cd4e 100644 (file)
@@ -182,6 +182,7 @@ static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
         ldlm_lock_addref_internal(lock, mode);
         ldlm_lock2handle(lock, lockh);
         lock->l_flags |= LDLM_FL_LOCAL;
+        lock->l_flags |= *flags & LDLM_INHERIT_FLAGS;
         lock->l_lvb_swabber = lvb_swabber;
         if (policy != NULL)
                 memcpy(&lock->l_policy_data, policy, sizeof(*policy));
@@ -360,6 +361,7 @@ int ldlm_cli_enqueue(struct obd_export *exp,
         memcpy(&lock->l_remote_handle, &reply->lock_handle,
                sizeof(lock->l_remote_handle));
         *flags = reply->lock_flags;
+        lock->l_flags |= reply->lock_flags & LDLM_INHERIT_FLAGS;
 
         CDEBUG(D_INFO, "local: %p, remote cookie: "LPX64", flags: 0x%x\n",
                lock, reply->lock_handle.cookie, *flags);
@@ -570,7 +572,8 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
                 /* Set this flag to prevent others from getting new references*/
                 l_lock(&lock->l_resource->lr_namespace->ns_lock);
                 lock->l_flags |= LDLM_FL_CBPENDING;
-                local_only = (lock->l_flags & LDLM_FL_LOCAL_ONLY);
+                local_only = (lock->l_flags &
+                              (LDLM_FL_LOCAL_ONLY|LDLM_FL_CANCEL_ON_BLOCK));
                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
                 ldlm_cancel_callback(lock);
 
@@ -608,9 +611,10 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
                 if (rc == ESTALE) {
                         char str[PTL_NALFMT_SIZE];
                         CERROR("client/server (nid %s) out of sync"
-                               " -- not fatal\n",
+                               " -- not fatal, flags %d\n",
                                ptlrpc_peernid2str(&req->rq_import->
-                                                  imp_connection->c_peer, str));
+                                                  imp_connection->c_peer, str),
+lock->l_flags);
                 } else if (rc == -ETIMEDOUT) {
                         ptlrpc_req_finished(req);
                         GOTO(restart, rc);
index 4c1917a..358856f 100644 (file)
@@ -88,7 +88,7 @@ static int llu_dir_do_readpage(struct inode *inode, struct page *page)
                 rc = mdc_enqueue(sbi->ll_mdc_exp, LDLM_PLAIN, &it, LCK_PR,
                                  &data, &lockh, NULL, 0,
                                  ldlm_completion_ast, llu_mdc_blocking_ast,
-                                 inode);
+                                 inode, LDLM_FL_CANCEL_ON_BLOCK);
                 request = (struct ptlrpc_request *)it.d.lustre.it_data;
                 if (request)
                         ptlrpc_req_finished(request);
index e173996..912851a 100644 (file)
@@ -265,7 +265,8 @@ int llu_pb_revalidate(struct pnode *pnode, int flags, struct lookup_intent *it)
                                 pb->pb_ino, pb->pb_name.name,pb->pb_name.len,0);
 
         rc = mdc_intent_lock(exp, &op_data, NULL, 0, it, flags,
-                             &req, llu_mdc_blocking_ast);
+                             &req, llu_mdc_blocking_ast,
+                             LDLM_FL_CANCEL_ON_BLOCK);
         /* If req is NULL, then mdc_intent_lock only tried to do a lock match;
          * if all was well, it will return 1 if it found locks, 0 otherwise. */
         if (req == NULL && rc >= 0)
@@ -431,7 +432,8 @@ static int llu_lookup_it(struct inode *parent, struct pnode *pnode,
                                 pnode->p_base->pb_name.len, flags);
 
         rc = mdc_intent_lock(llu_i2mdcexp(parent), &op_data, NULL, 0, it,
-                             flags, &req, llu_mdc_blocking_ast);
+                             flags, &req, llu_mdc_blocking_ast,
+                             LDLM_FL_CANCEL_ON_BLOCK);
         if (rc < 0)
                 GOTO(out, rc);
 
index 55759ef..7ee9a58 100644 (file)
@@ -303,7 +303,7 @@ int ll_revalidate_it(struct dentry *de, int lookup_flags,
                                de->d_name.name, de->d_name.len, 0);
 
         rc = mdc_intent_lock(exp, &op_data, NULL, 0, it, lookup_flags,
-                             &req, ll_mdc_blocking_ast);
+                             &req, ll_mdc_blocking_ast, 0);
         /* If req is NULL, then mdc_intent_lock only tried to do a lock match;
          * if all was well, it will return 1 if it found locks, 0 otherwise. */
         if (req == NULL && rc >= 0)
index f9d4924..68196c3 100644 (file)
@@ -224,7 +224,8 @@ static struct page *ll_get_dir_page(struct inode *dir, unsigned long n)
 
                 rc = mdc_enqueue(ll_i2sbi(dir)->ll_mdc_exp, LDLM_PLAIN, &it,
                                  LCK_PR, &data, &lockh, NULL, 0,
-                                 ldlm_completion_ast, ll_mdc_blocking_ast, dir);
+                                 ldlm_completion_ast, ll_mdc_blocking_ast, dir,
+                                 0);
 
                 request = (struct ptlrpc_request *)it.d.lustre.it_data;
                 if (request)
index c8b4e7c..10ebaf1 100644 (file)
@@ -154,7 +154,7 @@ static int ll_intent_file_open(struct file *file, void *lmm,
 
         rc = mdc_enqueue(sbi->ll_mdc_exp, LDLM_PLAIN, itp, LCK_PW, &data,
                          &lockh, lmm, lmmsize, ldlm_completion_ast,
-                         ll_mdc_blocking_ast, NULL);
+                         ll_mdc_blocking_ast, NULL, 0);
         if (rc < 0)
                 CERROR("lock enqueue: err: %d\n", rc);
         RETURN(rc);
index 779c1e1..4ec2685 100644 (file)
@@ -408,7 +408,7 @@ static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry,
         it->it_create_mode &= ~current->fs->umask;
 
         rc = mdc_intent_lock(ll_i2mdcexp(parent), &op_data, NULL, 0, it,
-                             lookup_flags, &req, ll_mdc_blocking_ast);
+                             lookup_flags, &req, ll_mdc_blocking_ast, 0);
 
         if (rc < 0)
                 GOTO(out, retval = ERR_PTR(rc));
index ead4b89..202afbf 100644 (file)
@@ -235,14 +235,14 @@ int mdc_enqueue(struct obd_export *exp,
                 int lmmsize,
                 ldlm_completion_callback cb_completion,
                 ldlm_blocking_callback cb_blocking,
-                void *cb_data)
+                void *cb_data, int extra_lock_flags)
 {
         struct ptlrpc_request *req;
         struct obd_device *obddev = class_exp2obd(exp);
         struct ldlm_res_id res_id =
                 { .name = {data->fid1.id, data->fid1.generation} };
         int size[5] = {sizeof(struct ldlm_request), sizeof(struct ldlm_intent)};
-        int rc, flags = LDLM_FL_HAS_INTENT;
+        int rc, flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
         int repsize[4] = {sizeof(struct ldlm_reply),
                           sizeof(struct mds_body),
                           obddev->u.cli.cl_max_mds_easize,
@@ -472,7 +472,7 @@ EXPORT_SYMBOL(mdc_enqueue);
 int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data,
                     void *lmm, int lmmsize, struct lookup_intent *it,
                     int lookup_flags, struct ptlrpc_request **reqp,
-                    ldlm_blocking_callback cb_blocking)
+                    ldlm_blocking_callback cb_blocking, int extra_lock_flags)
 {
         struct lustre_handle lockh;
         struct ptlrpc_request *request;
@@ -526,7 +526,8 @@ int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data,
 
                 rc = mdc_enqueue(exp, LDLM_PLAIN, it, it_to_lock_mode(it),
                                  op_data, &lockh, lmm, lmmsize,
-                                 ldlm_completion_ast, cb_blocking, NULL);
+                                 ldlm_completion_ast, cb_blocking, NULL,
+                                 extra_lock_flags);
                 if (rc < 0)
                         RETURN(rc);
                 memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
index 0f90dd6..ac61f2f 100644 (file)
@@ -671,7 +671,7 @@ static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode,
         return(rc);
 }
 
-static int mds_getattr_name(int offset, struct ptlrpc_request *req,
+static int mds_getattr_name(int offset, struct ptlrpc_request *req, int flags,
                             struct lustre_handle *child_lockh)
 {
         struct obd_device *obd = req->rq_export->exp_obd;
@@ -753,7 +753,8 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req,
                 rc = mds_get_parent_child_locked(obd, &obd->u.mds, &body->fid1,
                                                  &parent_lockh, &dparent,
                                                  LCK_PR, name, namesize,
-                                                 child_lockh, &dchild, LCK_PR);
+                                                 child_lockh, &dchild, LCK_PR,
+                                                 flags);
                 if (rc)
                         GOTO(cleanup, rc);
         } else {
@@ -1238,7 +1239,7 @@ int mds_handle(struct ptlrpc_request *req)
                  * want to cancel.
                  */
                 lockh.cookie = 0;
-                rc = mds_getattr_name(0, req, &lockh);
+                rc = mds_getattr_name(0, req, 0, &lockh);
                 /* this non-intent call (from an ioctl) is special */
                 req->rq_status = rc;
                 if (rc == 0 && lockh.cookie)
@@ -1990,7 +1991,9 @@ static int mds_intent_policy(struct ldlm_namespace *ns,
         case IT_LOOKUP:
         case IT_READDIR:
                 fixup_handle_for_resent_req(req, lock, &new_lock, &lockh);
-                rep->lock_policy_res2 = mds_getattr_name(offset, req, &lockh);
+                rep->lock_policy_res2 = mds_getattr_name(offset, req,
+                                                     flags & LDLM_INHERIT_FLAGS,
+                                                     &lockh);
                 /* FIXME: LDLM can set req->rq_status. MDS sets
                    policy_res{1,2} with disposition and status.
                    - replay: returns 0 & req->status is old status
index 1f564f3..6fffb8b 100644 (file)
@@ -103,7 +103,8 @@ int res_gt(struct ldlm_res_id *res1, struct ldlm_res_id *res2);
 int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id,
                           struct lustre_handle *p1_lockh, int p1_lock_mode,
                           struct ldlm_res_id *p2_res_id,
-                          struct lustre_handle *p2_lockh, int p2_lock_mode);
+                          struct lustre_handle *p2_lockh, int p2_lock_mode,
+                          int p2_lock_flags);
 void mds_commit_cb(struct obd_device *, __u64 last_rcvd, void *data, int error);
 int mds_finish_transno(struct mds_obd *mds, struct inode *inode, void *handle,
                        struct ptlrpc_request *req, int rc, __u32 op_data);
@@ -115,7 +116,8 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds,
                                 struct dentry **dparentp, int parent_mode,
                                 char *name, int namelen,
                                 struct lustre_handle *child_lockh,
-                                struct dentry **dchildp, int child_mode);
+                                struct dentry **dchildp, int child_mode,
+                                int child_lock_flags);
 int mds_lock_new_child(struct obd_device *obd, struct inode *inode,
                        struct lustre_handle *child_lockh);
 int mds_osc_setattr_async(struct obd_device *obd, struct inode *inode,
index 215140b..67a162a 100644 (file)
@@ -946,12 +946,14 @@ int res_gt(struct ldlm_res_id *res1, struct ldlm_res_id *res2)
 int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id,
                           struct lustre_handle *p1_lockh, int p1_lock_mode,
                           struct ldlm_res_id *p2_res_id,
-                          struct lustre_handle *p2_lockh, int p2_lock_mode)
+                          struct lustre_handle *p2_lockh, int p2_lock_mode,
+                          int p2_lock_flags)
 {
         struct ldlm_res_id *res_id[2] = { p1_res_id, p2_res_id };
         struct lustre_handle *handles[2] = { p1_lockh, p2_lockh };
         int lock_modes[2] = { p1_lock_mode, p2_lock_mode };
-        int rc, flags;
+        int flags[2] = { LDLM_FL_LOCAL_ONLY, LDLM_FL_LOCAL_ONLY | p2_lock_flags };
+        int rc;
         ENTRY;
 
         LASSERT(p1_res_id != NULL && p2_res_id != NULL);
@@ -966,14 +968,15 @@ int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id,
                 res_id[0] = p2_res_id;
                 lock_modes[1] = p1_lock_mode;
                 lock_modes[0] = p2_lock_mode;
+                flags[1] = LDLM_FL_LOCAL_ONLY;
+                flags[0] = p2_lock_flags | LDLM_FL_LOCAL_ONLY;
         }
 
         CDEBUG(D_DLMTRACE, "lock order: "LPU64"/"LPU64"\n",
                res_id[0]->name[0], res_id[1]->name[0]);
 
-        flags = LDLM_FL_LOCAL_ONLY;
         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, *res_id[0],
-                              LDLM_PLAIN, NULL, lock_modes[0], &flags,
+                              LDLM_PLAIN, NULL, lock_modes[0], &flags[0],
                               mds_blocking_ast, ldlm_completion_ast, NULL, NULL,
                               NULL, 0, NULL, handles[0]);
         if (rc != ELDLM_OK)
@@ -984,10 +987,9 @@ int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id,
                 memcpy(handles[1], handles[0], sizeof(*(handles[1])));
                 ldlm_lock_addref(handles[1], lock_modes[1]);
         } else if (res_id[1]->name[0] != 0) {
-                flags = LDLM_FL_LOCAL_ONLY;
                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
                                       *res_id[1], LDLM_PLAIN, NULL,
-                                      lock_modes[1], &flagsmds_blocking_ast,
+                                      lock_modes[1], &flags[1],mds_blocking_ast,
                                       ldlm_completion_ast, NULL, NULL, NULL, 0,
                                       NULL, handles[1]);
                 if (rc != ELDLM_OK) {
@@ -1178,7 +1180,8 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds,
                                 struct dentry **dparentp, int parent_mode,
                                 char *name, int namelen,
                                 struct lustre_handle *child_lockh,
-                                struct dentry **dchildp, int child_mode)
+                                struct dentry **dchildp, int child_mode,
+                                int child_lock_flags)
 {
         struct ldlm_res_id child_res_id = { .name = {0} };
         struct ldlm_res_id parent_res_id = { .name = {0} };
@@ -1233,7 +1236,8 @@ retry_locks:
         /* Step 3: Lock parent and child in resource order.  If child doesn't
          *         exist, we still have to lock the parent and re-lookup. */
         rc = enqueue_ordered_locks(obd,&parent_res_id,parent_lockh,parent_mode,
-                                   &child_res_id, child_lockh, child_mode);
+                                   &child_res_id, child_lockh, child_mode,
+                                   child_lock_flags);
         if (rc)
                 GOTO(cleanup, rc);
 
@@ -1386,7 +1390,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
         rc = mds_get_parent_child_locked(obd, mds, rec->ur_fid1,
                                          &parent_lockh, &dparent, LCK_PW,
                                          rec->ur_name, rec->ur_namelen,
-                                         &child_lockh, &dchild, LCK_EX);
+                                         &child_lockh, &dchild, LCK_EX, 0);
         if (rc)
                 GOTO(cleanup, rc);
 
@@ -1635,7 +1639,7 @@ static int mds_reint_link(struct mds_update_record *rec, int offset,
         tgt_dir_res_id.name[1] = de_tgt_dir->d_inode->i_generation;
 
         rc = enqueue_ordered_locks(obd, &src_res_id, &src_lockh, LCK_EX,
-                                   &tgt_dir_res_id, &tgt_dir_lockh, LCK_EX);
+                                   &tgt_dir_res_id, &tgt_dir_lockh, LCK_EX, 0);
         if (rc)
                 GOTO(cleanup, rc);
 
index f24391a..44a6193 100644 (file)
@@ -387,6 +387,62 @@ int ptlrpc_error(struct ptlrpc_request *req)
         RETURN(rc);
 }
 
+int ptl_send_rpc_nowait(struct ptlrpc_request *request)
+{
+        int rc;
+        struct ptlrpc_connection *connection;
+        unsigned long flags;
+        ENTRY;
+
+        LASSERT (request->rq_type == PTL_RPC_MSG_REQUEST);
+
+        if (request->rq_import->imp_obd &&
+            request->rq_import->imp_obd->obd_fail) {
+                CDEBUG(D_HA, "muting rpc for failed imp obd %s\n",
+                       request->rq_import->imp_obd->obd_name);
+                /* this prevents us from waiting in ptlrpc_queue_wait */
+                request->rq_err = 1;
+                RETURN(-ENODEV);
+        }
+        
+        connection = request->rq_import->imp_connection;
+
+        request->rq_reqmsg->handle = request->rq_import->imp_remote_handle;
+        request->rq_reqmsg->type = PTL_RPC_MSG_REQUEST;
+        request->rq_reqmsg->conn_cnt = request->rq_import->imp_conn_cnt;
+
+        spin_lock_irqsave (&request->rq_lock, flags);
+        /* If the MD attach succeeds, there _will_ be a reply_in callback */
+        request->rq_receiving_reply = 0;
+        /* Clear any flags that may be present from previous sends. */
+        request->rq_replied = 0;
+        request->rq_err = 0;
+        request->rq_timedout = 0;
+        request->rq_net_err = 0;
+        request->rq_resend = 0;
+        request->rq_restart = 0;
+        spin_unlock_irqrestore (&request->rq_lock, flags);
+
+        ptlrpc_request_addref(request);       /* +1 ref for the SENT callback */
+
+        request->rq_sent = CURRENT_SECONDS;
+        ptlrpc_pinger_sending_on_import(request->rq_import);
+        rc = ptl_send_buf(&request->rq_req_md_h, 
+                          request->rq_reqmsg, request->rq_reqlen,
+                          PTL_NOACK_REQ, &request->rq_req_cbid, 
+                          connection,
+                          request->rq_request_portal,
+                          request->rq_xid);
+        if (rc == 0) {
+                ptlrpc_lprocfs_rpc_sent(request);
+        } else {
+                ptlrpc_req_finished (request);          /* drop callback ref */
+        }
+
+        return rc;
+}
+
+
 int ptl_send_rpc(struct ptlrpc_request *request)
 {
         int rc;
index 4df0fe7..35d91ee 100644 (file)
@@ -95,6 +95,7 @@ EXPORT_SYMBOL(ptlrpc_reply);
 EXPORT_SYMBOL(ptlrpc_error);
 EXPORT_SYMBOL(ptlrpc_resend_req);
 EXPORT_SYMBOL(ptl_send_rpc);
+EXPORT_SYMBOL(ptl_send_rpc_nowait);
 
 /* client.c */
 EXPORT_SYMBOL(ptlrpc_init_client);