"cookies not random?\n", target->obd_name,
libcfs_nid2str(req->rq_peer.nid), cluuid.uuid);
GOTO(out, rc = -EALREADY);
+ } else if (export->exp_delayed && target->obd_recovering) {
+ /* VBR: don't allow delayed connection during recovery */
+ CWARN("%s: NID %s (%s) export was already marked as delayed "
+ "and will wait for end of recovery\n", target->obd_name,
+ libcfs_nid2str(req->rq_peer.nid), cluuid.uuid);
+ GOTO(out, rc = -EBUSY);
} else {
OBD_FAIL_TIMEOUT(OBD_FAIL_TGT_DELAY_RECONNECT, 2 * obd_timeout);
}
lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_REPLAYABLE);
client_nid = &req->rq_peer.nid;
+ /* VBR: for delayed connections we start recovery */
+ if (export && export->exp_delayed && !export->exp_in_recovery) {
+ LASSERT(!target->obd_recovering);
+ lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_DELAYED |
+ MSG_CONNECT_RECOVERING);
+ spin_lock_bh(&target->obd_processing_task_lock);
+ target->obd_version_recov = 1;
+ spin_unlock_bh(&target->obd_processing_task_lock);
+ target_start_and_reset_recovery_timer(target, handler, req, 0);
+ }
+
if (export == NULL) {
if (target->obd_recovering) {
CERROR("%s: denying connection for new client %s (%s): "
if (export->exp_connection != NULL)
ptlrpc_put_connection(export->exp_connection);
+
export->exp_connection = ptlrpc_get_connection(req->rq_peer,
req->rq_self,
&remote_uuid);
GOTO(set_flags, rc = 0);
}
- if (target->obd_recovering)
+ if (target->obd_recovering && !export->exp_in_recovery) {
+ spin_lock(&export->exp_lock);
+ export->exp_in_recovery = 1;
+ spin_unlock(&export->exp_lock);
target->obd_connected_clients++;
-
+ }
memcpy(&conn,
lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2, sizeof conn),
sizeof conn);
OBD_FREE(req, sizeof *req);
}
-static void target_finish_recovery(struct obd_device *obd)
+static void target_send_delayed_replies(struct obd_device *obd)
{
- struct list_head *tmp, *n;
+ struct ptlrpc_request *req, *tmp;
LCONSOLE_INFO("%s: sending delayed replies to recovered clients\n",
obd->obd_name);
+ list_for_each_entry_safe(req, tmp, &obd->obd_delayed_reply_queue, rq_list) {
+ list_del_init(&req->rq_list);
+ DEBUG_REQ(D_HA, req, "delayed:");
+ ptlrpc_reply(req);
+ target_release_saved_req(req);
+ }
+ obd->obd_recovery_end = cfs_time_current_sec();
+}
+
+static void target_finish_recovery(struct obd_device *obd)
+{
ldlm_reprocess_all_ns(obd->obd_namespace);
/* when recovery finished, cleanup orphans on mds and ost */
LCONSOLE_WARN("%s: recovery %s: rc %d\n", obd->obd_name,
rc < 0 ? "failed" : "complete", rc);
}
-
- list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) {
- struct ptlrpc_request *req;
- req = list_entry(tmp, struct ptlrpc_request, rq_list);
- list_del(&req->rq_list);
- DEBUG_REQ(D_HA, req, "delayed:");
- ptlrpc_reply(req);
- target_release_saved_req(req);
- }
- obd->obd_recovery_end = cfs_time_current_sec();
+ target_send_delayed_replies(obd);
}
static void abort_recovery_queue(struct obd_device *obd)
struct obd_device *obd = data;
ENTRY;
+ LCONSOLE_WARN("%s: recovery is aborted by administrative request;"
+ "%d clients are not recovered (%d clients did)\n",
+ obd->obd_name, obd->obd_recoverable_clients,
+ obd->obd_connected_clients);
+
spin_lock_bh(&obd->obd_processing_task_lock);
if (!obd->obd_recovering) {
spin_unlock_bh(&obd->obd_processing_task_lock);
return;
}
obd->obd_recovering = obd->obd_abort_recovery = 0;
+ obd->obd_version_recov = 0;
+ obd->obd_recoverable_clients = 0;
target_cancel_recovery_timer(obd);
spin_unlock_bh(&obd->obd_processing_task_lock);
- LCONSOLE_WARN("%s: recovery period over; %d clients never reconnected "
- "after %lds (%d clients did)\n",
- obd->obd_name, obd->obd_recoverable_clients,
- cfs_time_current_sec()- obd->obd_recovery_start,
- obd->obd_connected_clients);
class_disconnect_stale_exports(obd);
abort_recovery_queue(obd);
EXIT;
}
+static void reset_recovery_timer(struct obd_device *, int, int);
static void target_recovery_expired(unsigned long castmeharder)
{
struct obd_device *obd = (struct obd_device *)castmeharder;
- CERROR("%s: recovery timed out, aborting\n", obd->obd_name);
+ int version_recov = obd->obd_version_recov;
+ LCONSOLE_WARN("%s: recovery period over; %d clients never reconnected "
+ "after %lds (%d clients did)\n",
+ obd->obd_name, obd->obd_recoverable_clients,
+ cfs_time_current_sec()- obd->obd_recovery_start,
+ obd->obd_connected_clients);
+
+
+ /* VBR: stale exports should be marked as delayed */
+ class_handle_stale_exports(obd);
+
spin_lock_bh(&obd->obd_processing_task_lock);
- if (obd->obd_recovering)
- obd->obd_abort_recovery = 1;
+ /* VBR: not all clients are connected, so there can be gap */
+ if (obd->obd_recovering && !version_recov) {
+ obd->obd_version_recov = 1;
+ }
cfs_waitq_signal(&obd->obd_next_transno_waitq);
spin_unlock_bh(&obd->obd_processing_task_lock);
+ /* reset timer if recovery will proceed with versions now */
+ reset_recovery_timer(obd, OBD_RECOVERY_FACTOR * obd_timeout,
+ !version_recov);
}
struct ptlrpc_request *req,
int new_client)
{
- int req_timeout = OBD_RECOVERY_FACTOR *
+ int req_timeout = OBD_RECOVERY_FACTOR *
lustre_msg_get_timeout(req->rq_reqmsg);
check_and_start_recovery_timer(obd, handler);
max = obd->obd_max_recoverable_clients;
req_transno = lustre_msg_get_transno(req->rq_reqmsg);
connected = obd->obd_connected_clients;
- completed = max - obd->obd_recoverable_clients;
+ completed = max - obd->obd_recoverable_clients -
+ obd->obd_delayed_clients;
queue_len = obd->obd_requests_queued_for_recovery;
next_transno = obd->obd_next_recovery_transno;
- CDEBUG(D_HA,"max: %d, connected: %d, completed: %d, queue_len: %d, "
- "req_transno: "LPU64", next_transno: "LPU64"\n",
- max, connected, completed, queue_len, req_transno, next_transno);
+ CDEBUG(D_HA,"max: %d, connected: %d, delayed %d, completed: %d, "
+ "queue_len: %d, req_transno: "LPU64", next_transno: "LPU64"\n",
+ max, connected, obd->obd_delayed_clients, completed, queue_len,
+ req_transno, next_transno);
if (obd->obd_abort_recovery) {
CDEBUG(D_HA, "waking for aborted recovery\n");
wake_up = 1;
} else if (req_transno == next_transno) {
CDEBUG(D_HA, "waking for next ("LPD64")\n", next_transno);
wake_up = 1;
- } else if (queue_len + completed == max) {
+ } else if (queue_len == obd->obd_recoverable_clients) {
CDEBUG(D_ERROR,
"waking for skipped transno (skip: "LPD64
", ql: %d, comp: %d, conn: %d, next: "LPD64")\n",
struct obd_device *obd = target_req2obd(req);
struct ptlrpc_request *saved_req;
struct lustre_msg *reqmsg;
- int recovery_done = 0;
+ struct obd_export *exp = req->rq_export;
+ int recovery_done = 0, delayed_done = 0;
LASSERT ((rc == 0) == req->rq_packed_final);
spin_lock_bh(&obd->obd_processing_task_lock);
if (obd->obd_stopping) {
spin_unlock_bh(&obd->obd_processing_task_lock);
- OBD_FREE(reqmsg, req->rq_reqlen);
- OBD_FREE(saved_req, sizeof *req);
- req->rq_status = -ENOTCONN;
- /* rv is ignored anyhow */
- return -ENOTCONN;
+ goto out_noconn;
+ }
+
+ if (!exp->exp_vbr_failed) {
+ ptlrpc_rs_addref(req->rq_reply_state); /* +1 ref for saved reply */
+ req = saved_req;
+ req->rq_reqmsg = reqmsg;
+ CFS_INIT_LIST_HEAD(&req->rq_list);
+ CFS_INIT_LIST_HEAD(&req->rq_replay_list);
+ class_export_get(exp);
+ list_add(&req->rq_list, &obd->obd_delayed_reply_queue);
}
- ptlrpc_rs_addref(req->rq_reply_state); /* +1 ref for saved reply */
- req = saved_req;
- req->rq_reqmsg = reqmsg;
- class_export_get(req->rq_export);
- list_add(&req->rq_list, &obd->obd_delayed_reply_queue);
/* only count the first "replay over" request from each
export */
- if (req->rq_export->exp_replay_needed) {
- --obd->obd_recoverable_clients;
-
- spin_lock(&req->rq_export->exp_lock);
- req->rq_export->exp_replay_needed = 0;
- spin_unlock(&req->rq_export->exp_lock);
+ if (exp->exp_replay_needed) {
+ spin_lock(&exp->exp_lock);
+ exp->exp_replay_needed = 0;
+ spin_unlock(&exp->exp_lock);
+
+ if (!exp->exp_delayed) {
+ --obd->obd_recoverable_clients;
+ } else {
+ spin_lock(&exp->exp_lock);
+ exp->exp_delayed = 0;
+ spin_unlock(&exp->exp_lock);
+ delayed_done = 1;
+ if (obd->obd_delayed_clients == 0) {
+ spin_unlock_bh(&obd->obd_processing_task_lock);
+ LBUG();
+ }
+ --obd->obd_delayed_clients;
+ }
}
recovery_done = (obd->obd_recoverable_clients == 0);
spin_unlock_bh(&obd->obd_processing_task_lock);
+ if (delayed_done) {
+ /* start pinging export */
+ spin_lock(&obd->obd_dev_lock);
+ list_add_tail(&exp->exp_obd_chain_timed,
+ &obd->obd_exports_timed);
+ spin_unlock(&obd->obd_dev_lock);
+ target_send_delayed_replies(obd);
+ }
+
OBD_RACE(OBD_FAIL_LDLM_RECOV_CLIENTS);
if (recovery_done) {
spin_lock_bh(&obd->obd_processing_task_lock);
- obd->obd_recovering = obd->obd_abort_recovery = 0;
+ obd->obd_recovering = 0;
+ obd->obd_version_recov = 0;
+ obd->obd_abort_recovery = 0;
target_cancel_recovery_timer(obd);
spin_unlock_bh(&obd->obd_processing_task_lock);
+
+ if (!delayed_done)
target_finish_recovery(obd);
CDEBUG(D_HA, "%s: recovery complete\n",
obd_uuid2str(&obd->obd_uuid));
} else {
CWARN("%s: %d recoverable clients remain\n",
- obd->obd_name, obd->obd_recoverable_clients);
+ obd->obd_name, obd->obd_recoverable_clients);
cfs_waitq_signal(&obd->obd_next_transno_waitq);
}
+ /* VBR: disconnect export with failed recovery */
+ if (exp->exp_vbr_failed) {
+ CWARN("%s: disconnect export %s\n", obd->obd_name,
+ exp->exp_client_uuid.uuid);
+ class_fail_export(exp);
+ goto out_noconn;
+ }
+
return 1;
+
+out_noconn:
+ OBD_FREE(reqmsg, req->rq_reqlen);
+ OBD_FREE(saved_req, sizeof *req);
+ req->rq_status = -ENOTCONN;
+ /* rv is ignored anyhow */
+ return -ENOTCONN;
+}
+
+int target_handle_reply(struct ptlrpc_request *req, int rc, int fail)
+{
+ struct obd_device *obd = NULL;
+
+ if (req->rq_export)
+ obd = target_req2obd(req);
+
+ /* handle replay reply for version recovery */
+ if (obd && obd->obd_version_recov &&
+ (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)) {
+ LASSERT(req->rq_repmsg);
+ lustre_msg_add_flags(req->rq_repmsg, MSG_VERSION_REPLAY);
+ }
+
+ /* handle last replay */
+ if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
+ if (obd &&
+ lustre_msg_get_flags(req->rq_reqmsg) & MSG_DELAY_REPLAY) {
+ DEBUG_REQ(D_HA, req,
+ "delayed LAST_REPLAY, queuing reply");
+ rc = target_queue_last_replay_reply(req, rc);
+ LASSERT(req->rq_export->exp_delayed == 0);
+ return rc;
+ }
+
+ if (obd && obd->obd_recovering) { /* normal recovery */
+ DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
+ rc = target_queue_last_replay_reply(req, rc);
+ return rc;
+ }
+
+ /* Lost a race with recovery; let the error path DTRT. */
+ rc = req->rq_status = -ENOTCONN;
+ }
+ target_send_reply(req, rc, fail);
+ return 0;
}
static inline struct ldlm_pool *ldlm_exp2pl(struct obd_export *exp)
{
struct obd_device *obd;
ENTRY;
-
- /*
- * Check that we still have all structures alive as this may
+
+ /*
+ * Check that we still have all structures alive as this may
* be some late rpc in shutdown time.
*/
if (unlikely(!req->rq_export || !req->rq_export->exp_obd ||
RETURN(0);
}
- /*
- * OBD is alive here as export is alive, which we checked above.
+ /*
+ * OBD is alive here as export is alive, which we checked above.
*/
obd = req->rq_export->exp_obd;
LASSERT (list_empty(&rs->rs_obd_list));
LASSERT (list_empty(&rs->rs_exp_list));
- exp = class_export_get (req->rq_export);
+ exp = class_export_get(req->rq_export);
obd = exp->exp_obd;
/* disable reply scheduling onto srv_reply_queue while I'm setting up */
rs->rs_transno = req->rq_transno;
rs->rs_export = exp;
- spin_lock(&obd->obd_uncommitted_replies_lock);
+ spin_lock(&exp->exp_uncommitted_replies_lock);
- if (rs->rs_transno > obd->obd_last_committed) {
+ /* VBR: use exp_last_committed */
+ if (rs->rs_transno > exp->exp_last_committed) {
/* not committed already */
list_add_tail (&rs->rs_obd_list,
- &obd->obd_uncommitted_replies);
+ &exp->exp_uncommitted_replies);
}
- spin_unlock (&obd->obd_uncommitted_replies_lock);
+ spin_unlock (&exp->exp_uncommitted_replies_lock);
spin_lock (&exp->exp_lock);
list_add_tail (&rs->rs_exp_list, &exp->exp_outstanding_replies);
int target_handle_ping(struct ptlrpc_request *req)
{
+ if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY &&
+ req->rq_export->exp_in_recovery) {
+ spin_lock(&req->rq_export->exp_lock);
+ req->rq_export->exp_in_recovery = 0;
+ spin_unlock(&req->rq_export->exp_lock);
+ }
obd_ping(req->rq_export);
return lustre_pack_reply(req, 1, NULL, NULL);
}
void target_committed_to_req(struct ptlrpc_request *req)
{
- struct obd_device *obd = req->rq_export->exp_obd;
-
- if (!obd->obd_no_transno && req->rq_repmsg != NULL)
+ struct obd_export *exp = req->rq_export;
+ if (!exp->exp_obd->obd_no_transno && req->rq_repmsg != NULL) {
lustre_msg_set_last_committed(req->rq_repmsg,
- obd->obd_last_committed);
- else
+ exp->exp_last_committed);
+ } else {
DEBUG_REQ(D_IOCTL, req, "not sending last_committed update (%d/"
- "%d)", obd->obd_no_transno, req->rq_repmsg == NULL);
-
+ "%d)", exp->exp_obd->obd_no_transno,
+ req->rq_repmsg == NULL);
+ }
CDEBUG(D_INFO, "last_committed x"LPU64", this req x"LPU64"\n",
- obd->obd_last_committed, req->rq_xid);
+ exp->exp_obd->obd_last_committed, req->rq_xid);
}
EXPORT_SYMBOL(target_committed_to_req);
oqctl = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*oqctl),
lustre_swab_obd_quotactl);
if (oqctl == NULL) {
- CERROR("Can't unpack obd_quotactl\n");
+ CERROR("Can't unpack obd_quatactl\n");
RETURN(-EPROTO);
}