- /* On return, we must be sure that the ACK callback has either
- * happened or will not happen. Note that the SENT callback will
- * happen come what may since we successfully posted the PUT. */
- int rc;
- struct l_wait_info lwi;
- unsigned long flags;
-
- again:
- /* serialise with ACK callback */
- spin_lock_irqsave (&req->rq_lock, flags);
- if (!req->rq_want_ack) {
- spin_unlock_irqrestore (&req->rq_lock, flags);
- /* The ACK callback has happened already. Although the
- * SENT callback might still be outstanding (yes really) we
- * don't care; this is just like normal completion. */
- return;
- }
- spin_unlock_irqrestore (&req->rq_lock, flags);
-
- /* Have a bash at unlinking the MD. This will fail until the SENT
- * callback has happened since the MD is busy from the PUT. If the
- * ACK still hasn't arrived after then, a successful unlink will
- * ensure the ACK callback never happens. */
- rc = PtlMDUnlink (req->rq_reply_md_h);
- switch (rc) {
- default:
- LBUG ();
- case PTL_OK:
- /* SENT callback happened; ACK callback preempted */
- LASSERT (req->rq_want_ack);
- spin_lock_irqsave (&req->rq_lock, flags);
- req->rq_want_ack = 0;
- spin_unlock_irqrestore (&req->rq_lock, flags);
- return;
- case PTL_INV_MD:
- return;
- case PTL_MD_INUSE:
- /* Still sending or ACK callback in progress: wait until
- * either callback has completed and try again.
- * Actually we can't wait for the SENT callback because
- * there's no state the SENT callback can touch that will
- * allow it to communicate with us! So we just wait here
- * for a short time, effectively polling for the SENT
- * callback by calling PtlMDUnlink() again, to see if it
- * has finished. Note that if the ACK does arrive, its
- * callback wakes us in short order. --eeb */
- lwi = LWI_TIMEOUT (HZ/4, NULL, NULL);
- rc = l_wait_event(req->rq_reply_waitq, !req->rq_want_ack,
- &lwi);
- CDEBUG (D_HA, "Retrying req %p: %d\n", req, rc);
- /* NB go back and test rq_want_ack with locking, to ensure
- * if ACK callback happened, it has completed stopped
- * referencing this req. */
- goto again;
- }
-}
-
-void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
-{
- int i;
- int netrc;
- unsigned long flags;
- struct ptlrpc_req_ack_lock *ack_lock;
- struct l_wait_info lwi = { 0 };
- wait_queue_t commit_wait;
- struct obd_device *obd =
- req->rq_export ? req->rq_export->exp_obd : NULL;
- struct obd_export *exp = NULL;
-
- if (req->rq_export) {
- for (i = 0; i < REQ_MAX_ACK_LOCKS; i++) {
- if (req->rq_ack_locks[i].mode) {
- exp = req->rq_export;
- break;
+ if (OBD_FAIL_CHECK(fail_id | OBD_FAIL_ONCE)) {
+ obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED;
+ DEBUG_REQ(D_ERROR, req, "dropping reply");
+ /* NB this does _not_ send with ACK disabled, to simulate
+ * sending OK, but timing out for the ACK */
+ if (req->rq_reply_state != NULL) {
+ if (!req->rq_reply_state->rs_difficult) {
+ lustre_free_reply_state (req->rq_reply_state);
+ req->rq_reply_state = NULL;
+ } else {
+ struct ptlrpc_service *svc =
+ req->rq_rqbd->rqbd_srv_ni->sni_service;
+ atomic_inc(&svc->srv_outstanding_replies);