* Recovery functions
*/
+static int target_exp_enqueue_req_replay(struct ptlrpc_request *req)
+{
+ __u64 transno = lustre_msg_get_transno(req->rq_reqmsg);
+ struct obd_export *exp = req->rq_export;
+ struct ptlrpc_request *reqiter;
+ int dup = 0;
+
+ LASSERT(exp);
+
+ spin_lock(&exp->exp_lock);
+ list_for_each_entry(reqiter, &exp->exp_req_replay_queue,
+ rq_replay_list) {
+ if (lustre_msg_get_transno(reqiter->rq_reqmsg) == transno) {
+ dup = 1;
+ break;
+ }
+ }
+
+ if (dup) {
+ /* we expect it with RESENT and REPLAY flags */
+ if ((lustre_msg_get_flags(req->rq_reqmsg) &
+ (MSG_RESENT | MSG_REPLAY)) != (MSG_RESENT | MSG_REPLAY))
+ CERROR("invalid flags %x of resent replay\n",
+ lustre_msg_get_flags(req->rq_reqmsg));
+ } else {
+ list_add_tail(&req->rq_replay_list, &exp->exp_req_replay_queue);
+ }
+
+ spin_unlock(&exp->exp_lock);
+ return dup;
+}
+
+static void target_exp_dequeue_req_replay(struct ptlrpc_request *req)
+{
+ LASSERT(!list_empty(&req->rq_replay_list));
+ LASSERT(req->rq_export);
+
+ spin_lock(&req->rq_export->exp_lock);
+ list_del_init(&req->rq_replay_list);
+ spin_unlock(&req->rq_export->exp_lock);
+}
static void target_release_saved_req(struct ptlrpc_request *req)
{
list_for_each_safe(tmp, n, &obd->obd_recovery_queue) {
req = list_entry(tmp, struct ptlrpc_request, rq_list);
+ target_exp_dequeue_req_replay(req);
list_del(&req->rq_list);
DEBUG_REQ(D_ERROR, req, "aborted:");
req->rq_status = -ENOTCONN;
list_for_each_safe(tmp, n, &obd->obd_recovery_queue) {
req = list_entry(tmp, struct ptlrpc_request, rq_list);
+ target_exp_dequeue_req_replay(req);
list_del(&req->rq_list);
target_release_saved_req(req);
}
}
continue;
}
+ target_exp_dequeue_req_replay(req);
list_del_init(&req->rq_list);
obd->obd_requests_queued_for_recovery--;
spin_unlock_bh(&obd->obd_processing_task_lock);
__u64 transno = lustre_msg_get_transno(req->rq_reqmsg);
struct ptlrpc_request *saved_req;
struct lustre_msg *reqmsg;
+ int rc = 0;
/* CAVEAT EMPTOR: The incoming request message has been swabbed
* (i.e. buflens etc are in my own byte order), but type-dependent
/* Processing the queue right now, don't re-add. */
LASSERT(list_empty(&req->rq_list));
spin_unlock_bh(&obd->obd_processing_task_lock);
- OBD_FREE(reqmsg, req->rq_reqlen);
- OBD_FREE(saved_req, sizeof *saved_req);
- return 1;
+ GOTO(err_free, rc = 1);
}
- /* A resent, replayed request that is still on the queue; just drop it.
- The queued request will handle this. */
- if ((lustre_msg_get_flags(req->rq_reqmsg) & (MSG_RESENT|MSG_REPLAY)) ==
- (MSG_RESENT | MSG_REPLAY)) {
- DEBUG_REQ(D_ERROR, req, "dropping resent queued req");
+ if (unlikely(OBD_FAIL_CHECK(OBD_FAIL_TGT_REPLAY_DROP))) {
spin_unlock_bh(&obd->obd_processing_task_lock);
- OBD_FREE(reqmsg, req->rq_reqlen);
- OBD_FREE(saved_req, sizeof *saved_req);
- return 0;
+ GOTO(err_free, rc = 0);
}
memcpy(saved_req, req, sizeof *req);
req->rq_reqmsg = reqmsg;
class_export_get(req->rq_export);
CFS_INIT_LIST_HEAD(&req->rq_list);
+ CFS_INIT_LIST_HEAD(&req->rq_replay_list);
+
+ if (target_exp_enqueue_req_replay(req)) {
+ spin_unlock_bh(&obd->obd_processing_task_lock);
+ DEBUG_REQ(D_ERROR, req, "dropping resent queued req");
+ GOTO(err_exp, rc = 0);
+ }
/* XXX O(n^2) */
list_for_each(tmp, &obd->obd_recovery_queue) {
inserted = 1;
break;
}
+
+ if (unlikely(lustre_msg_get_transno(reqiter->rq_reqmsg) ==
+ transno)) {
+ spin_unlock_bh(&obd->obd_processing_task_lock);
+ DEBUG_REQ(D_ERROR, req, "dropping replay: transno "
+ "has been claimed by another client");
+ target_exp_dequeue_req_replay(req);
+ GOTO(err_exp, rc = 0);
+ }
}
if (!inserted) {
process_recovery_queue(obd);
return 0;
+
+err_exp:
+ class_export_put(req->rq_export);
+err_free:
+ OBD_FREE(reqmsg, req->rq_reqlen);
+ OBD_FREE(saved_req, sizeof(*saved_req));
+ return rc;
}
struct obd_device * target_req2obd(struct ptlrpc_request *req)