int rq_portal, rp_portal, connect_op;
char *name = obddev->obd_type->typ_name;
char *mgmt_name = NULL;
- int rc = 0;
+ int rc;
struct obd_device *mgmt_obd;
mgmtcli_register_for_events_t register_f;
ENTRY;
sema_init(&cli->cl_sem, 1);
cli->cl_conn_count = 0;
- memcpy(server_uuid.uuid, lcfg->lcfg_inlbuf2, min(lcfg->lcfg_inllen2,
- sizeof(server_uuid)));
+ memcpy(server_uuid.uuid, lcfg->lcfg_inlbuf2,
+ min_t(unsigned int, lcfg->lcfg_inllen2, sizeof(server_uuid)));
cli->cl_dirty = 0;
cli->cl_avail_grant = 0;
INIT_LIST_HEAD(&cli->cl_cache_waiters);
INIT_LIST_HEAD(&cli->cl_loi_ready_list);
INIT_LIST_HEAD(&cli->cl_loi_write_list);
+ INIT_LIST_HEAD(&cli->cl_loi_read_list);
spin_lock_init(&cli->cl_loi_list_lock);
cli->cl_brw_in_flight = 0;
spin_lock_init(&cli->cl_read_rpc_hist.oh_lock);
spin_lock_init(&cli->cl_write_rpc_hist.oh_lock);
spin_lock_init(&cli->cl_read_page_hist.oh_lock);
spin_lock_init(&cli->cl_write_page_hist.oh_lock);
- cli->cl_max_pages_per_rpc = PTL_MD_MAX_PAGES;
+ cli->cl_max_pages_per_rpc = PTLRPC_MAX_BRW_PAGES;
cli->cl_max_rpcs_in_flight = OSC_MAX_RIF_DEFAULT;
- ldlm_get_ref();
+ rc = ldlm_get_ref();
if (rc) {
CERROR("ldlm_get_ref failed: %d\n", rc);
GOTO(err, rc);
GOTO(out_ldlm, rc);
}
+ ptlrpc_pinger_add_import(imp);
EXIT;
if (rc) {
/* Yeah, obd_no_recov also (mainly) means "forced shutdown". */
if (obd->obd_no_recov)
- ptlrpc_set_import_active(imp, 0);
+ ptlrpc_invalidate_import(imp, 0);
else
rc = ptlrpc_disconnect_import(imp);
struct lustre_handle *hdl;
hdl = &exp->exp_imp_reverse->imp_remote_handle;
/* Might be a re-connect after a partition. */
- if (!memcmp(&conn->cookie, &hdl->cookie, sizeof conn->cookie)) {
+#warning "FIXME ASAP"
+ memcpy(&hdl->cookie, &conn->cookie, sizeof(conn->cookie));
+ if (1 || !memcmp(&conn->cookie, &hdl->cookie, sizeof conn->cookie)) {
CERROR("%s reconnecting\n", cluuid->uuid);
conn->cookie = exp->exp_handle.h_cookie;
- RETURN(EALREADY);
+ /*RETURN(EALREADY);*/
+ RETURN(0);
} else {
CERROR("%s reconnecting from %s, "
"handle mismatch (ours "LPX64", theirs "
struct list_head *p;
char *str, *tmp;
int rc = 0, abort_recovery;
+ unsigned long flags;
ENTRY;
+ OBD_RACE(OBD_FAIL_TGT_CONN_RACE);
+
LASSERT_REQSWAB (req, 0);
str = lustre_msg_string(req->rq_reqmsg, 0, sizeof(tgtuuid) - 1);
if (str == NULL) {
if (!target) {
target = class_name2obd(str);
}
-
+
if (!target || target->obd_stopping || !target->obd_set_up) {
CERROR("UUID '%s' is not available for connect\n", str);
+
GOTO(out, rc = -ENODEV);
}
obd_str2uuid (&cluuid, str);
/* XXX extract a nettype and format accordingly */
- snprintf(remote_uuid.uuid, sizeof remote_uuid,
- "NET_"LPX64"_UUID", req->rq_peer.peer_nid);
+ switch (sizeof(ptl_nid_t)) {
+ /* NB the casts only avoid compiler warnings */
+ case 8:
+ snprintf(remote_uuid.uuid, sizeof remote_uuid,
+ "NET_"LPX64"_UUID", (__u64)req->rq_peer.peer_nid);
+ break;
+ case 4:
+ snprintf(remote_uuid.uuid, sizeof remote_uuid,
+ "NET_%x_UUID", (__u32)req->rq_peer.peer_nid);
+ break;
+ default:
+ LBUG();
+ }
spin_lock_bh(&target->obd_processing_task_lock);
abort_recovery = target->obd_abort_recovery;
} else if (req->rq_reqmsg->conn_cnt == 1) {
CERROR("%s reconnected with 1 conn_cnt; cookies not random?\n",
cluuid.uuid);
- GOTO(out, rc = -EALREADY);
+#warning "FIXME ASAP"
+ /*GOTO(out, rc = -EALREADY);*/
}
/* Tell the client if we're in recovery. */
lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECOVERING);
target_start_recovery_timer(target, handler);
}
-
+#if 0
/* Tell the client if we support replayable requests */
if (target->obd_replayable)
lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_REPLAYABLE);
-
+#endif
if (export == NULL) {
if (target->obd_recovering) {
CERROR("denying connection for new client %s: "
rc = obd_connect(&conn, target, &cluuid);
}
}
-
+ /* Tell the client if we support replayable requests */
+ if (target->obd_replayable)
+ lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_REPLAYABLE);
/* If all else goes well, this is our RPC return code. */
req->rq_status = 0;
if (rc && rc != EALREADY)
GOTO(out, rc);
- /* XXX track this all the time? */
- if (target->obd_recovering) {
- target->obd_connected_clients++;
- }
-
req->rq_repmsg->handle = conn;
/* If the client and the server are the same node, we will already
export = req->rq_export = class_conn2export(&conn);
LASSERT(export != NULL);
+ spin_lock_irqsave(&export->exp_lock, flags);
+#warning "FIXME ASAP"
+ if (0 && export->exp_conn_cnt >= req->rq_reqmsg->conn_cnt) {
+ CERROR("%s: already connected at a higher conn_cnt: %d > %d\n",
+ cluuid.uuid, export->exp_conn_cnt,
+ req->rq_reqmsg->conn_cnt);
+ spin_unlock_irqrestore(&export->exp_lock, flags);
+ GOTO(out, rc = -EALREADY);
+ }
+ export->exp_conn_cnt = req->rq_reqmsg->conn_cnt;
+ spin_unlock_irqrestore(&export->exp_lock, flags);
+
/* request from liblustre? */
if (lustre_msg_get_op_flags(req->rq_reqmsg) & MSG_CONNECT_LIBCLIENT)
export->exp_libclient = 1;
export->exp_connection = ptlrpc_get_connection(&req->rq_peer,
&remote_uuid);
- LASSERT(export->exp_conn_cnt < req->rq_reqmsg->conn_cnt);
- export->exp_conn_cnt = req->rq_reqmsg->conn_cnt;
-
if (rc == EALREADY) {
/* We indicate the reconnection in a flag, not an error code. */
lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECONNECT);
GOTO(out, rc = 0);
}
+ if (target->obd_recovering) {
+ target->obd_connected_clients++;
+ }
+
memcpy(&conn, lustre_msg_buf(req->rq_reqmsg, 2, sizeof conn),
sizeof conn);
obd->obd_recovery_handler = handler;
obd->obd_recovery_timer.function = target_recovery_expired;
obd->obd_recovery_timer.data = (unsigned long)obd;
- init_timer(&obd->obd_recovery_timer);
spin_unlock_bh(&obd->obd_processing_task_lock);
reset_recovery_timer(obd);
queue_len = obd->obd_requests_queued_for_recovery;
next_transno = obd->obd_next_recovery_transno;
+ CDEBUG(D_HA,"max: %d, connected: %d, completed: %d, queue_len: %d, "
+ "req_transno: "LPU64", next_transno: "LPU64"\n",
+ max, connected, completed, queue_len, req_transno, next_transno);
if (obd->obd_abort_recovery) {
CDEBUG(D_HA, "waking for aborted recovery\n");
wake_up = 1;
* Also, if this request has a transno less than the one we're waiting
* for, we should process it now. It could (and currently always will)
* be an open request for a descriptor that was opened some time ago.
+ *
+ * Also, a resent, replayed request that has already been
+ * handled will pass through here and be processed immediately.
*/
if (obd->obd_processing_task == current->pid ||
transno < obd->obd_next_recovery_transno) {
return 1;
}
+ /* A resent, replayed request that is still on the queue; just drop it.
+ The queued request will handle this. */
+ if ((lustre_msg_get_flags(req->rq_reqmsg) & (MSG_RESENT | MSG_REPLAY)) ==
+ (MSG_RESENT | MSG_REPLAY)) {
+ DEBUG_REQ(D_ERROR, req, "dropping resent queued req");
+ spin_unlock_bh(&obd->obd_processing_task_lock);
+ OBD_FREE(reqmsg, req->rq_reqlen);
+ OBD_FREE(saved_req, sizeof *saved_req);
+ return 0;
+ }
+
memcpy(saved_req, req, sizeof *req);
memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
req = saved_req;