int rq_portal, rp_portal, connect_op;
char *name = obddev->obd_type->typ_name;
char *mgmt_name = NULL;
- int rc = 0;
+ int rc;
struct obd_device *mgmt_obd;
mgmtcli_register_for_events_t register_f;
ENTRY;
cli->cl_max_pages_per_rpc = PTLRPC_MAX_BRW_PAGES;
cli->cl_max_rpcs_in_flight = OSC_MAX_RIF_DEFAULT;
- ldlm_get_ref();
+ rc = ldlm_get_ref();
if (rc) {
CERROR("ldlm_get_ref failed: %d\n", rc);
GOTO(err, rc);
* -------------------------------------------------------------------------- */
int target_handle_reconnect(struct lustre_handle *conn, struct obd_export *exp,
- struct obd_uuid *cluuid)
+ struct obd_uuid *cluuid, int initial_conn)
{
- if (exp->exp_connection) {
+ if (exp->exp_connection && !initial_conn) {
struct lustre_handle *hdl;
hdl = &exp->exp_imp_reverse->imp_remote_handle;
/* Might be a re-connect after a partition. */
-#warning "FIXME ASAP"
- memcpy(&hdl->cookie, &conn->cookie, sizeof(conn->cookie));
- if (1 || !memcmp(&conn->cookie, &hdl->cookie, sizeof conn->cookie)) {
+ if (!memcmp(&conn->cookie, &hdl->cookie, sizeof conn->cookie)) {
CERROR("%s reconnecting\n", cluuid->uuid);
conn->cookie = exp->exp_handle.h_cookie;
- /*RETURN(EALREADY);*/
- RETURN(0);
+ RETURN(EALREADY);
} else {
CERROR("%s reconnecting from %s, "
"handle mismatch (ours "LPX64", theirs "
char *str, *tmp;
int rc = 0, abort_recovery;
unsigned long flags;
+ int initial_conn = 0;
+ char peer_str[PTL_NALFMT_SIZE];
ENTRY;
OBD_RACE(OBD_FAIL_TGT_CONN_RACE);
obd_str2uuid (&cluuid, str);
/* XXX extract a nettype and format accordingly */
- snprintf(remote_uuid.uuid, sizeof remote_uuid,
- "NET_"LPX64"_UUID", req->rq_peer.peer_nid);
+ switch (sizeof(ptl_nid_t)) {
+ /* NB the casts only avoid compiler warnings */
+ case 8:
+ snprintf(remote_uuid.uuid, sizeof remote_uuid,
+ "NET_"LPX64"_UUID", (__u64)req->rq_peer.peer_nid);
+ break;
+ case 4:
+ snprintf(remote_uuid.uuid, sizeof remote_uuid,
+ "NET_%x_UUID", (__u32)req->rq_peer.peer_nid);
+ break;
+ default:
+ LBUG();
+ }
spin_lock_bh(&target->obd_processing_task_lock);
abort_recovery = target->obd_abort_recovery;
rc = lustre_pack_reply(req, 0, NULL, NULL);
if (rc)
GOTO(out, rc);
-
+
+ if (lustre_msg_get_op_flags(req->rq_reqmsg) & MSG_CONNECT_INITIAL)
+ initial_conn = 1;
+
/* lctl gets a backstage, all-access pass. */
if (obd_uuid_equals(&cluuid, &target->obd_uuid))
goto dont_check_exports;
spin_unlock(&target->obd_dev_lock);
LASSERT(export->exp_obd == target);
- rc = target_handle_reconnect(&conn, export, &cluuid);
+ rc = target_handle_reconnect(&conn, export, &cluuid,
+ initial_conn);
break;
}
export = NULL;
/* If we found an export, we already unlocked. */
if (!export) {
spin_unlock(&target->obd_dev_lock);
- } else if (req->rq_reqmsg->conn_cnt == 1) {
+ } else if (req->rq_export == NULL &&
+ atomic_read(&export->exp_rpc_count) > 0) {
+ CWARN("%s: refuse connection from %s/%s to 0x%p/%d\n",
+ target->obd_name, cluuid.uuid,
+ ptlrpc_peernid2str(&req->rq_peer, peer_str),
+ export, atomic_read(&export->exp_refcount));
+ GOTO(out, rc = -EBUSY);
+ } else if (req->rq_export != NULL &&
+ atomic_read(&export->exp_rpc_count) > 1) {
+ CWARN("%s: refuse reconnection from %s@%s to 0x%p/%d\n",
+ target->obd_name, cluuid.uuid,
+ ptlrpc_peernid2str(&req->rq_peer, peer_str),
+ export, atomic_read(&export->exp_rpc_count));
+ GOTO(out, rc = -EBUSY);
+ }
+ else if (req->rq_reqmsg->conn_cnt == 1 && !initial_conn) {
CERROR("%s reconnected with 1 conn_cnt; cookies not random?\n",
cluuid.uuid);
-#warning "FIXME ASAP"
- /*GOTO(out, rc = -EALREADY);*/
+ GOTO(out, rc = -EALREADY);
}
/* Tell the client if we're in recovery. */
/* If this is the first client, start the recovery timer */
+ CWARN("%s: connection from %s@%s %s\n", target->obd_name, cluuid.uuid,
+ ptlrpc_peernid2str(&req->rq_peer, peer_str),
+ target->obd_recovering ? "(recovering)" : "");
if (target->obd_recovering) {
lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECOVERING);
target_start_recovery_timer(target, handler);
}
-
+#if 0
/* Tell the client if we support replayable requests */
if (target->obd_replayable)
lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_REPLAYABLE);
-
+#endif
if (export == NULL) {
if (target->obd_recovering) {
- CERROR("denying connection for new client %s: "
+ CERROR("denying connection for new client %s@%s: "
"%d clients in recovery for %lds\n", cluuid.uuid,
+ ptlrpc_peernid2str(&req->rq_peer, peer_str),
target->obd_recoverable_clients,
(target->obd_recovery_timer.expires-jiffies)/HZ);
rc = -EBUSY;
rc = obd_connect(&conn, target, &cluuid);
}
}
-
+ /* Tell the client if we support replayable requests */
+ if (target->obd_replayable)
+ lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_REPLAYABLE);
/* If all else goes well, this is our RPC return code. */
req->rq_status = 0;
if (rc && rc != EALREADY)
GOTO(out, rc);
- /* XXX track this all the time? */
- if (target->obd_recovering) {
- target->obd_connected_clients++;
- }
-
req->rq_repmsg->handle = conn;
/* If the client and the server are the same node, we will already
LASSERT(export != NULL);
spin_lock_irqsave(&export->exp_lock, flags);
-#warning "FIXME ASAP"
- if (0 && export->exp_conn_cnt >= req->rq_reqmsg->conn_cnt) {
- CERROR("%s: already connected at a higher conn_cnt: %d > %d\n",
- cluuid.uuid, export->exp_conn_cnt,
+ if (initial_conn) {
+ req->rq_repmsg->conn_cnt = export->exp_conn_cnt + 1;
+ } else if (export->exp_conn_cnt >= req->rq_reqmsg->conn_cnt) {
+ CERROR("%s@%s: already connected at a higher conn_cnt: %d > %d\n",
+ cluuid.uuid, ptlrpc_peernid2str(&req->rq_peer, peer_str),
+ export->exp_conn_cnt,
req->rq_reqmsg->conn_cnt);
spin_unlock_irqrestore(&export->exp_lock, flags);
GOTO(out, rc = -EALREADY);
- }
+ }
export->exp_conn_cnt = req->rq_reqmsg->conn_cnt;
spin_unlock_irqrestore(&export->exp_lock, flags);
GOTO(out, rc = 0);
}
+ if (target->obd_recovering) {
+ target->obd_connected_clients++;
+ }
+
memcpy(&conn, lustre_msg_buf(req->rq_reqmsg, 2, sizeof conn),
sizeof conn);
obd->obd_recovery_handler = handler;
obd->obd_recovery_timer.function = target_recovery_expired;
obd->obd_recovery_timer.data = (unsigned long)obd;
- init_timer(&obd->obd_recovery_timer);
spin_unlock_bh(&obd->obd_processing_task_lock);
reset_recovery_timer(obd);
queue_len = obd->obd_requests_queued_for_recovery;
next_transno = obd->obd_next_recovery_transno;
+ CDEBUG(D_HA,"max: %d, connected: %d, completed: %d, queue_len: %d, "
+ "req_transno: "LPU64", next_transno: "LPU64"\n",
+ max, connected, completed, queue_len, req_transno, next_transno);
if (obd->obd_abort_recovery) {
CDEBUG(D_HA, "waking for aborted recovery\n");
wake_up = 1;
* Also, if this request has a transno less than the one we're waiting
* for, we should process it now. It could (and currently always will)
* be an open request for a descriptor that was opened some time ago.
+ *
+ * Also, a resent, replayed request that has already been
+ * handled will pass through here and be processed immediately.
*/
if (obd->obd_processing_task == current->pid ||
transno < obd->obd_next_recovery_transno) {
/* Processing the queue right now, don't re-add. */
+ lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
LASSERT(list_empty(&req->rq_list));
spin_unlock_bh(&obd->obd_processing_task_lock);
OBD_FREE(reqmsg, req->rq_reqlen);
return 1;
}
+ /* A resent, replayed request that is still on the queue; just drop it.
+ The queued request will handle this. */
+ if ((lustre_msg_get_flags(req->rq_reqmsg) & (MSG_RESENT | MSG_REPLAY)) ==
+ (MSG_RESENT | MSG_REPLAY)) {
+ DEBUG_REQ(D_ERROR, req, "dropping resent queued req");
+ spin_unlock_bh(&obd->obd_processing_task_lock);
+ OBD_FREE(reqmsg, req->rq_reqlen);
+ OBD_FREE(saved_req, sizeof *saved_req);
+ return 0;
+ }
+
memcpy(saved_req, req, sizeof *req);
memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
req = saved_req;
list_add(&req->rq_list, &obd->obd_delayed_reply_queue);
spin_lock_bh(&obd->obd_processing_task_lock);
- --obd->obd_recoverable_clients;
+ /* only count the first "replay over" request from each
+ export */
+ if (req->rq_export->exp_replay_needed) {
+ --obd->obd_recoverable_clients;
+ req->rq_export->exp_replay_needed = 0;
+ }
recovery_done = (obd->obd_recoverable_clients == 0);
spin_unlock_bh(&obd->obd_processing_task_lock);