X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fldlm%2Fldlm_lib.c;h=d0e24663f1d2343ee7a4092358a25627378cf644;hb=c39489126f88bb5b30643ebb11c72fbe9f9d2241;hp=4b7eb3bc0ee332954abfe42df122b33922a123d1;hpb=30c3a18963d1d6d70175fbbbdd9554e1eb2fa40d;p=fs%2Flustre-release.git diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c index 4b7eb3b..d0e2466 100644 --- a/lustre/ldlm/ldlm_lib.c +++ b/lustre/ldlm/ldlm_lib.c @@ -35,10 +35,142 @@ #include #include #include +/* @priority: if non-zero, move the selected to the list head + * @nocreate: if non-zero, only search in existed connections + */ +static int import_set_conn(struct obd_import *imp, struct obd_uuid *uuid, + int priority, int nocreate) +{ + struct ptlrpc_connection *ptlrpc_conn; + struct obd_import_conn *imp_conn = NULL, *item; + int rc = 0; + ENTRY; + + LASSERT(!(nocreate && !priority)); + + ptlrpc_conn = ptlrpc_uuid_to_connection(uuid); + if (!ptlrpc_conn) { + CERROR("can't find connection %s\n", uuid->uuid); + RETURN (-EINVAL); + } + + if (!nocreate) { + OBD_ALLOC(imp_conn, sizeof(*imp_conn)); + if (!imp_conn) { + CERROR("fail to alloc memory\n"); + GOTO(out_put, rc = -ENOMEM); + } + } + + spin_lock(&imp->imp_lock); + list_for_each_entry(item, &imp->imp_conn_list, oic_item) { + if (obd_uuid_equals(uuid, &item->oic_uuid)) { + if (priority) { + list_del(&item->oic_item); + list_add(&item->oic_item, &imp->imp_conn_list); + item->oic_last_attempt = 0; + } + CDEBUG(D_HA, "imp %p@%s: find existed conn %s%s\n", + imp, imp->imp_obd->obd_name, uuid->uuid, + (priority ? ", move to head." : "")); + spin_unlock(&imp->imp_lock); + GOTO(out_free, rc = 0); + } + } + /* not found */ + if (!nocreate) { + imp_conn->oic_conn = ptlrpc_conn; + imp_conn->oic_uuid = *uuid; + imp_conn->oic_last_attempt = 0; + if (priority) + list_add(&imp_conn->oic_item, &imp->imp_conn_list); + else + list_add_tail(&imp_conn->oic_item, &imp->imp_conn_list); + CDEBUG(D_HA, "imp %p@%s: add connection %s at %s\n", + imp, imp->imp_obd->obd_name, uuid->uuid, + (priority ? "head" : "tail")); + } else + rc = -ENOENT; + + spin_unlock(&imp->imp_lock); + RETURN(0); +out_free: + if (imp_conn) + OBD_FREE(imp_conn, sizeof(*imp_conn)); +out_put: + ptlrpc_put_connection(ptlrpc_conn); + RETURN(rc); +} + +int import_set_conn_priority(struct obd_import *imp, struct obd_uuid *uuid) +{ + return import_set_conn(imp, uuid, 1, 1); +} + +int client_import_add_conn(struct obd_import *imp, struct obd_uuid *uuid, + int priority) +{ + return import_set_conn(imp, uuid, priority, 0); +} + +int client_import_del_conn(struct obd_import *imp, struct obd_uuid *uuid) +{ + struct obd_import_conn *imp_conn; + struct obd_export *dlmexp; + int rc = -ENOENT; + ENTRY; + + spin_lock(&imp->imp_lock); + if (list_empty(&imp->imp_conn_list)) { + LASSERT(!imp->imp_conn_current); + LASSERT(!imp->imp_connection); + GOTO(out, rc); + } + + list_for_each_entry(imp_conn, &imp->imp_conn_list, oic_item) { + if (!obd_uuid_equals(uuid, &imp_conn->oic_uuid)) + continue; + LASSERT(imp_conn->oic_conn); + + /* is current conn? */ + if (imp_conn == imp->imp_conn_current) { + LASSERT(imp_conn->oic_conn == imp->imp_connection); + + if (imp->imp_state != LUSTRE_IMP_CLOSED && + imp->imp_state != LUSTRE_IMP_DISCON) { + CERROR("can't remove current connection\n"); + GOTO(out, rc = -EBUSY); + } + + ptlrpc_put_connection(imp->imp_connection); + imp->imp_connection = NULL; + + dlmexp = class_conn2export(&imp->imp_dlm_handle); + if (dlmexp && dlmexp->exp_connection) { + LASSERT(dlmexp->exp_connection == + imp_conn->oic_conn); + ptlrpc_put_connection(dlmexp->exp_connection); + dlmexp->exp_connection = NULL; + } + } + + list_del(&imp_conn->oic_item); + ptlrpc_put_connection(imp_conn->oic_conn); + OBD_FREE(imp_conn, sizeof(*imp_conn)); + CDEBUG(D_HA, "imp %p@%s: remove connection %s\n", + imp, imp->imp_obd->obd_name, uuid->uuid); + rc = 0; + break; + } +out: + spin_unlock(&imp->imp_lock); + if (rc == -ENOENT) + CERROR("connection %s not found\n", uuid->uuid); + RETURN(rc); +} int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf) { - struct ptlrpc_connection *conn; struct lustre_cfg* lcfg = buf; struct client_obd *cli = &obddev->u.cli; struct obd_import *imp; @@ -46,7 +178,7 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf) int rq_portal, rp_portal, connect_op; char *name = obddev->obd_type->typ_name; char *mgmt_name = NULL; - int rc = 0; + int rc; struct obd_device *mgmt_obd; mgmtcli_register_for_events_t register_f; ENTRY; @@ -93,43 +225,48 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf) sema_init(&cli->cl_sem, 1); cli->cl_conn_count = 0; - memcpy(server_uuid.uuid, lcfg->lcfg_inlbuf2, min(lcfg->lcfg_inllen2, - sizeof(server_uuid))); + memcpy(server_uuid.uuid, lcfg->lcfg_inlbuf2, + min_t(unsigned int, lcfg->lcfg_inllen2, sizeof(server_uuid))); cli->cl_dirty = 0; cli->cl_avail_grant = 0; + /* FIXME: should limit this for the sum of all cl_dirty_max */ cli->cl_dirty_max = OSC_MAX_DIRTY_DEFAULT * 1024 * 1024; + if (cli->cl_dirty_max >> PAGE_SHIFT > num_physpages / 8) + cli->cl_dirty_max = num_physpages << (PAGE_SHIFT - 3); INIT_LIST_HEAD(&cli->cl_cache_waiters); INIT_LIST_HEAD(&cli->cl_loi_ready_list); INIT_LIST_HEAD(&cli->cl_loi_write_list); + INIT_LIST_HEAD(&cli->cl_loi_read_list); spin_lock_init(&cli->cl_loi_list_lock); - cli->cl_brw_in_flight = 0; + cli->cl_r_in_flight = 0; + cli->cl_w_in_flight = 0; spin_lock_init(&cli->cl_read_rpc_hist.oh_lock); spin_lock_init(&cli->cl_write_rpc_hist.oh_lock); spin_lock_init(&cli->cl_read_page_hist.oh_lock); spin_lock_init(&cli->cl_write_page_hist.oh_lock); - cli->cl_max_pages_per_rpc = PTL_MD_MAX_PAGES; - cli->cl_max_rpcs_in_flight = OSC_MAX_RIF_DEFAULT; + + if (num_physpages <= 32768) { /* <= 128 MB */ + cli->cl_max_pages_per_rpc = PTLRPC_MAX_BRW_PAGES / 3; + cli->cl_max_rpcs_in_flight = OSC_MAX_RIF_DEFAULT / 3; + } else { + cli->cl_max_pages_per_rpc = PTLRPC_MAX_BRW_PAGES; + cli->cl_max_rpcs_in_flight = OSC_MAX_RIF_DEFAULT; + } + - ldlm_get_ref(); + rc = ldlm_get_ref(); if (rc) { CERROR("ldlm_get_ref failed: %d\n", rc); GOTO(err, rc); } - conn = ptlrpc_uuid_to_connection(&server_uuid); - if (conn == NULL) - GOTO(err_ldlm, rc = -ENOENT); - ptlrpc_init_client(rq_portal, rp_portal, name, &obddev->obd_ldlm_client); imp = class_new_import(); - if (imp == NULL) { - ptlrpc_put_connection(conn); + if (imp == NULL) GOTO(err_ldlm, rc = -ENOENT); - } - imp->imp_connection = conn; imp->imp_client = &obddev->obd_ldlm_client; imp->imp_obd = obddev; imp->imp_connect_op = connect_op; @@ -140,6 +277,12 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf) lcfg->lcfg_inllen1); class_import_put(imp); + rc = client_import_add_conn(imp, &server_uuid, 1); + if (rc) { + CERROR("can't add initial connection\n"); + GOTO(err_import, rc); + } + cli->cl_import = imp; cli->cl_max_mds_easize = sizeof(struct lov_mds_md); cli->cl_max_mds_cookiesize = sizeof(struct llog_cookie); @@ -218,7 +361,8 @@ int client_obd_cleanup(struct obd_device *obddev, int flags) int client_connect_import(struct lustre_handle *dlm_handle, struct obd_device *obd, - struct obd_uuid *cluuid) + struct obd_uuid *cluuid, + unsigned long connect_flags) { struct client_obd *cli = &obd->u.cli; struct obd_import *imp = cli->cl_import; @@ -248,13 +392,14 @@ int client_connect_import(struct lustre_handle *dlm_handle, if (rc != 0) GOTO(out_ldlm, rc); - exp->exp_connection = ptlrpc_connection_addref(imp->imp_connection); + imp->imp_connect_flags = connect_flags; rc = ptlrpc_connect_import(imp, NULL); if (rc != 0) { LASSERT (imp->imp_state == LUSTRE_IMP_DISCON); GOTO(out_ldlm, rc); } - + LASSERT(exp->exp_connection); + ptlrpc_pinger_add_import(imp); EXIT; if (rc) { @@ -312,7 +457,7 @@ int client_disconnect_export(struct obd_export *exp, int failover) /* Yeah, obd_no_recov also (mainly) means "forced shutdown". */ if (obd->obd_no_recov) - ptlrpc_set_import_active(imp, 0); + ptlrpc_invalidate_import(imp, 0); else rc = ptlrpc_disconnect_import(imp); @@ -331,9 +476,9 @@ int client_disconnect_export(struct obd_export *exp, int failover) * -------------------------------------------------------------------------- */ int target_handle_reconnect(struct lustre_handle *conn, struct obd_export *exp, - struct obd_uuid *cluuid) + struct obd_uuid *cluuid, int initial_conn) { - if (exp->exp_connection) { + if (exp->exp_connection && !initial_conn) { struct lustre_handle *hdl; hdl = &exp->exp_imp_reverse->imp_remote_handle; /* Might be a re-connect after a partition. */ @@ -359,8 +504,10 @@ int target_handle_reconnect(struct lustre_handle *conn, struct obd_export *exp, RETURN(0); } -int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) +static char nidstr[PTL_NALFMT_SIZE]; +int target_handle_connect(struct ptlrpc_request *req) { + unsigned long connect_flags = 0, *cfp; struct obd_device *target; struct obd_export *export = NULL; struct obd_import *revimp; @@ -370,9 +517,14 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) struct obd_uuid remote_uuid; struct list_head *p; char *str, *tmp; - int rc = 0, abort_recovery; + int rc = 0; + unsigned long flags; + int initial_conn = 0; + char peer_str[PTL_NALFMT_SIZE]; ENTRY; + OBD_RACE(OBD_FAIL_TGT_CONN_RACE); + LASSERT_REQSWAB (req, 0); str = lustre_msg_string(req->rq_reqmsg, 0, sizeof(tgtuuid) - 1); if (str == NULL) { @@ -385,9 +537,10 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) if (!target) { target = class_name2obd(str); } - + if (!target || target->obd_stopping || !target->obd_set_up) { - CERROR("UUID '%s' is not available for connect\n", str); + CERROR("UUID '%s' is not available for connect from NID %s\n", + str, ptlrpc_peernid2str(&req->rq_peer, nidstr)); GOTO(out, rc = -ENODEV); } @@ -401,14 +554,19 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) obd_str2uuid (&cluuid, str); /* XXX extract a nettype and format accordingly */ - snprintf(remote_uuid.uuid, sizeof remote_uuid, - "NET_"LPX64"_UUID", req->rq_peer.peer_nid); - - spin_lock_bh(&target->obd_processing_task_lock); - abort_recovery = target->obd_abort_recovery; - spin_unlock_bh(&target->obd_processing_task_lock); - if (abort_recovery) - target_abort_recovery(target); + switch (sizeof(ptl_nid_t)) { + /* NB the casts only avoid compiler warnings */ + case 8: + snprintf(remote_uuid.uuid, sizeof remote_uuid, + "NET_"LPX64"_UUID", (__u64)req->rq_peer.peer_id.nid); + break; + case 4: + snprintf(remote_uuid.uuid, sizeof remote_uuid, + "NET_%x_UUID", (__u32)req->rq_peer.peer_id.nid); + break; + default: + LBUG(); + } tmp = lustre_msg_buf(req->rq_reqmsg, 2, sizeof conn); if (tmp == NULL) @@ -416,10 +574,17 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) memcpy(&conn, tmp, sizeof conn); + cfp = lustre_msg_buf(req->rq_reqmsg, 3, sizeof(unsigned long)); + LASSERT(cfp != NULL); + connect_flags = *cfp; + rc = lustre_pack_reply(req, 0, NULL, NULL); if (rc) GOTO(out, rc); - + + if (lustre_msg_get_op_flags(req->rq_reqmsg) & MSG_CONNECT_INITIAL) + initial_conn = 1; + /* lctl gets a backstage, all-access pass. */ if (obd_uuid_equals(&cluuid, &target->obd_uuid)) goto dont_check_exports; @@ -431,7 +596,8 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) spin_unlock(&target->obd_dev_lock); LASSERT(export->exp_obd == target); - rc = target_handle_reconnect(&conn, export, &cluuid); + rc = target_handle_reconnect(&conn, export, &cluuid, + initial_conn); break; } export = NULL; @@ -439,7 +605,22 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) /* If we found an export, we already unlocked. */ if (!export) { spin_unlock(&target->obd_dev_lock); - } else if (req->rq_reqmsg->conn_cnt == 1) { + } else if (req->rq_export == NULL && + atomic_read(&export->exp_rpc_count) > 0) { + CWARN("%s: refuse connection from %s/%s to 0x%p/%d\n", + target->obd_name, cluuid.uuid, + ptlrpc_peernid2str(&req->rq_peer, peer_str), + export, atomic_read(&export->exp_refcount)); + GOTO(out, rc = -EBUSY); + } else if (req->rq_export != NULL && + atomic_read(&export->exp_rpc_count) > 1) { + CWARN("%s: refuse reconnection from %s@%s to 0x%p/%d\n", + target->obd_name, cluuid.uuid, + ptlrpc_peernid2str(&req->rq_peer, peer_str), + export, atomic_read(&export->exp_rpc_count)); + GOTO(out, rc = -EBUSY); + } + else if (req->rq_reqmsg->conn_cnt == 1 && !initial_conn) { CERROR("%s reconnected with 1 conn_cnt; cookies not random?\n", cluuid.uuid); GOTO(out, rc = -EALREADY); @@ -447,28 +628,35 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) /* Tell the client if we're in recovery. */ /* If this is the first client, start the recovery timer */ + CWARN("%s: connection from %s@%s/%lu %s\n", target->obd_name, cluuid.uuid, + ptlrpc_peernid2str(&req->rq_peer, peer_str), *cfp, + target->obd_recovering ? "(recovering)" : ""); if (target->obd_recovering) { lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECOVERING); - target_start_recovery_timer(target, handler); + target_start_recovery_timer(target); } - +#if 0 /* Tell the client if we support replayable requests */ if (target->obd_replayable) lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_REPLAYABLE); - +#endif if (export == NULL) { if (target->obd_recovering) { - CERROR("denying connection for new client %s: " - "%d clients in recovery for %lds\n", cluuid.uuid, + CERROR("%s denying connection for new client %s@%s: " + "%d clients in recovery for %lds\n", target->obd_name, + cluuid.uuid, + ptlrpc_peernid2str(&req->rq_peer, peer_str), target->obd_recoverable_clients, (target->obd_recovery_timer.expires-jiffies)/HZ); rc = -EBUSY; } else { dont_check_exports: - rc = obd_connect(&conn, target, &cluuid); + rc = obd_connect(&conn, target, &cluuid, connect_flags); } } - + /* Tell the client if we support replayable requests */ + if (target->obd_replayable) + lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_REPLAYABLE); /* If all else goes well, this is our RPC return code. */ req->rq_status = 0; @@ -476,11 +664,6 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) if (rc && rc != EALREADY) GOTO(out, rc); - /* XXX track this all the time? */ - if (target->obd_recovering) { - target->obd_connected_clients++; - } - req->rq_repmsg->handle = conn; /* If the client and the server are the same node, we will already @@ -497,20 +680,39 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) export = req->rq_export = class_conn2export(&conn); LASSERT(export != NULL); + spin_lock_irqsave(&export->exp_lock, flags); + if (initial_conn) { + req->rq_repmsg->conn_cnt = export->exp_conn_cnt + 1; + } else if (export->exp_conn_cnt >= req->rq_reqmsg->conn_cnt) { + CERROR("%s@%s: already connected at a higher conn_cnt: %d > %d\n", + cluuid.uuid, ptlrpc_peernid2str(&req->rq_peer, peer_str), + export->exp_conn_cnt, + req->rq_reqmsg->conn_cnt); + spin_unlock_irqrestore(&export->exp_lock, flags); + GOTO(out, rc = -EALREADY); + } + export->exp_conn_cnt = req->rq_reqmsg->conn_cnt; + spin_unlock_irqrestore(&export->exp_lock, flags); + + /* request from liblustre? */ + if (lustre_msg_get_op_flags(req->rq_reqmsg) & MSG_CONNECT_LIBCLIENT) + export->exp_libclient = 1; + if (export->exp_connection != NULL) ptlrpc_put_connection(export->exp_connection); export->exp_connection = ptlrpc_get_connection(&req->rq_peer, &remote_uuid); - LASSERT(export->exp_conn_cnt < req->rq_reqmsg->conn_cnt); - export->exp_conn_cnt = req->rq_reqmsg->conn_cnt; - if (rc == EALREADY) { /* We indicate the reconnection in a flag, not an error code. */ lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECONNECT); GOTO(out, rc = 0); } + if (target->obd_recovering) { + target->obd_connected_clients++; + } + memcpy(&conn, lustre_msg_buf(req->rq_reqmsg, 2, sizeof conn), sizeof conn); @@ -524,6 +726,8 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) revimp->imp_dlm_fake = 1; revimp->imp_state = LUSTRE_IMP_FULL; class_import_put(revimp); + + rc = obd_connect_post(export); out: if (rc) req->rq_status = rc; @@ -563,21 +767,80 @@ void target_destroy_export(struct obd_export *exp) * Recovery functions */ -static void abort_delayed_replies(struct obd_device *obd) +struct ptlrpc_request * +ptlrpc_clone_req( struct ptlrpc_request *orig_req) +{ + struct ptlrpc_request *copy_req; + struct lustre_msg *copy_reqmsg; + + OBD_ALLOC(copy_req, sizeof *copy_req); + if (!copy_req) + return NULL; + OBD_ALLOC(copy_reqmsg, orig_req->rq_reqlen); + if (!copy_reqmsg){ + OBD_FREE(copy_req, sizeof *copy_req); + return NULL; + } + + memcpy(copy_req, orig_req, sizeof *copy_req); + memcpy(copy_reqmsg, orig_req->rq_reqmsg, orig_req->rq_reqlen); + /* the copied req takes over the reply state */ + orig_req->rq_reply_state = NULL; + + copy_req->rq_reqmsg = copy_reqmsg; + class_export_get(copy_req->rq_export); + INIT_LIST_HEAD(©_req->rq_list); + + return copy_req; +} +void ptlrpc_free_clone( struct ptlrpc_request *req) +{ + class_export_put(req->rq_export); + list_del(&req->rq_list); + OBD_FREE(req->rq_reqmsg, req->rq_reqlen); + OBD_FREE(req, sizeof *req); +} + + + +static void target_release_saved_req(struct ptlrpc_request *req) +{ + class_export_put(req->rq_export); + OBD_FREE(req->rq_reqmsg, req->rq_reqlen); + OBD_FREE(req, sizeof *req); +} + +static void target_finish_recovery(struct obd_device *obd) { - struct ptlrpc_request *req; struct list_head *tmp, *n; + int rc; + + CWARN("%s: sending delayed replies to recovered clients\n", + obd->obd_name); + + ldlm_reprocess_all_ns(obd->obd_namespace); + + /* when recovery finished, cleanup orphans on mds and ost */ + if (OBT(obd) && OBP(obd, postrecov)) { + rc = OBP(obd, postrecov)(obd); + if (rc >= 0) + CWARN("%s: all clients recovered, %d MDS " + "orphans deleted\n", obd->obd_name, rc); + else + CERROR("postrecov failed %d\n", rc); + } + + list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) { + struct ptlrpc_request *req; req = list_entry(tmp, struct ptlrpc_request, rq_list); - DEBUG_REQ(D_ERROR, req, "aborted:"); - req->rq_status = -ENOTCONN; - req->rq_type = PTL_RPC_MSG_ERR; - ptlrpc_reply(req); - class_export_put(req->rq_export); list_del(&req->rq_list); - OBD_FREE(req->rq_reqmsg, req->rq_reqlen); - OBD_FREE(req, sizeof *req); + DEBUG_REQ(D_ERROR, req, "delayed:"); + ptlrpc_reply(req); + target_release_saved_req(req); } + obd->obd_recovery_end = LTIME_S(CURRENT_TIME); + return; } static void abort_recovery_queue(struct obd_device *obd) @@ -588,6 +851,7 @@ static void abort_recovery_queue(struct obd_device *obd) list_for_each_safe(tmp, n, &obd->obd_recovery_queue) { req = list_entry(tmp, struct ptlrpc_request, rq_list); + list_del(&req->rq_list); DEBUG_REQ(D_ERROR, req, "aborted:"); req->rq_status = -ENOTCONN; req->rq_type = PTL_RPC_MSG_ERR; @@ -598,45 +862,61 @@ static void abort_recovery_queue(struct obd_device *obd) DEBUG_REQ(D_ERROR, req, "packing failed for abort-reply; skipping"); } - list_del(&req->rq_list); - class_export_put(req->rq_export); - OBD_FREE(req->rq_reqmsg, req->rq_reqlen); - OBD_FREE(req, sizeof *req); + target_release_saved_req(req); } } - -void target_abort_recovery(void *data) +/* Called from a cleanup function if the device is being cleaned up + forcefully. The exports should all have been disconnected already, + the only thing left to do is + - clear the recovery flags + - cancel the timer + - free queued requests and replies, but don't send replies + Because the obd_stopping flag is set, no new requests should be received. + +*/ +void target_cleanup_recovery(struct obd_device *obd) { - struct obd_device *obd = data; - int rc; + struct list_head *tmp, *n; + struct ptlrpc_request *req; - CERROR("disconnecting clients and aborting recovery\n"); spin_lock_bh(&obd->obd_processing_task_lock); if (!obd->obd_recovering) { spin_unlock_bh(&obd->obd_processing_task_lock); EXIT; return; } - obd->obd_recovering = obd->obd_abort_recovery = 0; - - wake_up(&obd->obd_next_transno_waitq); target_cancel_recovery_timer(obd); spin_unlock_bh(&obd->obd_processing_task_lock); - class_disconnect_exports(obd, 0); - - /* when recovery was aborted, cleanup orphans on mds and ost */ - if (OBT(obd) && OBP(obd, postrecov)) { - rc = OBP(obd, postrecov)(obd); - if (rc >= 0) - CWARN("Cleanup %d orphans after recovery was aborted\n", rc); - else - CERROR("postrecov failed %d\n", rc); + list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) { + req = list_entry(tmp, struct ptlrpc_request, rq_list); + list_del(&req->rq_list); + LASSERT (req->rq_reply_state); + lustre_free_reply_state(req->rq_reply_state); + target_release_saved_req(req); } + list_for_each_safe(tmp, n, &obd->obd_recovery_queue) { + req = list_entry(tmp, struct ptlrpc_request, rq_list); + list_del(&req->rq_list); + LASSERT (req->rq_reply_state == 0); + target_release_saved_req(req); + } +} + +static void target_abort_recovery(void *data) +{ + struct obd_device *obd = data; + + LASSERT(!obd->obd_recovering); + + class_disconnect_stale_exports(obd, 0); + + CERROR("%s: recovery period over; disconnecting unfinished clients.\n", + obd->obd_name); - abort_delayed_replies(obd); abort_recovery_queue(obd); + target_finish_recovery(obd); ptlrpc_run_recovery_over_upcall(obd); } @@ -645,7 +925,9 @@ static void target_recovery_expired(unsigned long castmeharder) struct obd_device *obd = (struct obd_device *)castmeharder; CERROR("recovery timed out, aborting\n"); spin_lock_bh(&obd->obd_processing_task_lock); - obd->obd_abort_recovery = 1; + if (obd->obd_recovering) + obd->obd_abort_recovery = 1; + wake_up(&obd->obd_next_transno_waitq); spin_unlock_bh(&obd->obd_processing_task_lock); } @@ -673,50 +955,56 @@ static void reset_recovery_timer(struct obd_device *obd) /* Only start it the first time called */ -void target_start_recovery_timer(struct obd_device *obd, svc_handler_t handler) +void target_start_recovery_timer(struct obd_device *obd) { spin_lock_bh(&obd->obd_processing_task_lock); - if (obd->obd_recovery_handler) { + if (!obd->obd_recovering || timer_pending(&obd->obd_recovery_timer)) { spin_unlock_bh(&obd->obd_processing_task_lock); return; } CWARN("%s: starting recovery timer (%us)\n", obd->obd_name, OBD_RECOVERY_TIMEOUT / HZ); - obd->obd_recovery_handler = handler; obd->obd_recovery_timer.function = target_recovery_expired; obd->obd_recovery_timer.data = (unsigned long)obd; - init_timer(&obd->obd_recovery_timer); + mod_timer(&obd->obd_recovery_timer, jiffies + OBD_RECOVERY_TIMEOUT); spin_unlock_bh(&obd->obd_processing_task_lock); - - reset_recovery_timer(obd); } static int check_for_next_transno(struct obd_device *obd) { - struct ptlrpc_request *req; + struct ptlrpc_request *req = NULL; int wake_up = 0, connected, completed, queue_len, max; __u64 next_transno, req_transno; spin_lock_bh(&obd->obd_processing_task_lock); - req = list_entry(obd->obd_recovery_queue.next, - struct ptlrpc_request, rq_list); + if (!list_empty(&obd->obd_recovery_queue)) { + req = list_entry(obd->obd_recovery_queue.next, + struct ptlrpc_request, rq_list); + req_transno = req->rq_reqmsg->transno; + } else { + req_transno = 0; + } + max = obd->obd_max_recoverable_clients; - req_transno = req->rq_reqmsg->transno; connected = obd->obd_connected_clients; completed = max - obd->obd_recoverable_clients; queue_len = obd->obd_requests_queued_for_recovery; next_transno = obd->obd_next_recovery_transno; + CDEBUG(D_HA,"max: %d, connected: %d, completed: %d, queue_len: %d, " + "req_transno: "LPU64", next_transno: "LPU64"\n", + max, connected, completed, queue_len, req_transno, next_transno); if (obd->obd_abort_recovery) { CDEBUG(D_HA, "waking for aborted recovery\n"); wake_up = 1; - } else if (!obd->obd_recovering) { - CDEBUG(D_HA, "waking for completed recovery (?)\n"); + } else if (max == completed) { + CDEBUG(D_HA, "waking for completed recovery\n"); wake_up = 1; } else if (req_transno == next_transno) { CDEBUG(D_HA, "waking for next ("LPD64")\n", next_transno); wake_up = 1; } else if (queue_len + completed == max) { + LASSERT(req->rq_reqmsg->transno >= next_transno); CDEBUG(D_ERROR, "waking for skipped transno (skip: "LPD64 ", ql: %d, comp: %d, conn: %d, next: "LPD64")\n", @@ -725,73 +1013,137 @@ static int check_for_next_transno(struct obd_device *obd) wake_up = 1; } spin_unlock_bh(&obd->obd_processing_task_lock); - LASSERT(req->rq_reqmsg->transno >= next_transno); + return wake_up; } -static void process_recovery_queue(struct obd_device *obd) +static struct ptlrpc_request * +target_next_replay_req(struct obd_device *obd) { - struct ptlrpc_request *req; - int abort_recovery = 0; struct l_wait_info lwi = { 0 }; - ENTRY; + struct ptlrpc_request *req; - for (;;) { - spin_lock_bh(&obd->obd_processing_task_lock); - LASSERT(obd->obd_processing_task == current->pid); + CDEBUG(D_HA, "Waiting for transno "LPD64"\n", + obd->obd_next_recovery_transno); + l_wait_event(obd->obd_next_transno_waitq, + check_for_next_transno(obd), &lwi); + + spin_lock_bh(&obd->obd_processing_task_lock); + if (obd->obd_abort_recovery) { + req = NULL; + } else if (!list_empty(&obd->obd_recovery_queue)) { req = list_entry(obd->obd_recovery_queue.next, struct ptlrpc_request, rq_list); + list_del_init(&req->rq_list); + obd->obd_requests_queued_for_recovery--; + } else { + req = NULL; + } + spin_unlock_bh(&obd->obd_processing_task_lock); + return req; +} - if (req->rq_reqmsg->transno != obd->obd_next_recovery_transno) { - spin_unlock_bh(&obd->obd_processing_task_lock); - CDEBUG(D_HA, "Waiting for transno "LPD64" (1st is " - LPD64")\n", - obd->obd_next_recovery_transno, - req->rq_reqmsg->transno); - l_wait_event(obd->obd_next_transno_waitq, - check_for_next_transno(obd), &lwi); +#ifdef __KERNEL__ +static int target_recovery_thread(void *arg) +{ + struct obd_device *obd = arg; + struct ptlrpc_request *req; + struct target_recovery_data *trd = &obd->obd_recovery_data; + unsigned long flags; + ENTRY; + + kportal_daemonize("tgt-recov"); + + SIGNAL_MASK_LOCK(current, flags); + sigfillset(¤t->blocked); + RECALC_SIGPENDING; + SIGNAL_MASK_UNLOCK(current, flags); + + CERROR("%s: started recovery thread pid %d\n", obd->obd_name, + current->pid); + trd->trd_processing_task = current->pid; + + obd->obd_recovering = 1; + complete(&trd->trd_starting); + + while (obd->obd_recovering) { + LASSERT(trd->trd_processing_task == current->pid); + req = target_next_replay_req(obd); + if (req != NULL) { + char peer_str[PTL_NALFMT_SIZE]; + DEBUG_REQ(D_HA, req, "processing t"LPD64" from %s: ", + req->rq_reqmsg->transno, + ptlrpc_peernid2str(&req->rq_peer, peer_str)); + (void)trd->trd_recovery_handler(req); + obd->obd_replayed_requests++; + reset_recovery_timer(obd); + /* bug 1580: decide how to properly sync() in recovery*/ + //mds_fsync_super(mds->mds_sb); + ptlrpc_free_clone(req); spin_lock_bh(&obd->obd_processing_task_lock); - abort_recovery = obd->obd_abort_recovery; + obd->obd_next_recovery_transno++; spin_unlock_bh(&obd->obd_processing_task_lock); - if (abort_recovery) { - target_abort_recovery(obd); - return; + } else { + /* recovery is over */ + spin_lock_bh(&obd->obd_processing_task_lock); + obd->obd_recovering = 0; + target_cancel_recovery_timer(obd); + if (obd->obd_abort_recovery) { + obd->obd_abort_recovery = 0; + spin_unlock_bh(&obd->obd_processing_task_lock); + target_abort_recovery(obd); + } else { + LASSERT(obd->obd_recoverable_clients == 0); + spin_unlock_bh(&obd->obd_processing_task_lock); + target_finish_recovery(obd); } - continue; } - list_del_init(&req->rq_list); - obd->obd_requests_queued_for_recovery--; - spin_unlock_bh(&obd->obd_processing_task_lock); + } - DEBUG_REQ(D_HA, req, "processing: "); - (void)obd->obd_recovery_handler(req); - obd->obd_replayed_requests++; - reset_recovery_timer(obd); - /* bug 1580: decide how to properly sync() in recovery */ - //mds_fsync_super(mds->mds_sb); - class_export_put(req->rq_export); - OBD_FREE(req->rq_reqmsg, req->rq_reqlen); - OBD_FREE(req, sizeof *req); - spin_lock_bh(&obd->obd_processing_task_lock); - obd->obd_next_recovery_transno++; - if (list_empty(&obd->obd_recovery_queue)) { - obd->obd_processing_task = 0; - spin_unlock_bh(&obd->obd_processing_task_lock); - break; - } + trd->trd_processing_task = 0; + complete(&trd->trd_finishing); + return 0; +} + +int target_start_recovery_thread(struct obd_device *obd, svc_handler_t handler) +{ + int rc = 0; + struct target_recovery_data *trd = &obd->obd_recovery_data; + + memset(trd, 0, sizeof(*trd)); + init_completion(&trd->trd_starting); + init_completion(&trd->trd_finishing); + trd->trd_recovery_handler = handler; + + if (kernel_thread(target_recovery_thread, obd, 0) == 0) + wait_for_completion(&trd->trd_starting); + else + rc = -ECHILD; + + return rc; +} + +void target_stop_recovery_thread(struct obd_device *obd) +{ + spin_lock_bh(&obd->obd_processing_task_lock); + if (obd->obd_recovery_data.trd_processing_task > 0) { + struct target_recovery_data *trd = &obd->obd_recovery_data; + CERROR("%s: aborting recovery\n", obd->obd_name); + obd->obd_abort_recovery = 1; + wake_up(&obd->obd_next_transno_waitq); + spin_unlock_bh(&obd->obd_processing_task_lock); + wait_for_completion(&trd->trd_finishing); + } else { spin_unlock_bh(&obd->obd_processing_task_lock); } - EXIT; } - +#endif int target_queue_recovery_request(struct ptlrpc_request *req, struct obd_device *obd) { struct list_head *tmp; int inserted = 0; __u64 transno = req->rq_reqmsg->transno; - struct ptlrpc_request *saved_req; - struct lustre_msg *reqmsg; /* CAVEAT EMPTOR: The incoming request message has been swabbed * (i.e. buflens etc are in my own byte order), but type-dependent @@ -803,16 +1155,6 @@ int target_queue_recovery_request(struct ptlrpc_request *req, return 1; } - /* XXX If I were a real man, these LBUGs would be sane cleanups. */ - /* XXX just like the request-dup code in queue_final_reply */ - OBD_ALLOC(saved_req, sizeof *saved_req); - if (!saved_req) - LBUG(); - OBD_ALLOC(reqmsg, req->rq_reqlen); - if (!reqmsg) - LBUG(); - - spin_lock_bh(&obd->obd_processing_task_lock); /* If we're processing the queue, we want don't want to queue this * message. @@ -820,23 +1162,34 @@ int target_queue_recovery_request(struct ptlrpc_request *req, * Also, if this request has a transno less than the one we're waiting * for, we should process it now. It could (and currently always will) * be an open request for a descriptor that was opened some time ago. + * + * Also, a resent, replayed request that has already been + * handled will pass through here and be processed immediately. */ - if (obd->obd_processing_task == current->pid || + spin_lock_bh(&obd->obd_processing_task_lock); + if (obd->obd_recovery_data.trd_processing_task == current->pid || transno < obd->obd_next_recovery_transno) { /* Processing the queue right now, don't re-add. */ + lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT); LASSERT(list_empty(&req->rq_list)); spin_unlock_bh(&obd->obd_processing_task_lock); - OBD_FREE(reqmsg, req->rq_reqlen); - OBD_FREE(saved_req, sizeof *saved_req); return 1; } + spin_unlock_bh(&obd->obd_processing_task_lock); - memcpy(saved_req, req, sizeof *req); - memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen); - req = saved_req; - req->rq_reqmsg = reqmsg; - class_export_get(req->rq_export); - INIT_LIST_HEAD(&req->rq_list); + /* A resent, replayed request that is still on the queue; just drop it. + The queued request will handle this. */ + if ((lustre_msg_get_flags(req->rq_reqmsg) & (MSG_RESENT | MSG_REPLAY)) + == (MSG_RESENT | MSG_REPLAY)) { + DEBUG_REQ(D_ERROR, req, "dropping resent queued req"); + return 0; + } + + req = ptlrpc_clone_req(req); + if (req == NULL) + return -ENOMEM; + + spin_lock_bh(&obd->obd_processing_task_lock); /* XXX O(n^2) */ list_for_each(tmp, &obd->obd_recovery_queue) { @@ -855,23 +1208,9 @@ int target_queue_recovery_request(struct ptlrpc_request *req, } obd->obd_requests_queued_for_recovery++; - - if (obd->obd_processing_task != 0) { - /* Someone else is processing this queue, we'll leave it to - * them. - */ - wake_up(&obd->obd_next_transno_waitq); - spin_unlock_bh(&obd->obd_processing_task_lock); - return 0; - } - - /* Nobody is processing, and we know there's (at least) one to process - * now, so we'll do the honours. - */ - obd->obd_processing_task = current->pid; + wake_up(&obd->obd_next_transno_waitq); spin_unlock_bh(&obd->obd_processing_task_lock); - process_recovery_queue(obd); return 0; } @@ -883,10 +1222,8 @@ struct obd_device * target_req2obd(struct ptlrpc_request *req) int target_queue_final_reply(struct ptlrpc_request *req, int rc) { struct obd_device *obd = target_req2obd(req); - struct ptlrpc_request *saved_req; - struct lustre_msg *reqmsg; - int recovery_done = 0; - int rc2; + + LASSERT ((rc == 0) == (req->rq_reply_state != NULL)); if (rc) { /* Just like ptlrpc_error, but without the sending. */ @@ -895,239 +1232,146 @@ int target_queue_final_reply(struct ptlrpc_request *req, int rc) req->rq_type = PTL_RPC_MSG_ERR; } + LASSERT (!req->rq_reply_state->rs_difficult); LASSERT(list_empty(&req->rq_list)); - /* XXX a bit like the request-dup code in queue_recovery_request */ - OBD_ALLOC(saved_req, sizeof *saved_req); - if (!saved_req) - LBUG(); - OBD_ALLOC(reqmsg, req->rq_reqlen); - if (!reqmsg) - LBUG(); - memcpy(saved_req, req, sizeof *saved_req); - memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen); - req = saved_req; - req->rq_reqmsg = reqmsg; - class_export_get(req->rq_export); - list_add(&req->rq_list, &obd->obd_delayed_reply_queue); + + req = ptlrpc_clone_req(req); spin_lock_bh(&obd->obd_processing_task_lock); - --obd->obd_recoverable_clients; - recovery_done = (obd->obd_recoverable_clients == 0); - spin_unlock_bh(&obd->obd_processing_task_lock); - - if (recovery_done) { - struct list_head *tmp, *n; - ldlm_reprocess_all_ns(req->rq_export->exp_obd->obd_namespace); - CWARN("%s: all clients recovered, sending delayed replies\n", - obd->obd_name); - spin_lock_bh(&obd->obd_processing_task_lock); - obd->obd_recovering = 0; - target_cancel_recovery_timer(obd); - spin_unlock_bh(&obd->obd_processing_task_lock); - /* when recovery finished, cleanup orphans on mds and ost */ - if (OBT(obd) && OBP(obd, postrecov)) { - rc2 = OBP(obd, postrecov)(obd); - if (rc2 >= 0) - CWARN("%s: all clients recovered, %d MDS " - "orphans deleted\n", obd->obd_name, rc2); - else - CERROR("postrecov failed %d\n", rc2); - } + list_add(&req->rq_list, &obd->obd_delayed_reply_queue); - list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) { - req = list_entry(tmp, struct ptlrpc_request, rq_list); - DEBUG_REQ(D_ERROR, req, "delayed:"); - ptlrpc_reply(req); - class_export_put(req->rq_export); - list_del(&req->rq_list); - OBD_FREE(req->rq_reqmsg, req->rq_reqlen); - OBD_FREE(req, sizeof *req); - } - ptlrpc_run_recovery_over_upcall(obd); - } else { + /* only count the first "replay over" request from each + export */ + if (req->rq_export->exp_replay_needed) { + --obd->obd_recoverable_clients; + req->rq_export->exp_replay_needed = 0; CWARN("%s: %d recoverable clients remain\n", - obd->obd_name, obd->obd_recoverable_clients); - wake_up(&obd->obd_next_transno_waitq); + obd->obd_name, obd->obd_recoverable_clients); } - + wake_up(&obd->obd_next_transno_waitq); + spin_unlock_bh(&obd->obd_processing_task_lock); return 1; } -static void ptlrpc_abort_reply (struct ptlrpc_request *req) -{ - /* On return, we must be sure that the ACK callback has either - * happened or will not happen. Note that the SENT callback will - * happen come what may since we successfully posted the PUT. */ - int rc; - struct l_wait_info lwi; - unsigned long flags; - - again: - /* serialise with ACK callback */ - spin_lock_irqsave (&req->rq_lock, flags); - if (!req->rq_want_ack) { - spin_unlock_irqrestore (&req->rq_lock, flags); - /* The ACK callback has happened already. Although the - * SENT callback might still be outstanding (yes really) we - * don't care; this is just like normal completion. */ - return; - } - spin_unlock_irqrestore (&req->rq_lock, flags); - - /* Have a bash at unlinking the MD. This will fail until the SENT - * callback has happened since the MD is busy from the PUT. If the - * ACK still hasn't arrived after then, a successful unlink will - * ensure the ACK callback never happens. */ - rc = PtlMDUnlink (req->rq_reply_md_h); - switch (rc) { - default: - LBUG (); - case PTL_OK: - /* SENT callback happened; ACK callback preempted */ - LASSERT (req->rq_want_ack); - spin_lock_irqsave (&req->rq_lock, flags); - req->rq_want_ack = 0; - spin_unlock_irqrestore (&req->rq_lock, flags); - return; - case PTL_INV_MD: - return; - case PTL_MD_INUSE: - /* Still sending or ACK callback in progress: wait until - * either callback has completed and try again. - * Actually we can't wait for the SENT callback because - * there's no state the SENT callback can touch that will - * allow it to communicate with us! So we just wait here - * for a short time, effectively polling for the SENT - * callback by calling PtlMDUnlink() again, to see if it - * has finished. Note that if the ACK does arrive, its - * callback wakes us in short order. --eeb */ - lwi = LWI_TIMEOUT (HZ/4, NULL, NULL); - rc = l_wait_event(req->rq_reply_waitq, !req->rq_want_ack, - &lwi); - CDEBUG (D_HA, "Retrying req %p: %d\n", req, rc); - /* NB go back and test rq_want_ack with locking, to ensure - * if ACK callback happened, it has completed stopped - * referencing this req. */ - goto again; - } -} - -void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id) +int +target_send_reply_msg (struct ptlrpc_request *req, int rc, int fail_id) { - int i; - int netrc; - unsigned long flags; - struct ptlrpc_req_ack_lock *ack_lock; - struct l_wait_info lwi = { 0 }; - wait_queue_t commit_wait; - struct obd_device *obd = - req->rq_export ? req->rq_export->exp_obd : NULL; - struct obd_export *exp = NULL; - - if (req->rq_export) { - for (i = 0; i < REQ_MAX_ACK_LOCKS; i++) { - if (req->rq_ack_locks[i].mode) { - exp = req->rq_export; - break; + if (OBD_FAIL_CHECK(fail_id | OBD_FAIL_ONCE)) { + obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED; + DEBUG_REQ(D_ERROR, req, "dropping reply"); + /* NB this does _not_ send with ACK disabled, to simulate + * sending OK, but timing out for the ACK */ + if (req->rq_reply_state != NULL) { + if (!req->rq_reply_state->rs_difficult) { + lustre_free_reply_state (req->rq_reply_state); + req->rq_reply_state = NULL; + } else { + struct ptlrpc_service *svc = + req->rq_rqbd->rqbd_srv_ni->sni_service; + atomic_inc(&svc->srv_outstanding_replies); } } + return (-ECOMM); } - if (exp) { - exp->exp_outstanding_reply = req; - spin_lock_irqsave (&req->rq_lock, flags); - req->rq_want_ack = 1; - spin_unlock_irqrestore (&req->rq_lock, flags); - } - - if (!OBD_FAIL_CHECK(fail_id | OBD_FAIL_ONCE)) { - if (rc == 0) { - DEBUG_REQ(D_NET, req, "sending reply"); - netrc = ptlrpc_reply(req); - } else if (rc == -ENOTCONN) { - DEBUG_REQ(D_HA, req, "processing error (%d)", rc); - netrc = ptlrpc_error(req); - } else { - DEBUG_REQ(D_ERROR, req, "processing error (%d)", rc); - netrc = ptlrpc_error(req); - } + if (rc) { + req->rq_status = rc; + return (ptlrpc_error(req)); } else { - obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED; - DEBUG_REQ(D_ERROR, req, "dropping reply"); - if (req->rq_repmsg) { - OBD_FREE(req->rq_repmsg, req->rq_replen); - req->rq_repmsg = NULL; - } - init_waitqueue_head(&req->rq_reply_waitq); - netrc = 0; + DEBUG_REQ(D_NET, req, "sending reply"); } + + return (ptlrpc_send_reply(req, 1)); +} - /* a failed send simulates the callbacks */ - LASSERT(netrc == 0 || req->rq_want_ack == 0); - if (exp == NULL) { - LASSERT(req->rq_want_ack == 0); +void +target_send_reply(struct ptlrpc_request *req, int rc, int fail_id) +{ + int netrc; + unsigned long flags; + struct ptlrpc_reply_state *rs; + struct obd_device *obd; + struct obd_export *exp; + struct ptlrpc_srv_ni *sni; + struct ptlrpc_service *svc; + + sni = req->rq_rqbd->rqbd_srv_ni; + svc = sni->sni_service; + + rs = req->rq_reply_state; + if (rs == NULL || !rs->rs_difficult) { + /* The easy case; no notifiers and reply_out_callback() + * cleans up (i.e. we can't look inside rs after a + * successful send) */ + netrc = target_send_reply_msg (req, rc, fail_id); + + LASSERT (netrc == 0 || req->rq_reply_state == NULL); return; } - LASSERT(obd != NULL); - - init_waitqueue_entry(&commit_wait, current); - add_wait_queue(&obd->obd_commit_waitq, &commit_wait); - rc = l_wait_event(req->rq_reply_waitq, - !req->rq_want_ack || req->rq_resent || - req->rq_transno <= obd->obd_last_committed, &lwi); - remove_wait_queue(&obd->obd_commit_waitq, &commit_wait); - - spin_lock_irqsave (&req->rq_lock, flags); - /* If we got here because the ACK callback ran, this acts as a - * barrier to ensure the callback completed the wakeup. */ - spin_unlock_irqrestore (&req->rq_lock, flags); - - /* If we committed the transno already, then we might wake up before - * the ack arrives. We need to stop waiting for the ack before we can - * reuse this request structure. We are guaranteed by this point that - * this cannot abort the sending of the actual reply.*/ - ptlrpc_abort_reply(req); - - if (req->rq_resent) { - DEBUG_REQ(D_HA, req, "resent: not cancelling locks"); - return; + + /* must be an export if locks saved */ + LASSERT (req->rq_export != NULL); + /* req/reply consistent */ + LASSERT (rs->rs_srv_ni == sni); + + /* "fresh" reply */ + LASSERT (!rs->rs_scheduled); + LASSERT (!rs->rs_scheduled_ever); + LASSERT (!rs->rs_handled); + LASSERT (!rs->rs_on_net); + LASSERT (rs->rs_export == NULL); + LASSERT (list_empty(&rs->rs_obd_list)); + LASSERT (list_empty(&rs->rs_exp_list)); + + exp = class_export_get (req->rq_export); + obd = exp->exp_obd; + + /* disable reply scheduling onto srv_reply_queue while I'm setting up */ + rs->rs_scheduled = 1; + rs->rs_on_net = 1; + rs->rs_xid = req->rq_xid; + rs->rs_transno = req->rq_transno; + rs->rs_export = exp; + + spin_lock_irqsave (&obd->obd_uncommitted_replies_lock, flags); + + if (rs->rs_transno > obd->obd_last_committed) { + /* not committed already */ + list_add_tail (&rs->rs_obd_list, + &obd->obd_uncommitted_replies); } - LASSERT(rc == 0); - DEBUG_REQ(D_HA, req, "cancelling locks for %s", - req->rq_want_ack ? "commit" : "ack"); + spin_unlock (&obd->obd_uncommitted_replies_lock); + spin_lock (&exp->exp_lock); - exp->exp_outstanding_reply = NULL; + list_add_tail (&rs->rs_exp_list, &exp->exp_outstanding_replies); - for (ack_lock = req->rq_ack_locks, i = 0; - i < REQ_MAX_ACK_LOCKS; i++, ack_lock++) { - if (!ack_lock->mode) - continue; - ldlm_lock_decref(&ack_lock->lock, ack_lock->mode); + spin_unlock_irqrestore (&exp->exp_lock, flags); + + netrc = target_send_reply_msg (req, rc, fail_id); + + spin_lock_irqsave (&svc->srv_lock, flags); + + svc->srv_n_difficult_replies++; + + if (netrc != 0) /* error sending: reply is off the net */ + rs->rs_on_net = 0; + + if (!rs->rs_on_net || /* some notifier */ + list_empty(&rs->rs_exp_list) || /* completed already */ + list_empty(&rs->rs_obd_list)) { + list_add_tail (&rs->rs_list, &svc->srv_reply_queue); + wake_up (&svc->srv_waitq); + } else { + list_add (&rs->rs_list, &sni->sni_active_replies); + rs->rs_scheduled = 0; /* allow notifier to schedule */ } + + spin_unlock_irqrestore (&svc->srv_lock, flags); } int target_handle_ping(struct ptlrpc_request *req) { return lustre_pack_reply(req, 0, NULL, NULL); } - -void *ldlm_put_lock_into_req(struct ptlrpc_request *req, - struct lustre_handle *lock, int mode) -{ - int i; - - for (i = 0; i < REQ_MAX_ACK_LOCKS; i++) { - if (req->rq_ack_locks[i].mode) - continue; - CDEBUG(D_HA, "saving lock "LPX64" in req %p ack_lock[%d]\n", - lock->cookie, req, i); - memcpy(&req->rq_ack_locks[i].lock, lock, sizeof(*lock)); - req->rq_ack_locks[i].mode = mode; - return &req->rq_ack_locks[i]; - } - CERROR("no space for lock in struct ptlrpc_request\n"); - LBUG(); - return NULL; -}