RETURN(0);
}
-void mds_steal_ack_locks(struct obd_export *exp, struct ptlrpc_request *req)
+void mds_steal_ack_locks(struct ptlrpc_request *req)
{
- unsigned long flags;
- struct ptlrpc_request *oldrep = exp->exp_outstanding_reply;
-
- if (oldrep == NULL)
+ struct obd_export *exp = req->rq_export;
+ struct list_head *tmp;
+ struct ptlrpc_reply_state *oldrep;
+ struct ptlrpc_service *svc;
+ unsigned long flags;
+ int i;
+
+ /* CAVEAT EMPTOR: spinlock order */
+ spin_lock_irqsave (&exp->exp_lock, flags);
+ list_for_each (tmp, &exp->exp_outstanding_replies) {
+ oldrep = list_entry(tmp, struct ptlrpc_reply_state,rs_exp_list);
+
+ if (oldrep->rs_xid != req->rq_xid)
+ continue;
+
+ if (oldrep->rs_msg.opc != req->rq_reqmsg->opc)
+ CERROR ("Resent req xid "LPX64" has mismatched opc: "
+ "new %d old %d\n", req->rq_xid,
+ req->rq_reqmsg->opc, oldrep->rs_msg.opc);
+
+ svc = oldrep->rs_srv_ni->sni_service;
+ spin_lock (&svc->srv_lock);
+
+ list_del_init (&oldrep->rs_exp_list);
+
+ CWARN("Stealing %d locks from rs %p x"LPD64".t"LPD64
+ " o%d NID"LPX64"\n",
+ oldrep->rs_nlocks, oldrep,
+ oldrep->rs_xid, oldrep->rs_transno, oldrep->rs_msg.opc,
+ exp->exp_connection->c_peer.peer_nid);
+
+ for (i = 0; i < oldrep->rs_nlocks; i++)
+ ptlrpc_save_lock(req,
+ &oldrep->rs_locks[i],
+ oldrep->rs_modes[i]);
+ oldrep->rs_nlocks = 0;
+
+ DEBUG_REQ(D_HA, req, "stole locks for");
+ ptlrpc_schedule_difficult_reply (oldrep);
+
+ spin_unlock (&svc->srv_lock);
+ spin_unlock_irqrestore (&exp->exp_lock, flags);
return;
- memcpy(req->rq_ack_locks, oldrep->rq_ack_locks,
- sizeof req->rq_ack_locks);
- spin_lock_irqsave(&req->rq_lock, flags);
- oldrep->rq_resent = 1;
- wake_up(&oldrep->rq_reply_waitq);
- spin_unlock_irqrestore(&req->rq_lock, flags);
- DEBUG_REQ(D_HA, oldrep, "stole locks from");
- DEBUG_REQ(D_HA, req, "stole locks for");
+ }
+ spin_unlock_irqrestore (&exp->exp_lock, flags);
}
void mds_req_from_mcd(struct ptlrpc_request *req, struct mds_client_data *mcd)
req->rq_repmsg->transno = req->rq_transno = mcd->mcd_last_transno;
req->rq_repmsg->status = req->rq_status = mcd->mcd_last_result;
- if (req->rq_export->exp_outstanding_reply)
- mds_steal_ack_locks(req->rq_export, req);
+ mds_steal_ack_locks(req);
}
static void reconstruct_reint_setattr(struct mds_update_record *rec,
if (rc) {
ldlm_lock_decref(&lockh, LCK_PW);
} else {
- ldlm_put_lock_into_req(req, &lockh, LCK_PW);
+ ptlrpc_save_lock (req, &lockh, LCK_PW);
}
}
case 0:
if (rc) {
ldlm_lock_decref(&lockh, LCK_PW);
} else {
- ldlm_put_lock_into_req(req, &lockh, LCK_PW);
+ ptlrpc_save_lock (req, &lockh, LCK_PW);
}
l_dput(dparent);
case 0:
rc = vfs_unlink(dparent->d_inode, dchild);
if (!rc && log_unlink)
- if (mds_log_op_unlink(obd, child_inode, req->rq_repmsg,
- offset + 1) > 0)
+ if (mds_log_op_unlink(obd, child_inode,
+ lustre_msg_buf(req->rq_repmsg, offset + 1, 0),
+ req->rq_repmsg->buflens[offset + 1],
+ lustre_msg_buf(req->rq_repmsg, offset + 2, 0),
+ req->rq_repmsg->buflens[offset + 2]) > 0)
body->valid |= OBD_MD_FLCOOKIE;
break;
}
if (rc)
ldlm_lock_decref(&child_reuse_lockh, LCK_EX);
else
- ldlm_put_lock_into_req(req, &child_reuse_lockh, LCK_EX);
+ ptlrpc_save_lock(req, &child_reuse_lockh, LCK_EX);
case 2: /* child lock */
ldlm_lock_decref(&child_lockh, LCK_EX);
case 1: /* child and parent dentry, parent lock */
if (rc)
ldlm_lock_decref(&parent_lockh, LCK_PW);
else
- ldlm_put_lock_into_req(req, &parent_lockh, LCK_PW);
+ ptlrpc_save_lock(req, &parent_lockh, LCK_PW);
l_dput(dchild);
l_dput(dparent);
case 0:
ldlm_lock_decref(&src_lockh, LCK_EX);
ldlm_lock_decref(&tgt_dir_lockh, LCK_EX);
} else {
- ldlm_put_lock_into_req(req, &src_lockh, LCK_EX);
- ldlm_put_lock_into_req(req, &tgt_dir_lockh, LCK_EX);
+ ptlrpc_save_lock(req, &src_lockh, LCK_EX);
+ ptlrpc_save_lock(req, &tgt_dir_lockh, LCK_EX);
}
case 2: /* target dentry */
l_dput(de_tgt_dir);
ldlm_lock_decref(&(dlm_handles[0]), LCK_PW);
} else {
if (lock_count == 4)
- ldlm_put_lock_into_req(req,
- &(dlm_handles[3]), LCK_EX);
- ldlm_put_lock_into_req(req, &(dlm_handles[2]), LCK_EX);
- ldlm_put_lock_into_req(req, &(dlm_handles[1]), LCK_PW);
- ldlm_put_lock_into_req(req, &(dlm_handles[0]), LCK_PW);
+ ptlrpc_save_lock(req,
+ &(dlm_handles[3]), LCK_EX);
+ ptlrpc_save_lock(req, &(dlm_handles[2]), LCK_EX);
+ ptlrpc_save_lock(req, &(dlm_handles[1]), LCK_PW);
+ ptlrpc_save_lock(req, &(dlm_handles[0]), LCK_PW);
}
l_dput(de_new);
l_dput(de_old);