X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fldlm%2Fldlm_lib.c;h=57d1058ca1ca3dcdf5edbcaf257eb65c1ee17670;hb=813544faea368ed6bbaee2b22100483ce91e2a69;hp=30d3ac5fb6e573642d8749758b57d4788c5ad7b4;hpb=e9c557eed0464ad35bacbbca3da18a8df7190795;p=fs%2Flustre-release.git diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c index 30d3ac5..57d1058 100644 --- a/lustre/ldlm/ldlm_lib.c +++ b/lustre/ldlm/ldlm_lib.c @@ -93,23 +93,23 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf) sema_init(&cli->cl_sem, 1); cli->cl_conn_count = 0; - memcpy(server_uuid.uuid, lcfg->lcfg_inlbuf2, MIN(lcfg->lcfg_inllen2, - sizeof(server_uuid))); + memcpy(server_uuid.uuid, lcfg->lcfg_inlbuf2, + min_t(unsigned int, lcfg->lcfg_inllen2, sizeof(server_uuid))); - init_MUTEX(&cli->cl_dirty_sem); cli->cl_dirty = 0; - cli->cl_dirty_granted = 0; + cli->cl_avail_grant = 0; cli->cl_dirty_max = OSC_MAX_DIRTY_DEFAULT * 1024 * 1024; - cli->cl_ost_can_grant = 1; INIT_LIST_HEAD(&cli->cl_cache_waiters); INIT_LIST_HEAD(&cli->cl_loi_ready_list); + INIT_LIST_HEAD(&cli->cl_loi_write_list); + INIT_LIST_HEAD(&cli->cl_loi_read_list); spin_lock_init(&cli->cl_loi_list_lock); cli->cl_brw_in_flight = 0; spin_lock_init(&cli->cl_read_rpc_hist.oh_lock); spin_lock_init(&cli->cl_write_rpc_hist.oh_lock); spin_lock_init(&cli->cl_read_page_hist.oh_lock); spin_lock_init(&cli->cl_write_page_hist.oh_lock); - cli->cl_max_pages_per_rpc = PTL_MD_MAX_PAGES; + cli->cl_max_pages_per_rpc = PTLRPC_MAX_BRW_PAGES; cli->cl_max_rpcs_in_flight = OSC_MAX_RIF_DEFAULT; ldlm_get_ref(); @@ -135,6 +135,7 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf) imp->imp_obd = obddev; imp->imp_connect_op = connect_op; imp->imp_generation = 0; + imp->imp_initial_recov = 1; INIT_LIST_HEAD(&imp->imp_pinger_chain); memcpy(imp->imp_target_uuid.uuid, lcfg->lcfg_inlbuf1, lcfg->lcfg_inllen1); @@ -244,27 +245,18 @@ int client_connect_import(struct lustre_handle *dlm_handle, GOTO(out_disco, rc = -ENOMEM); imp->imp_dlm_handle = *dlm_handle; - imp->imp_state = LUSTRE_IMP_DISCON; + rc = ptlrpc_init_import(imp); + if (rc != 0) + GOTO(out_ldlm, rc); - rc = ptlrpc_connect_import(imp); + exp->exp_connection = ptlrpc_connection_addref(imp->imp_connection); + rc = ptlrpc_connect_import(imp, NULL); if (rc != 0) { LASSERT (imp->imp_state == LUSTRE_IMP_DISCON); GOTO(out_ldlm, rc); } - LASSERT (imp->imp_state == LUSTRE_IMP_FULL); - - exp->exp_connection = ptlrpc_connection_addref(imp->imp_connection); - - if (imp->imp_replayable) { - CDEBUG(D_HA, "connected to replayable target: %s\n", - imp->imp_target_uuid.uuid); - ptlrpc_pinger_add_import(imp); - } - - CDEBUG(D_HA, "local import: %p, remote handle: "LPX64"\n", imp, - imp->imp_remote_handle.cookie); - + ptlrpc_pinger_add_import(imp); EXIT; if (rc) { @@ -322,12 +314,10 @@ int client_disconnect_export(struct obd_export *exp, int failover) /* Yeah, obd_no_recov also (mainly) means "forced shutdown". 
*/ if (obd->obd_no_recov) - ptlrpc_set_import_active(imp, 0); + ptlrpc_invalidate_import(imp, 0); else rc = ptlrpc_disconnect_import(imp); - imp->imp_state = LUSTRE_IMP_NEW; - EXIT; out_no_disconnect: err = class_disconnect(exp, 0); @@ -383,8 +373,11 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) struct list_head *p; char *str, *tmp; int rc = 0, abort_recovery; + unsigned long flags; ENTRY; + OBD_RACE(OBD_FAIL_TGT_CONN_RACE); + LASSERT_REQSWAB (req, 0); str = lustre_msg_string(req->rq_reqmsg, 0, sizeof(tgtuuid) - 1); if (str == NULL) { @@ -397,7 +390,7 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) if (!target) { target = class_name2obd(str); } - + if (!target || target->obd_stopping || !target->obd_set_up) { CERROR("UUID '%s' is not available for connect\n", str); GOTO(out, rc = -ENODEV); @@ -481,6 +474,7 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) } } + /* If all else goes well, this is our RPC return code. */ req->rq_status = 0; @@ -508,16 +502,25 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) export = req->rq_export = class_conn2export(&conn); LASSERT(export != NULL); - if (req->rq_connection != NULL) - ptlrpc_put_connection(req->rq_connection); + spin_lock_irqsave(&export->exp_lock, flags); + if (export->exp_conn_cnt >= req->rq_reqmsg->conn_cnt) { + CERROR("%s: already connected at a higher conn_cnt: %d > %d\n", + cluuid.uuid, export->exp_conn_cnt, + req->rq_reqmsg->conn_cnt); + spin_unlock_irqrestore(&export->exp_lock, flags); + GOTO(out, rc = -EALREADY); + } + export->exp_conn_cnt = req->rq_reqmsg->conn_cnt; + spin_unlock_irqrestore(&export->exp_lock, flags); + + /* request from liblustre? */ + if (lustre_msg_get_op_flags(req->rq_reqmsg) & MSG_CONNECT_LIBCLIENT) + export->exp_libclient = 1; + if (export->exp_connection != NULL) ptlrpc_put_connection(export->exp_connection); export->exp_connection = ptlrpc_get_connection(&req->rq_peer, &remote_uuid); - req->rq_connection = ptlrpc_connection_addref(export->exp_connection); - - LASSERT(export->exp_conn_cnt < req->rq_reqmsg->conn_cnt); - export->exp_conn_cnt = req->rq_reqmsg->conn_cnt; if (rc == EALREADY) { /* We indicate the reconnection in a flag, not an error code. */ @@ -531,7 +534,7 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) if (export->exp_imp_reverse != NULL) class_destroy_import(export->exp_imp_reverse); revimp = export->exp_imp_reverse = class_new_import(); - revimp->imp_connection = ptlrpc_connection_addref(req->rq_connection); + revimp->imp_connection = ptlrpc_connection_addref(export->exp_connection); revimp->imp_client = &export->exp_obd->obd_ldlm_client; revimp->imp_remote_handle = conn; revimp->imp_obd = target; @@ -546,6 +549,7 @@ out: int target_handle_disconnect(struct ptlrpc_request *req) { + struct obd_export *exp; int rc; ENTRY; @@ -553,8 +557,9 @@ int target_handle_disconnect(struct ptlrpc_request *req) if (rc) RETURN(rc); - req->rq_status = obd_disconnect(req->rq_export, 0); - req->rq_export = NULL; + /* keep the rq_export around so we can send the reply */ + exp = class_export_get(req->rq_export); + req->rq_status = obd_disconnect(exp, 0); RETURN(0); } @@ -567,18 +572,14 @@ void target_destroy_export(struct obd_export *exp) /* We cancel locks at disconnect time, but this will catch any locks * granted in a race with recovery-induced disconnect. 
*/ - ldlm_cancel_locks_for_export(exp); + if (exp->exp_obd->obd_namespace != NULL) + ldlm_cancel_locks_for_export(exp); } /* * Recovery functions */ -void target_cancel_recovery_timer(struct obd_device *obd) -{ - del_timer(&obd->obd_recovery_timer); -} - static void abort_delayed_replies(struct obd_device *obd) { struct ptlrpc_request *req; @@ -589,6 +590,7 @@ static void abort_delayed_replies(struct obd_device *obd) req->rq_status = -ENOTCONN; req->rq_type = PTL_RPC_MSG_ERR; ptlrpc_reply(req); + class_export_put(req->rq_export); list_del(&req->rq_list); OBD_FREE(req->rq_reqmsg, req->rq_reqlen); OBD_FREE(req, sizeof *req); @@ -641,7 +643,7 @@ void target_abort_recovery(void *data) class_disconnect_exports(obd, 0); - /* when recovery was abort, cleanup orphans for mds */ + /* when recovery was aborted, cleanup orphans on mds and ost */ if (OBT(obd) && OBP(obd, postrecov)) { rc = OBP(obd, postrecov)(obd); if (rc >= 0) @@ -665,18 +667,25 @@ static void target_recovery_expired(unsigned long castmeharder) spin_unlock_bh(&obd->obd_processing_task_lock); } -static void reset_recovery_timer(struct obd_device *obd) + +/* obd_processing_task_lock should be held */ +void target_cancel_recovery_timer(struct obd_device *obd) { - int recovering; - spin_lock(&obd->obd_dev_lock); - recovering = obd->obd_recovering; - spin_unlock(&obd->obd_dev_lock); + CDEBUG(D_HA, "%s: cancel recovery timer\n", obd->obd_name); + del_timer(&obd->obd_recovery_timer); +} - if (!recovering) +static void reset_recovery_timer(struct obd_device *obd) +{ + spin_lock_bh(&obd->obd_processing_task_lock); + if (!obd->obd_recovering) { + spin_unlock_bh(&obd->obd_processing_task_lock); return; + } CDEBUG(D_HA, "timer will expire in %u seconds\n", OBD_RECOVERY_TIMEOUT / HZ); mod_timer(&obd->obd_recovery_timer, jiffies + OBD_RECOVERY_TIMEOUT); + spin_unlock_bh(&obd->obd_processing_task_lock); } @@ -896,6 +905,8 @@ int target_queue_final_reply(struct ptlrpc_request *req, int rc) int recovery_done = 0; int rc2; + LASSERT ((rc == 0) == (req->rq_reply_state != NULL)); + if (rc) { /* Just like ptlrpc_error, but without the sending. 
*/ rc = lustre_pack_reply(req, 0, NULL, NULL); @@ -903,6 +914,7 @@ int target_queue_final_reply(struct ptlrpc_request *req, int rc) req->rq_type = PTL_RPC_MSG_ERR; } + LASSERT (!req->rq_reply_state->rs_difficult); LASSERT(list_empty(&req->rq_list)); /* XXX a bit like the request-dup code in queue_recovery_request */ OBD_ALLOC(saved_req, sizeof *saved_req); @@ -913,8 +925,11 @@ int target_queue_final_reply(struct ptlrpc_request *req, int rc) LBUG(); memcpy(saved_req, req, sizeof *saved_req); memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen); + /* the copied req takes over the reply state */ + req->rq_reply_state = NULL; req = saved_req; req->rq_reqmsg = reqmsg; + class_export_get(req->rq_export); list_add(&req->rq_list, &obd->obd_delayed_reply_queue); spin_lock_bh(&obd->obd_processing_task_lock); @@ -927,14 +942,17 @@ int target_queue_final_reply(struct ptlrpc_request *req, int rc) ldlm_reprocess_all_ns(req->rq_export->exp_obd->obd_namespace); CWARN("%s: all clients recovered, sending delayed replies\n", obd->obd_name); + spin_lock_bh(&obd->obd_processing_task_lock); obd->obd_recovering = 0; + target_cancel_recovery_timer(obd); + spin_unlock_bh(&obd->obd_processing_task_lock); - /* when recovering finished, cleanup orphans for mds */ + /* when recovery finished, cleanup orphans on mds and ost */ if (OBT(obd) && OBP(obd, postrecov)) { rc2 = OBP(obd, postrecov)(obd); if (rc2 >= 0) - CWARN("%s: all clients recovered, %d MDS orphans " - "deleted\n", obd->obd_name, rc2); + CWARN("%s: all clients recovered, %d MDS " + "orphans deleted\n", obd->obd_name, rc2); else CERROR("postrecov failed %d\n", rc2); } @@ -943,11 +961,12 @@ int target_queue_final_reply(struct ptlrpc_request *req, int rc) req = list_entry(tmp, struct ptlrpc_request, rq_list); DEBUG_REQ(D_ERROR, req, "delayed:"); ptlrpc_reply(req); + class_export_put(req->rq_export); list_del(&req->rq_list); OBD_FREE(req->rq_reqmsg, req->rq_reqlen); OBD_FREE(req, sizeof *req); } - target_cancel_recovery_timer(obd); + ptlrpc_run_recovery_over_upcall(obd); } else { CWARN("%s: %d recoverable clients remain\n", obd->obd_name, obd->obd_recoverable_clients); @@ -957,179 +976,131 @@ int target_queue_final_reply(struct ptlrpc_request *req, int rc) return 1; } -static void ptlrpc_abort_reply (struct ptlrpc_request *req) +int +target_send_reply_msg (struct ptlrpc_request *req, int rc, int fail_id) { - /* On return, we must be sure that the ACK callback has either - * happened or will not happen. Note that the SENT callback will - * happen come what may since we successfully posted the PUT. */ - int rc; - struct l_wait_info lwi; - unsigned long flags; - - again: - /* serialise with ACK callback */ - spin_lock_irqsave (&req->rq_lock, flags); - if (!req->rq_want_ack) { - spin_unlock_irqrestore (&req->rq_lock, flags); - /* The ACK callback has happened already. Although the - * SENT callback might still be outstanding (yes really) we - * don't care; this is just like normal completion. */ - return; - } - spin_unlock_irqrestore (&req->rq_lock, flags); - - /* Have a bash at unlinking the MD. This will fail until the SENT - * callback has happened since the MD is busy from the PUT. If the - * ACK still hasn't arrived after then, a successful unlink will - * ensure the ACK callback never happens. 
*/
-        rc = PtlMDUnlink (req->rq_reply_md_h);
-        switch (rc) {
-        default:
-                LBUG ();
-        case PTL_OK:
-                /* SENT callback happened; ACK callback preempted */
-                LASSERT (req->rq_want_ack);
-                spin_lock_irqsave (&req->rq_lock, flags);
-                req->rq_want_ack = 0;
-                spin_unlock_irqrestore (&req->rq_lock, flags);
-                return;
-        case PTL_INV_MD:
-                return;
-        case PTL_MD_INUSE:
-                /* Still sending or ACK callback in progress: wait until
-                 * either callback has completed and try again.
-                 * Actually we can't wait for the SENT callback because
-                 * there's no state the SENT callback can touch that will
-                 * allow it to communicate with us!  So we just wait here
-                 * for a short time, effectively polling for the SENT
-                 * callback by calling PtlMDUnlink() again, to see if it
-                 * has finished.  Note that if the ACK does arrive, its
-                 * callback wakes us in short order. --eeb */
-                lwi = LWI_TIMEOUT (HZ/4, NULL, NULL);
-                rc = l_wait_event(req->rq_reply_waitq, !req->rq_want_ack,
-                                  &lwi);
-                CDEBUG (D_HA, "Retrying req %p: %d\n", req, rc);
-                /* NB go back and test rq_want_ack with locking, to ensure
-                 * if ACK callback happened, it has completed stopped
-                 * referencing this req. */
-                goto again;
-        }
-}
-
-void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
-{
-        int i;
-        int netrc;
-        unsigned long flags;
-        struct ptlrpc_req_ack_lock *ack_lock;
-        struct l_wait_info lwi = { 0 };
-        wait_queue_t commit_wait;
-        struct obd_device *obd =
-                req->rq_export ? req->rq_export->exp_obd : NULL;
-        struct obd_export *exp = NULL;
-
-        if (req->rq_export) {
-                for (i = 0; i < REQ_MAX_ACK_LOCKS; i++) {
-                        if (req->rq_ack_locks[i].mode) {
-                                exp = req->rq_export;
-                                break;
+        if (OBD_FAIL_CHECK(fail_id | OBD_FAIL_ONCE)) {
+                obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED;
+                DEBUG_REQ(D_ERROR, req, "dropping reply");
+                /* NB this does _not_ send with ACK disabled, to simulate
+                 * sending OK, but timing out for the ACK */
+                if (req->rq_reply_state != NULL) {
+                        if (!req->rq_reply_state->rs_difficult) {
+                                lustre_free_reply_state (req->rq_reply_state);
+                                req->rq_reply_state = NULL;
+                        } else {
+                                struct ptlrpc_service *svc =
+                                        req->rq_rqbd->rqbd_srv_ni->sni_service;
+                                atomic_inc(&svc->srv_outstanding_replies);
                         }
                 }
+                return (-ECOMM);
         }
-        if (exp) {
-                exp->exp_outstanding_reply = req;
-                spin_lock_irqsave (&req->rq_lock, flags);
-                req->rq_want_ack = 1;
-                spin_unlock_irqrestore (&req->rq_lock, flags);
-        }
-
-        if (!OBD_FAIL_CHECK(fail_id | OBD_FAIL_ONCE)) {
-                if (rc == 0) {
-                        DEBUG_REQ(D_NET, req, "sending reply");
-                        netrc = ptlrpc_reply(req);
-                } else if (rc == -ENOTCONN) {
-                        DEBUG_REQ(D_HA, req, "processing error (%d)", rc);
-                        netrc = ptlrpc_error(req);
-                } else {
-                        DEBUG_REQ(D_ERROR, req, "processing error (%d)", rc);
-                        netrc = ptlrpc_error(req);
+        if (rc) {
+                DEBUG_REQ(D_ERROR, req, "processing error (%d)", rc);
+                if (req->rq_reply_state == NULL) {
+                        rc = lustre_pack_reply (req, 0, NULL, NULL);
+                        if (rc != 0) {
+                                CERROR ("can't allocate reply\n");
+                                return (rc);
+                        }
                 }
+                req->rq_type = PTL_RPC_MSG_ERR;
         } else {
-                obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED;
-                DEBUG_REQ(D_ERROR, req, "dropping reply");
-                if (req->rq_repmsg) {
-                        OBD_FREE(req->rq_repmsg, req->rq_replen);
-                        req->rq_repmsg = NULL;
-                }
-                init_waitqueue_head(&req->rq_reply_waitq);
-                netrc = 0;
+                DEBUG_REQ(D_NET, req, "sending reply");
         }
+
+        return (ptlrpc_send_reply(req, 1));
+}
 
-        /* a failed send simulates the callbacks */
-        LASSERT(netrc == 0 || req->rq_want_ack == 0);
-        if (exp == NULL) {
-                LASSERT(req->rq_want_ack == 0);
+void
+target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
+{
+        int                        netrc;
+        unsigned long              flags;
+        struct ptlrpc_reply_state *rs;
+        struct obd_device         *obd;
+        struct obd_export         *exp;
+        struct ptlrpc_srv_ni      *sni;
+        struct ptlrpc_service     *svc;
+
+        sni = req->rq_rqbd->rqbd_srv_ni;
+        svc = sni->sni_service;
+
+        rs = req->rq_reply_state;
+        if (rs == NULL || !rs->rs_difficult) {
+                /* The easy case; no notifiers and reply_out_callback()
+                 * cleans up (i.e. we can't look inside rs after a
+                 * successful send) */
+                netrc = target_send_reply_msg (req, rc, fail_id);
+
+                LASSERT (netrc == 0 || req->rq_reply_state == NULL);
                 return;
         }
-        LASSERT(obd != NULL);
-
-        init_waitqueue_entry(&commit_wait, current);
-        add_wait_queue(&obd->obd_commit_waitq, &commit_wait);
-        rc = l_wait_event(req->rq_reply_waitq,
-                          !req->rq_want_ack || req->rq_resent ||
-                          req->rq_transno <= obd->obd_last_committed, &lwi);
-        remove_wait_queue(&obd->obd_commit_waitq, &commit_wait);
-
-        spin_lock_irqsave (&req->rq_lock, flags);
-        /* If we got here because the ACK callback ran, this acts as a
-         * barrier to ensure the callback completed the wakeup. */
-        spin_unlock_irqrestore (&req->rq_lock, flags);
-
-        /* If we committed the transno already, then we might wake up before
-         * the ack arrives.  We need to stop waiting for the ack before we can
-         * reuse this request structure.  We are guaranteed by this point that
-         * this cannot abort the sending of the actual reply.*/
-        ptlrpc_abort_reply(req);
-
-        if (req->rq_resent) {
-                DEBUG_REQ(D_HA, req, "resent: not cancelling locks");
-                return;
+
+        /* must be an export if locks saved */
+        LASSERT (req->rq_export != NULL);
+        /* req/reply consistent */
+        LASSERT (rs->rs_srv_ni == sni);
+
+        /* "fresh" reply */
+        LASSERT (!rs->rs_scheduled);
+        LASSERT (!rs->rs_scheduled_ever);
+        LASSERT (!rs->rs_handled);
+        LASSERT (!rs->rs_on_net);
+        LASSERT (rs->rs_export == NULL);
+        LASSERT (list_empty(&rs->rs_obd_list));
+        LASSERT (list_empty(&rs->rs_exp_list));
+
+        exp = class_export_get (req->rq_export);
+        obd = exp->exp_obd;
+
+        /* disable reply scheduling onto srv_reply_queue while I'm setting up */
+        rs->rs_scheduled = 1;
+        rs->rs_on_net = 1;
+        rs->rs_xid = req->rq_xid;
+        rs->rs_transno = req->rq_transno;
+        rs->rs_export = exp;
+
+        spin_lock_irqsave (&obd->obd_uncommitted_replies_lock, flags);
+
+        if (rs->rs_transno > obd->obd_last_committed) {
+                /* not committed already */
+                list_add_tail (&rs->rs_obd_list,
+                               &obd->obd_uncommitted_replies);
         }
-        LASSERT(rc == 0);
-        DEBUG_REQ(D_HA, req, "cancelling locks for %s",
-                  req->rq_want_ack ? "commit" : "ack");
+
+        spin_unlock (&obd->obd_uncommitted_replies_lock);
+        spin_lock (&exp->exp_lock);
 
-        exp->exp_outstanding_reply = NULL;
+        list_add_tail (&rs->rs_exp_list, &exp->exp_outstanding_replies);
 
-        for (ack_lock = req->rq_ack_locks, i = 0;
-             i < REQ_MAX_ACK_LOCKS; i++, ack_lock++) {
-                if (!ack_lock->mode)
-                        continue;
-                ldlm_lock_decref(&ack_lock->lock, ack_lock->mode);
+        spin_unlock_irqrestore (&exp->exp_lock, flags);
+
+        netrc = target_send_reply_msg (req, rc, fail_id);
+
+        spin_lock_irqsave (&svc->srv_lock, flags);
+
+        svc->srv_n_difficult_replies++;
+
+        if (netrc != 0) /* error sending: reply is off the net */
+                rs->rs_on_net = 0;
+
+        if (!rs->rs_on_net ||                   /* some notifier */
+            list_empty(&rs->rs_exp_list) ||     /* completed already */
+            list_empty(&rs->rs_obd_list)) {
+                list_add_tail (&rs->rs_list, &svc->srv_reply_queue);
+                wake_up (&svc->srv_waitq);
+        } else {
+                list_add (&rs->rs_list, &sni->sni_active_replies);
+                rs->rs_scheduled = 0;           /* allow notifier to schedule */
+        }
+
+        spin_unlock_irqrestore (&svc->srv_lock, flags);
 }
 
 int target_handle_ping(struct ptlrpc_request *req)
 {
         return lustre_pack_reply(req, 0, NULL, NULL);
 }
-
-void *ldlm_put_lock_into_req(struct ptlrpc_request *req,
-                             struct lustre_handle *lock, int mode)
-{
-        int i;
-
-        for (i = 0; i < REQ_MAX_ACK_LOCKS; i++) {
-                if (req->rq_ack_locks[i].mode)
-                        continue;
-                memcpy(&req->rq_ack_locks[i].lock, lock, sizeof(*lock));
-                req->rq_ack_locks[i].mode = mode;
-                return &req->rq_ack_locks[i];
-        }
-        CERROR("no space for lock in struct ptlrpc_request\n");
-        LBUG();
-        return NULL;
-}
-