X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fldlm%2Fldlm_lib.c;h=1183afe3147a04bff1c0e0d9440ab181bd2447a4;hp=4f113f4df496d406ef96d33c1f9dfe8fe75d7fa8;hb=9641c7f5ea20ef6163752a3b4cd956a8ada40c9c;hpb=401deb5075f9ab7f6c8c1831c56a84b0134e923c diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c index 4f113f4..1183afe 100644 --- a/lustre/ldlm/ldlm_lib.c +++ b/lustre/ldlm/ldlm_lib.c @@ -46,7 +46,7 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf) int rq_portal, rp_portal, connect_op; char *name = obddev->obd_type->typ_name; char *mgmt_name = NULL; - int rc = 0; + int rc; struct obd_device *mgmt_obd; mgmtcli_register_for_events_t register_f; ENTRY; @@ -112,7 +112,7 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf) cli->cl_max_pages_per_rpc = PTLRPC_MAX_BRW_PAGES; cli->cl_max_rpcs_in_flight = OSC_MAX_RIF_DEFAULT; - ldlm_get_ref(); + rc = ldlm_get_ref(); if (rc) { CERROR("ldlm_get_ref failed: %d\n", rc); GOTO(err, rc); @@ -333,19 +333,16 @@ int client_disconnect_export(struct obd_export *exp, int failover) * -------------------------------------------------------------------------- */ int target_handle_reconnect(struct lustre_handle *conn, struct obd_export *exp, - struct obd_uuid *cluuid) + struct obd_uuid *cluuid, int initial_conn) { - if (exp->exp_connection) { + if (exp->exp_connection && !initial_conn) { struct lustre_handle *hdl; hdl = &exp->exp_imp_reverse->imp_remote_handle; /* Might be a re-connect after a partition. */ -#warning "FIXME ASAP" - memcpy(&hdl->cookie, &conn->cookie, sizeof(conn->cookie)); - if (1 || !memcmp(&conn->cookie, &hdl->cookie, sizeof conn->cookie)) { + if (!memcmp(&conn->cookie, &hdl->cookie, sizeof conn->cookie)) { CERROR("%s reconnecting\n", cluuid->uuid); conn->cookie = exp->exp_handle.h_cookie; - /*RETURN(EALREADY);*/ - RETURN(0); + RETURN(EALREADY); } else { CERROR("%s reconnecting from %s, " "handle mismatch (ours "LPX64", theirs " @@ -377,6 +374,8 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) char *str, *tmp; int rc = 0, abort_recovery; unsigned long flags; + int initial_conn = 0; + char peer_str[PTL_NALFMT_SIZE]; ENTRY; OBD_RACE(OBD_FAIL_TGT_CONN_RACE); @@ -410,8 +409,19 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) obd_str2uuid (&cluuid, str); /* XXX extract a nettype and format accordingly */ - snprintf(remote_uuid.uuid, sizeof remote_uuid, - "NET_"LPX64"_UUID", req->rq_peer.peer_nid); + switch (sizeof(ptl_nid_t)) { + /* NB the casts only avoid compiler warnings */ + case 8: + snprintf(remote_uuid.uuid, sizeof remote_uuid, + "NET_"LPX64"_UUID", (__u64)req->rq_peer.peer_nid); + break; + case 4: + snprintf(remote_uuid.uuid, sizeof remote_uuid, + "NET_%x_UUID", (__u32)req->rq_peer.peer_nid); + break; + default: + LBUG(); + } spin_lock_bh(&target->obd_processing_task_lock); abort_recovery = target->obd_abort_recovery; @@ -428,7 +438,10 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) rc = lustre_pack_reply(req, 0, NULL, NULL); if (rc) GOTO(out, rc); - + + if (lustre_msg_get_op_flags(req->rq_reqmsg) & MSG_CONNECT_INITIAL) + initial_conn = 1; + /* lctl gets a backstage, all-access pass. */ if (obd_uuid_equals(&cluuid, &target->obd_uuid)) goto dont_check_exports; @@ -440,7 +453,8 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) spin_unlock(&target->obd_dev_lock); LASSERT(export->exp_obd == target); - rc = target_handle_reconnect(&conn, export, &cluuid); + rc = target_handle_reconnect(&conn, export, &cluuid, + initial_conn); break; } export = NULL; @@ -448,28 +462,46 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) /* If we found an export, we already unlocked. */ if (!export) { spin_unlock(&target->obd_dev_lock); - } else if (req->rq_reqmsg->conn_cnt == 1) { + } else if (req->rq_export == NULL && + atomic_read(&export->exp_rpc_count) > 0) { + CWARN("%s: refuse connection from %s/%s to 0x%p/%d\n", + target->obd_name, cluuid.uuid, + ptlrpc_peernid2str(&req->rq_peer, peer_str), + export, atomic_read(&export->exp_refcount)); + GOTO(out, rc = -EBUSY); + } else if (req->rq_export != NULL && + atomic_read(&export->exp_rpc_count) > 1) { + CWARN("%s: refuse reconnection from %s@%s to 0x%p/%d\n", + target->obd_name, cluuid.uuid, + ptlrpc_peernid2str(&req->rq_peer, peer_str), + export, atomic_read(&export->exp_rpc_count)); + GOTO(out, rc = -EBUSY); + } + else if (req->rq_reqmsg->conn_cnt == 1 && !initial_conn) { CERROR("%s reconnected with 1 conn_cnt; cookies not random?\n", cluuid.uuid); -#warning "FIXME ASAP" - /*GOTO(out, rc = -EALREADY);*/ + GOTO(out, rc = -EALREADY); } /* Tell the client if we're in recovery. */ /* If this is the first client, start the recovery timer */ + CWARN("%s: connection from %s@%s %s\n", target->obd_name, cluuid.uuid, + ptlrpc_peernid2str(&req->rq_peer, peer_str), + target->obd_recovering ? "(recovering)" : ""); if (target->obd_recovering) { lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECOVERING); target_start_recovery_timer(target, handler); } - +#if 0 /* Tell the client if we support replayable requests */ if (target->obd_replayable) lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_REPLAYABLE); - +#endif if (export == NULL) { if (target->obd_recovering) { - CERROR("denying connection for new client %s: " + CERROR("denying connection for new client %s@%s: " "%d clients in recovery for %lds\n", cluuid.uuid, + ptlrpc_peernid2str(&req->rq_peer, peer_str), target->obd_recoverable_clients, (target->obd_recovery_timer.expires-jiffies)/HZ); rc = -EBUSY; @@ -478,7 +510,9 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) rc = obd_connect(&conn, target, &cluuid); } } - + /* Tell the client if we support replayable requests */ + if (target->obd_replayable) + lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_REPLAYABLE); /* If all else goes well, this is our RPC return code. */ req->rq_status = 0; @@ -486,11 +520,6 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) if (rc && rc != EALREADY) GOTO(out, rc); - /* XXX track this all the time? */ - if (target->obd_recovering) { - target->obd_connected_clients++; - } - req->rq_repmsg->handle = conn; /* If the client and the server are the same node, we will already @@ -508,14 +537,16 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) LASSERT(export != NULL); spin_lock_irqsave(&export->exp_lock, flags); -#warning "FIXME ASAP" - if (0 && export->exp_conn_cnt >= req->rq_reqmsg->conn_cnt) { - CERROR("%s: already connected at a higher conn_cnt: %d > %d\n", - cluuid.uuid, export->exp_conn_cnt, + if (initial_conn) { + req->rq_repmsg->conn_cnt = export->exp_conn_cnt + 1; + } else if (export->exp_conn_cnt >= req->rq_reqmsg->conn_cnt) { + CERROR("%s@%s: already connected at a higher conn_cnt: %d > %d\n", + cluuid.uuid, ptlrpc_peernid2str(&req->rq_peer, peer_str), + export->exp_conn_cnt, req->rq_reqmsg->conn_cnt); spin_unlock_irqrestore(&export->exp_lock, flags); GOTO(out, rc = -EALREADY); - } + } export->exp_conn_cnt = req->rq_reqmsg->conn_cnt; spin_unlock_irqrestore(&export->exp_lock, flags); @@ -534,6 +565,10 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) GOTO(out, rc = 0); } + if (target->obd_recovering) { + target->obd_connected_clients++; + } + memcpy(&conn, lustre_msg_buf(req->rq_reqmsg, 2, sizeof conn), sizeof conn); @@ -708,7 +743,6 @@ void target_start_recovery_timer(struct obd_device *obd, svc_handler_t handler) obd->obd_recovery_handler = handler; obd->obd_recovery_timer.function = target_recovery_expired; obd->obd_recovery_timer.data = (unsigned long)obd; - init_timer(&obd->obd_recovery_timer); spin_unlock_bh(&obd->obd_processing_task_lock); reset_recovery_timer(obd); @@ -730,6 +764,9 @@ static int check_for_next_transno(struct obd_device *obd) queue_len = obd->obd_requests_queued_for_recovery; next_transno = obd->obd_next_recovery_transno; + CDEBUG(D_HA,"max: %d, connected: %d, completed: %d, queue_len: %d, " + "req_transno: "LPU64", next_transno: "LPU64"\n", + max, connected, completed, queue_len, req_transno, next_transno); if (obd->obd_abort_recovery) { CDEBUG(D_HA, "waking for aborted recovery\n"); wake_up = 1; @@ -843,10 +880,14 @@ int target_queue_recovery_request(struct ptlrpc_request *req, * Also, if this request has a transno less than the one we're waiting * for, we should process it now. It could (and currently always will) * be an open request for a descriptor that was opened some time ago. + * + * Also, a resent, replayed request that has already been + * handled will pass through here and be processed immediately. */ if (obd->obd_processing_task == current->pid || transno < obd->obd_next_recovery_transno) { /* Processing the queue right now, don't re-add. */ + lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT); LASSERT(list_empty(&req->rq_list)); spin_unlock_bh(&obd->obd_processing_task_lock); OBD_FREE(reqmsg, req->rq_reqlen); @@ -854,6 +895,17 @@ int target_queue_recovery_request(struct ptlrpc_request *req, return 1; } + /* A resent, replayed request that is still on the queue; just drop it. + The queued request will handle this. */ + if ((lustre_msg_get_flags(req->rq_reqmsg) & (MSG_RESENT | MSG_REPLAY)) == + (MSG_RESENT | MSG_REPLAY)) { + DEBUG_REQ(D_ERROR, req, "dropping resent queued req"); + spin_unlock_bh(&obd->obd_processing_task_lock); + OBD_FREE(reqmsg, req->rq_reqlen); + OBD_FREE(saved_req, sizeof *saved_req); + return 0; + } + memcpy(saved_req, req, sizeof *req); memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen); req = saved_req; @@ -939,7 +991,12 @@ int target_queue_final_reply(struct ptlrpc_request *req, int rc) list_add(&req->rq_list, &obd->obd_delayed_reply_queue); spin_lock_bh(&obd->obd_processing_task_lock); - --obd->obd_recoverable_clients; + /* only count the first "replay over" request from each + export */ + if (req->rq_export->exp_replay_needed) { + --obd->obd_recoverable_clients; + req->rq_export->exp_replay_needed = 0; + } recovery_done = (obd->obd_recoverable_clients == 0); spin_unlock_bh(&obd->obd_processing_task_lock);