diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c
index 9d4934e..4f113f4 100644
--- a/lustre/ldlm/ldlm_lib.c
+++ b/lustre/ldlm/ldlm_lib.c
@@ -91,27 +91,25 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf)
                 RETURN(-EINVAL);
         }
 
-        sema_init(&cli->cl_sem, 1);
         cli->cl_conn_count = 0;
-        memcpy(server_uuid.uuid, lcfg->lcfg_inlbuf2, MIN(lcfg->lcfg_inllen2,
-               sizeof(server_uuid)));
+        memcpy(server_uuid.uuid, lcfg->lcfg_inlbuf2,
+               min_t(unsigned int, lcfg->lcfg_inllen2, sizeof(server_uuid)));
 
-        init_MUTEX(&cli->cl_dirty_sem);
         cli->cl_dirty = 0;
-        cli->cl_dirty_granted = 0;
+        cli->cl_avail_grant = 0;
         cli->cl_dirty_max = OSC_MAX_DIRTY_DEFAULT * 1024 * 1024;
-        cli->cl_ost_can_grant = 1;
         INIT_LIST_HEAD(&cli->cl_cache_waiters);
         INIT_LIST_HEAD(&cli->cl_loi_ready_list);
         INIT_LIST_HEAD(&cli->cl_loi_write_list);
+        INIT_LIST_HEAD(&cli->cl_loi_read_list);
         spin_lock_init(&cli->cl_loi_list_lock);
         cli->cl_brw_in_flight = 0;
         spin_lock_init(&cli->cl_read_rpc_hist.oh_lock);
         spin_lock_init(&cli->cl_write_rpc_hist.oh_lock);
         spin_lock_init(&cli->cl_read_page_hist.oh_lock);
         spin_lock_init(&cli->cl_write_page_hist.oh_lock);
-        cli->cl_max_pages_per_rpc = PTL_MD_MAX_PAGES;
+        cli->cl_max_pages_per_rpc = PTLRPC_MAX_BRW_PAGES;
         cli->cl_max_rpcs_in_flight = OSC_MAX_RIF_DEFAULT;
 
         ldlm_get_ref();
@@ -258,6 +256,7 @@ int client_connect_import(struct lustre_handle *dlm_handle,
                 GOTO(out_ldlm, rc);
         }
 
+        ptlrpc_pinger_add_import(imp);
         EXIT;
 
         if (rc) {
@@ -315,7 +314,7 @@ int client_disconnect_export(struct obd_export *exp, int failover)
 
         /* Yeah, obd_no_recov also (mainly) means "forced shutdown". */
         if (obd->obd_no_recov)
-                ptlrpc_set_import_active(imp, 0);
+                ptlrpc_invalidate_import(imp, 0);
         else
                 rc = ptlrpc_disconnect_import(imp);
 
@@ -340,10 +339,13 @@ int target_handle_reconnect(struct lustre_handle *conn, struct obd_export *exp,
                 struct lustre_handle *hdl;
                 hdl = &exp->exp_imp_reverse->imp_remote_handle;
                 /* Might be a re-connect after a partition. */
-                if (!memcmp(&conn->cookie, &hdl->cookie, sizeof conn->cookie)) {
+#warning "FIXME ASAP"
+                memcpy(&hdl->cookie, &conn->cookie, sizeof(conn->cookie));
+                if (1 || !memcmp(&conn->cookie, &hdl->cookie, sizeof conn->cookie)) {
                         CERROR("%s reconnecting\n", cluuid->uuid);
                         conn->cookie = exp->exp_handle.h_cookie;
-                        RETURN(EALREADY);
+                        /*RETURN(EALREADY);*/
+                        RETURN(0);
                 } else {
                         CERROR("%s reconnecting from %s, "
                                "handle mismatch (ours "LPX64", theirs "
@@ -374,8 +376,11 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
         struct list_head *p;
         char *str, *tmp;
         int rc = 0, abort_recovery;
+        unsigned long flags;
         ENTRY;
 
+        OBD_RACE(OBD_FAIL_TGT_CONN_RACE);
+
         LASSERT_REQSWAB (req, 0);
         str = lustre_msg_string(req->rq_reqmsg, 0, sizeof(tgtuuid) - 1);
         if (str == NULL) {
@@ -388,9 +393,10 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
         if (!target) {
                 target = class_name2obd(str);
         }
-
+
         if (!target || target->obd_stopping || !target->obd_set_up) {
                 CERROR("UUID '%s' is not available for connect\n", str);
+
                 GOTO(out, rc = -ENODEV);
         }
 
@@ -445,7 +451,8 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
         } else if (req->rq_reqmsg->conn_cnt == 1) {
                 CERROR("%s reconnected with 1 conn_cnt; cookies not random?\n",
                        cluuid.uuid);
-                GOTO(out, rc = -EALREADY);
+#warning "FIXME ASAP"
+                /*GOTO(out, rc = -EALREADY);*/
         }
 
         /* Tell the client if we're in recovery. */
@@ -472,6 +479,7 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
                 }
         }
 
+
         /* If all else goes well, this is our RPC return code. */
         req->rq_status = 0;
 
@@ -499,14 +507,27 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
         export = req->rq_export = class_conn2export(&conn);
         LASSERT(export != NULL);
 
+        spin_lock_irqsave(&export->exp_lock, flags);
+#warning "FIXME ASAP"
+        if (0 && export->exp_conn_cnt >= req->rq_reqmsg->conn_cnt) {
+                CERROR("%s: already connected at a higher conn_cnt: %d > %d\n",
+                       cluuid.uuid, export->exp_conn_cnt,
+                       req->rq_reqmsg->conn_cnt);
+                spin_unlock_irqrestore(&export->exp_lock, flags);
+                GOTO(out, rc = -EALREADY);
+        }
+        export->exp_conn_cnt = req->rq_reqmsg->conn_cnt;
+        spin_unlock_irqrestore(&export->exp_lock, flags);
+
+        /* request from liblustre? */
+        if (lustre_msg_get_op_flags(req->rq_reqmsg) & MSG_CONNECT_LIBCLIENT)
+                export->exp_libclient = 1;
+
         if (export->exp_connection != NULL)
                 ptlrpc_put_connection(export->exp_connection);
         export->exp_connection = ptlrpc_get_connection(&req->rq_peer,
                                                        &remote_uuid);
 
-        LASSERT(export->exp_conn_cnt < req->rq_reqmsg->conn_cnt);
-        export->exp_conn_cnt = req->rq_reqmsg->conn_cnt;
-
         if (rc == EALREADY) {
                 /* We indicate the reconnection in a flag, not an error code. */
                 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECONNECT);
@@ -890,6 +911,8 @@ int target_queue_final_reply(struct ptlrpc_request *req, int rc)
         int recovery_done = 0;
         int rc2;
 
+        LASSERT ((rc == 0) == (req->rq_reply_state != NULL));
+
         if (rc) {
                 /* Just like ptlrpc_error, but without the sending. */
                 rc = lustre_pack_reply(req, 0, NULL, NULL);
@@ -897,6 +920,7 @@ int target_queue_final_reply(struct ptlrpc_request *req, int rc)
                 req->rq_type = PTL_RPC_MSG_ERR;
         }
 
+        LASSERT (!req->rq_reply_state->rs_difficult);
         LASSERT(list_empty(&req->rq_list));
         /* XXX a bit like the request-dup code in queue_recovery_request */
         OBD_ALLOC(saved_req, sizeof *saved_req);
@@ -907,6 +931,8 @@ int target_queue_final_reply(struct ptlrpc_request *req, int rc)
                 LBUG();
         memcpy(saved_req, req, sizeof *saved_req);
         memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
+        /* the copied req takes over the reply state */
+        req->rq_reply_state = NULL;
         req = saved_req;
         req->rq_reqmsg = reqmsg;
         class_export_get(req->rq_export);
@@ -956,180 +982,131 @@ int target_queue_final_reply(struct ptlrpc_request *req, int rc)
         return 1;
 }
 
-static void ptlrpc_abort_reply (struct ptlrpc_request *req)
+int
+target_send_reply_msg (struct ptlrpc_request *req, int rc, int fail_id)
 {
-        /* On return, we must be sure that the ACK callback has either
-         * happened or will not happen. Note that the SENT callback will
-         * happen come what may since we successfully posted the PUT. */
-        int rc;
-        struct l_wait_info lwi;
-        unsigned long flags;
-
- again:
-        /* serialise with ACK callback */
-        spin_lock_irqsave (&req->rq_lock, flags);
-        if (!req->rq_want_ack) {
-                spin_unlock_irqrestore (&req->rq_lock, flags);
-                /* The ACK callback has happened already. Although the
-                 * SENT callback might still be outstanding (yes really) we
-                 * don't care; this is just like normal completion. */
-                return;
-        }
-        spin_unlock_irqrestore (&req->rq_lock, flags);
-
-        /* Have a bash at unlinking the MD. This will fail until the SENT
-         * callback has happened since the MD is busy from the PUT. If the
-         * ACK still hasn't arrived after then, a successful unlink will
-         * ensure the ACK callback never happens. */
-        rc = PtlMDUnlink (req->rq_reply_md_h);
-        switch (rc) {
-        default:
-                LBUG ();
-        case PTL_OK:
-                /* SENT callback happened; ACK callback preempted */
-                LASSERT (req->rq_want_ack);
-                spin_lock_irqsave (&req->rq_lock, flags);
-                req->rq_want_ack = 0;
-                spin_unlock_irqrestore (&req->rq_lock, flags);
-                return;
-        case PTL_INV_MD:
-                return;
-        case PTL_MD_INUSE:
-                /* Still sending or ACK callback in progress: wait until
-                 * either callback has completed and try again.
-                 * Actually we can't wait for the SENT callback because
-                 * there's no state the SENT callback can touch that will
-                 * allow it to communicate with us! So we just wait here
-                 * for a short time, effectively polling for the SENT
-                 * callback by calling PtlMDUnlink() again, to see if it
-                 * has finished. Note that if the ACK does arrive, its
-                 * callback wakes us in short order. --eeb */
-                lwi = LWI_TIMEOUT (HZ/4, NULL, NULL);
-                rc = l_wait_event(req->rq_reply_waitq, !req->rq_want_ack,
-                                  &lwi);
-                CDEBUG (D_HA, "Retrying req %p: %d\n", req, rc);
-                /* NB go back and test rq_want_ack with locking, to ensure
-                 * if ACK callback happened, it has completed stopped
-                 * referencing this req. */
-                goto again;
-        }
-}
-
-void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
-{
-        int i;
-        int netrc;
-        unsigned long flags;
-        struct ptlrpc_req_ack_lock *ack_lock;
-        struct l_wait_info lwi = { 0 };
-        wait_queue_t commit_wait;
-        struct obd_device *obd =
-                req->rq_export ? req->rq_export->exp_obd : NULL;
-        struct obd_export *exp = NULL;
-
-        if (req->rq_export) {
-                for (i = 0; i < REQ_MAX_ACK_LOCKS; i++) {
-                        if (req->rq_ack_locks[i].mode) {
-                                exp = req->rq_export;
-                                break;
+        if (OBD_FAIL_CHECK(fail_id | OBD_FAIL_ONCE)) {
+                obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED;
+                DEBUG_REQ(D_ERROR, req, "dropping reply");
+                /* NB this does _not_ send with ACK disabled, to simulate
+                 * sending OK, but timing out for the ACK */
+                if (req->rq_reply_state != NULL) {
+                        if (!req->rq_reply_state->rs_difficult) {
+                                lustre_free_reply_state (req->rq_reply_state);
+                                req->rq_reply_state = NULL;
+                        } else {
+                                struct ptlrpc_service *svc =
+                                        req->rq_rqbd->rqbd_srv_ni->sni_service;
+                                atomic_inc(&svc->srv_outstanding_replies);
                         }
                 }
+                return (-ECOMM);
         }
-        if (exp) {
-                exp->exp_outstanding_reply = req;
-                spin_lock_irqsave (&req->rq_lock, flags);
-                req->rq_want_ack = 1;
-                spin_unlock_irqrestore (&req->rq_lock, flags);
-        }
-
-        if (!OBD_FAIL_CHECK(fail_id | OBD_FAIL_ONCE)) {
-                if (rc == 0) {
-                        DEBUG_REQ(D_NET, req, "sending reply");
-                        netrc = ptlrpc_reply(req);
-                } else if (rc == -ENOTCONN) {
-                        DEBUG_REQ(D_HA, req, "processing error (%d)", rc);
-                        netrc = ptlrpc_error(req);
-                } else {
-                        DEBUG_REQ(D_ERROR, req, "processing error (%d)", rc);
-                        netrc = ptlrpc_error(req);
+        if (rc) {
+                DEBUG_REQ(D_ERROR, req, "processing error (%d)", rc);
+                if (req->rq_reply_state == NULL) {
+                        rc = lustre_pack_reply (req, 0, NULL, NULL);
+                        if (rc != 0) {
+                                CERROR ("can't allocate reply\n");
+                                return (rc);
+                        }
                 }
+                req->rq_type = PTL_RPC_MSG_ERR;
         } else {
-                obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED;
-                DEBUG_REQ(D_ERROR, req, "dropping reply");
-                if (req->rq_repmsg) {
-                        OBD_FREE(req->rq_repmsg, req->rq_replen);
-                        req->rq_repmsg = NULL;
-                }
-                init_waitqueue_head(&req->rq_reply_waitq);
-                netrc = 0;
+                DEBUG_REQ(D_NET, req, "sending reply");
         }
+
+        return (ptlrpc_send_reply(req, 1));
+}
 
-        /* a failed send simulates the callbacks */
-        LASSERT(netrc == 0 || req->rq_want_ack == 0);
-        if (exp == NULL) {
-                LASSERT(req->rq_want_ack == 0);
+void
+target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
+{
+        int netrc;
+        unsigned long flags;
+        struct ptlrpc_reply_state *rs;
+        struct obd_device *obd;
+        struct obd_export *exp;
+        struct ptlrpc_srv_ni *sni;
+        struct ptlrpc_service *svc;
+
+        sni = req->rq_rqbd->rqbd_srv_ni;
+        svc = sni->sni_service;
+
+        rs = req->rq_reply_state;
+        if (rs == NULL || !rs->rs_difficult) {
+                /* The easy case; no notifiers and reply_out_callback()
+                 * cleans up (i.e. we can't look inside rs after a
+                 * successful send) */
+                netrc = target_send_reply_msg (req, rc, fail_id);
+
+                LASSERT (netrc == 0 || req->rq_reply_state == NULL);
                 return;
         }
-        LASSERT(obd != NULL);
-
-        init_waitqueue_entry(&commit_wait, current);
-        add_wait_queue(&obd->obd_commit_waitq, &commit_wait);
-        rc = l_wait_event(req->rq_reply_waitq,
-                          !req->rq_want_ack || req->rq_resent ||
-                          req->rq_transno <= obd->obd_last_committed, &lwi);
-        remove_wait_queue(&obd->obd_commit_waitq, &commit_wait);
-
-        spin_lock_irqsave (&req->rq_lock, flags);
-        /* If we got here because the ACK callback ran, this acts as a
-         * barrier to ensure the callback completed the wakeup. */
-        spin_unlock_irqrestore (&req->rq_lock, flags);
-
-        /* If we committed the transno already, then we might wake up before
-         * the ack arrives. We need to stop waiting for the ack before we can
-         * reuse this request structure. We are guaranteed by this point that
-         * this cannot abort the sending of the actual reply.*/
-        ptlrpc_abort_reply(req);
-
-        if (req->rq_resent) {
-                DEBUG_REQ(D_HA, req, "resent: not cancelling locks");
-                return;
+
+        /* must be an export if locks saved */
+        LASSERT (req->rq_export != NULL);
+        /* req/reply consistent */
+        LASSERT (rs->rs_srv_ni == sni);
+
+        /* "fresh" reply */
+        LASSERT (!rs->rs_scheduled);
+        LASSERT (!rs->rs_scheduled_ever);
+        LASSERT (!rs->rs_handled);
+        LASSERT (!rs->rs_on_net);
+        LASSERT (rs->rs_export == NULL);
+        LASSERT (list_empty(&rs->rs_obd_list));
+        LASSERT (list_empty(&rs->rs_exp_list));
+
+        exp = class_export_get (req->rq_export);
+        obd = exp->exp_obd;
+
+        /* disable reply scheduling onto srv_reply_queue while I'm setting up */
+        rs->rs_scheduled = 1;
+        rs->rs_on_net = 1;
+        rs->rs_xid = req->rq_xid;
+        rs->rs_transno = req->rq_transno;
+        rs->rs_export = exp;
+
+        spin_lock_irqsave (&obd->obd_uncommitted_replies_lock, flags);
+
+        if (rs->rs_transno > obd->obd_last_committed) {
+                /* not committed already */
+                list_add_tail (&rs->rs_obd_list,
+                               &obd->obd_uncommitted_replies);
         }
-        LASSERT(rc == 0);
-        DEBUG_REQ(D_HA, req, "cancelling locks for %s",
-                  req->rq_want_ack ? "commit" : "ack");
+
+        spin_unlock (&obd->obd_uncommitted_replies_lock);
+        spin_lock (&exp->exp_lock);
 
-        exp->exp_outstanding_reply = NULL;
+        list_add_tail (&rs->rs_exp_list, &exp->exp_outstanding_replies);
 
-        for (ack_lock = req->rq_ack_locks, i = 0;
-             i < REQ_MAX_ACK_LOCKS; i++, ack_lock++) {
-                if (!ack_lock->mode)
-                        continue;
-                ldlm_lock_decref(&ack_lock->lock, ack_lock->mode);
+        spin_unlock_irqrestore (&exp->exp_lock, flags);
+
+        netrc = target_send_reply_msg (req, rc, fail_id);
+
+        spin_lock_irqsave (&svc->srv_lock, flags);
+
+        svc->srv_n_difficult_replies++;
+
+        if (netrc != 0) /* error sending: reply is off the net */
+                rs->rs_on_net = 0;
+
+        if (!rs->rs_on_net ||                   /* some notifier */
+            list_empty(&rs->rs_exp_list) ||     /* completed already */
+            list_empty(&rs->rs_obd_list)) {
+                list_add_tail (&rs->rs_list, &svc->srv_reply_queue);
+                wake_up (&svc->srv_waitq);
+        } else {
+                list_add (&rs->rs_list, &sni->sni_active_replies);
+                rs->rs_scheduled = 0;           /* allow notifier to schedule */
         }
+
+        spin_unlock_irqrestore (&svc->srv_lock, flags);
 }
 
 int target_handle_ping(struct ptlrpc_request *req)
 {
         return lustre_pack_reply(req, 0, NULL, NULL);
 }
-
-void *ldlm_put_lock_into_req(struct ptlrpc_request *req,
-                             struct lustre_handle *lock, int mode)
-{
-        int i;
-
-        for (i = 0; i < REQ_MAX_ACK_LOCKS; i++) {
-                if (req->rq_ack_locks[i].mode)
-                        continue;
-                CDEBUG(D_HA, "saving lock "LPX64" in req %p ack_lock[%d]\n",
-                       lock->cookie, req, i);
-                memcpy(&req->rq_ack_locks[i].lock, lock, sizeof(*lock));
-                req->rq_ack_locks[i].mode = mode;
-                return &req->rq_ack_locks[i];
-        }
-        CERROR("no space for lock in struct ptlrpc_request\n");
-        LBUG();
-        return NULL;
-}
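
The heart of this patch is the new "difficult reply" path: target_send_reply() now pins a reply that has notifiers attached until it is off the net, its client ACK has cleared it from the export's exp_outstanding_replies list, and its transno has been dropped from the obd's obd_uncommitted_replies list; it is queued on srv_reply_queue for final handling as soon as any one of those pins is released, otherwise it is parked on sni_active_replies for a notifier to schedule later. The stand-alone sketch below models just that scheduling decision. It is plain C for illustration only; struct reply_state and schedule_now() are simplified stand-ins, not the real struct ptlrpc_reply_state API.

        #include <stdbool.h>
        #include <stdio.h>

        /* Simplified stand-in for the bookkeeping bits of a "difficult"
         * reply; the real struct ptlrpc_reply_state tracks the same three
         * conditions via rs_on_net and its rs_exp_list/rs_obd_list linkage. */
        struct reply_state {
                bool on_net;      /* send posted; send callback not yet run */
                bool on_exp_list; /* export still holds it (ACK pending) */
                bool on_obd_list; /* its transno is not yet committed */
        };

        /* Mirrors the test at the end of target_send_reply(): hand the
         * reply to the reply handler as soon as ANY pin has been released;
         * otherwise park it and let the commit/ACK notifiers schedule it. */
        static bool schedule_now(const struct reply_state *rs)
        {
                return !rs->on_net || !rs->on_exp_list || !rs->on_obd_list;
        }

        int main(void)
        {
                /* Freshly sent, unacked, uncommitted: stays parked. */
                struct reply_state rs = { true, true, true };
                printf("fresh reply: schedule now? %s\n",
                       schedule_now(&rs) ? "yes" : "no");

                /* The commit callback dropped it from the uncommitted
                 * list: it can now be queued for final handling. */
                rs.on_obd_list = false;
                printf("after commit: schedule now? %s\n",
                       schedule_now(&rs) ? "yes" : "no");
                return 0;
        }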