Whamcloud - gitweb
land b_eq on HEAD
[fs/lustre-release.git] / lustre / mds / mds_reint.c
index dcacdcf..b44dc22 100644 (file)
@@ -270,21 +270,53 @@ int mds_fix_attr(struct inode *inode, struct mds_update_record *rec)
         RETURN(0);
 }
 
-void mds_steal_ack_locks(struct obd_export *exp, struct ptlrpc_request *req)
+void mds_steal_ack_locks(struct ptlrpc_request *req)
 {
-        unsigned long flags;
-        struct ptlrpc_request *oldrep = exp->exp_outstanding_reply;
-
-        if (oldrep == NULL)
+        struct obd_export         *exp = req->rq_export;
+        struct list_head          *tmp;
+        struct ptlrpc_reply_state *oldrep;
+        struct ptlrpc_service     *svc;
+        unsigned long              flags;
+        int                        i;
+
+        /* CAVEAT EMPTOR: spinlock order */
+        spin_lock_irqsave (&exp->exp_lock, flags);
+        list_for_each (tmp, &exp->exp_outstanding_replies) {
+                oldrep = list_entry(tmp, struct ptlrpc_reply_state,rs_exp_list);
+
+                if (oldrep->rs_xid != req->rq_xid)
+                        continue;
+
+                if (oldrep->rs_msg.opc != req->rq_reqmsg->opc)
+                        CERROR ("Resent req xid "LPX64" has mismatched opc: "
+                                "new %d old %d\n", req->rq_xid,
+                                req->rq_reqmsg->opc, oldrep->rs_msg.opc);
+
+                svc = oldrep->rs_srv_ni->sni_service;
+                spin_lock (&svc->srv_lock);
+
+                list_del_init (&oldrep->rs_exp_list);
+
+                CWARN("Stealing %d locks from rs %p x"LPD64".t"LPD64
+                      " o%d NID"LPX64"\n",
+                      oldrep->rs_nlocks, oldrep, 
+                      oldrep->rs_xid, oldrep->rs_transno, oldrep->rs_msg.opc,
+                      exp->exp_connection->c_peer.peer_nid);
+
+                for (i = 0; i < oldrep->rs_nlocks; i++)
+                        ptlrpc_save_lock(req, 
+                                         &oldrep->rs_locks[i],
+                                         oldrep->rs_modes[i]);
+                oldrep->rs_nlocks = 0;
+
+                DEBUG_REQ(D_HA, req, "stole locks for");
+                ptlrpc_schedule_difficult_reply (oldrep);
+
+                spin_unlock (&svc->srv_lock);
+                spin_unlock_irqrestore (&exp->exp_lock, flags);
                 return;
-        memcpy(req->rq_ack_locks, oldrep->rq_ack_locks,
-               sizeof req->rq_ack_locks);
-        spin_lock_irqsave(&req->rq_lock, flags);
-        oldrep->rq_resent = 1;
-        wake_up(&oldrep->rq_reply_waitq);
-        spin_unlock_irqrestore(&req->rq_lock, flags);
-        DEBUG_REQ(D_HA, oldrep, "stole locks from");
-        DEBUG_REQ(D_HA, req, "stole locks for");
+        }
+        spin_unlock_irqrestore (&exp->exp_lock, flags);
 }
 
 void mds_req_from_mcd(struct ptlrpc_request *req, struct mds_client_data *mcd)
@@ -294,8 +326,7 @@ void mds_req_from_mcd(struct ptlrpc_request *req, struct mds_client_data *mcd)
         req->rq_repmsg->transno = req->rq_transno = mcd->mcd_last_transno;
         req->rq_repmsg->status = req->rq_status = mcd->mcd_last_result;
 
-        if (req->rq_export->exp_outstanding_reply)
-                mds_steal_ack_locks(req->rq_export, req);
+        mds_steal_ack_locks(req);
 }
 
 static void reconstruct_reint_setattr(struct mds_update_record *rec,
@@ -444,7 +475,7 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
                         if (rc) {
                                 ldlm_lock_decref(&lockh, LCK_PW);
                         } else {
-                                ldlm_put_lock_into_req(req, &lockh, LCK_PW);
+                                ptlrpc_save_lock (req, &lockh, LCK_PW);
                         }
                 }
         case 0:
@@ -695,7 +726,7 @@ cleanup:
                 if (rc) {
                         ldlm_lock_decref(&lockh, LCK_PW);
                 } else {
-                        ldlm_put_lock_into_req(req, &lockh, LCK_PW);
+                        ptlrpc_save_lock (req, &lockh, LCK_PW);
                 }
                 l_dput(dparent);
         case 0:
@@ -1181,8 +1212,11 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                 rc = vfs_unlink(dparent->d_inode, dchild);
 
                 if (!rc && log_unlink)
-                        if (mds_log_op_unlink(obd, child_inode, req->rq_repmsg,
-                                              offset + 1) > 0)
+                        if (mds_log_op_unlink(obd, child_inode,
+                                lustre_msg_buf(req->rq_repmsg, offset + 1, 0),
+                                req->rq_repmsg->buflens[offset + 1],
+                                lustre_msg_buf(req->rq_repmsg, offset + 2, 0),
+                                req->rq_repmsg->buflens[offset + 2]) > 0)
                                 body->valid |= OBD_MD_FLCOOKIE;
                 break;
         }
@@ -1234,14 +1268,14 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                 if (rc)
                         ldlm_lock_decref(&child_reuse_lockh, LCK_EX);
                 else
-                        ldlm_put_lock_into_req(req, &child_reuse_lockh, LCK_EX);
+                        ptlrpc_save_lock(req, &child_reuse_lockh, LCK_EX);
         case 2: /* child lock */
                 ldlm_lock_decref(&child_lockh, LCK_EX);
         case 1: /* child and parent dentry, parent lock */
                 if (rc)
                         ldlm_lock_decref(&parent_lockh, LCK_PW);
                 else
-                        ldlm_put_lock_into_req(req, &parent_lockh, LCK_PW);
+                        ptlrpc_save_lock(req, &parent_lockh, LCK_PW);
                 l_dput(dchild);
                 l_dput(dparent);
         case 0:
@@ -1353,8 +1387,8 @@ cleanup:
                         ldlm_lock_decref(&src_lockh, LCK_EX);
                         ldlm_lock_decref(&tgt_dir_lockh, LCK_EX);
                 } else {
-                        ldlm_put_lock_into_req(req, &src_lockh, LCK_EX);
-                        ldlm_put_lock_into_req(req, &tgt_dir_lockh, LCK_EX);
+                        ptlrpc_save_lock(req, &src_lockh, LCK_EX);
+                        ptlrpc_save_lock(req, &tgt_dir_lockh, LCK_EX);
                 }
         case 2: /* target dentry */
                 l_dput(de_tgt_dir);
@@ -1720,11 +1754,11 @@ cleanup:
                         ldlm_lock_decref(&(dlm_handles[0]), LCK_PW);
                 } else {
                         if (lock_count == 4)
-                                ldlm_put_lock_into_req(req,
-                                                    &(dlm_handles[3]), LCK_EX);
-                        ldlm_put_lock_into_req(req, &(dlm_handles[2]), LCK_EX);
-                        ldlm_put_lock_into_req(req, &(dlm_handles[1]), LCK_PW);
-                        ldlm_put_lock_into_req(req, &(dlm_handles[0]), LCK_PW);
+                                ptlrpc_save_lock(req,
+                                              &(dlm_handles[3]), LCK_EX);
+                        ptlrpc_save_lock(req, &(dlm_handles[2]), LCK_EX);
+                        ptlrpc_save_lock(req, &(dlm_handles[1]), LCK_PW);
+                        ptlrpc_save_lock(req, &(dlm_handles[0]), LCK_PW);
                 }
                 l_dput(de_new);
                 l_dput(de_old);