X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fldlm%2Fldlm_lib.c;h=83ad3c22c9e3c7a13bfb3d6f0309bf5915c09ec5;hb=7ce2000eb0f4e7b7ea1f362c17099881098cfef7;hp=735e383161e7c819cf53a14374d82e0c4e692679;hpb=96ec6856f91f7f9031cfce4273c714d72cfe59ae;p=fs%2Flustre-release.git diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c index 735e3831..83ad3c2 100644 --- a/lustre/ldlm/ldlm_lib.c +++ b/lustre/ldlm/ldlm_lib.c @@ -19,7 +19,9 @@ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ -#define EXPORT_SYMTAB +#ifndef EXPORT_SYMTAB +# define EXPORT_SYMTAB +#endif #define DEBUG_SUBSYSTEM S_LDLM #ifdef __KERNEL__ @@ -27,30 +29,204 @@ #else # include #endif -#include +#include +#include /* for LUSTRE_OSC_NAME */ +#include /* for LUSTRE_MDC_NAME */ +#include #include -#include #include -int client_import_connect(struct lustre_handle *dlm_handle, +int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf) +{ + struct ptlrpc_connection *conn; + struct lustre_cfg* lcfg = buf; + struct client_obd *cli = &obddev->u.cli; + struct obd_import *imp; + struct obd_uuid server_uuid; + int rq_portal, rp_portal, connect_op; + char *name = obddev->obd_type->typ_name; + char *mgmt_name = NULL; + int rc; + struct obd_device *mgmt_obd; + mgmtcli_register_for_events_t register_f; + ENTRY; + + /* In a more perfect world, we would hang a ptlrpc_client off of + * obd_type and just use the values from there. */ + if (!strcmp(name, LUSTRE_OSC_NAME)) { + rq_portal = OST_REQUEST_PORTAL; + rp_portal = OSC_REPLY_PORTAL; + connect_op = OST_CONNECT; + } else if (!strcmp(name, LUSTRE_MDC_NAME)) { + rq_portal = MDS_REQUEST_PORTAL; + rp_portal = MDC_REPLY_PORTAL; + connect_op = MDS_CONNECT; + } else if (!strcmp(name, LUSTRE_MGMTCLI_NAME)) { + rq_portal = MGMT_REQUEST_PORTAL; + rp_portal = MGMT_REPLY_PORTAL; + connect_op = MGMT_CONNECT; + } else { + CERROR("unknown client OBD type \"%s\", can't setup\n", + name); + RETURN(-EINVAL); + } + + if (lcfg->lcfg_inllen1 < 1) { + CERROR("requires a TARGET UUID\n"); + RETURN(-EINVAL); + } + + if (lcfg->lcfg_inllen1 > 37) { + CERROR("client UUID must be less than 38 characters\n"); + RETURN(-EINVAL); + } + + if (lcfg->lcfg_inllen2 < 1) { + CERROR("setup requires a SERVER UUID\n"); + RETURN(-EINVAL); + } + + if (lcfg->lcfg_inllen2 > 37) { + CERROR("target UUID must be less than 38 characters\n"); + RETURN(-EINVAL); + } + + sema_init(&cli->cl_sem, 1); + cli->cl_conn_count = 0; + memcpy(server_uuid.uuid, lcfg->lcfg_inlbuf2, + min_t(unsigned int, lcfg->lcfg_inllen2, sizeof(server_uuid))); + + cli->cl_dirty = 0; + cli->cl_avail_grant = 0; + cli->cl_dirty_max = OSC_MAX_DIRTY_DEFAULT * 1024 * 1024; + INIT_LIST_HEAD(&cli->cl_cache_waiters); + INIT_LIST_HEAD(&cli->cl_loi_ready_list); + INIT_LIST_HEAD(&cli->cl_loi_write_list); + INIT_LIST_HEAD(&cli->cl_loi_read_list); + spin_lock_init(&cli->cl_loi_list_lock); + cli->cl_brw_in_flight = 0; + spin_lock_init(&cli->cl_read_rpc_hist.oh_lock); + spin_lock_init(&cli->cl_write_rpc_hist.oh_lock); + spin_lock_init(&cli->cl_read_page_hist.oh_lock); + spin_lock_init(&cli->cl_write_page_hist.oh_lock); + cli->cl_max_pages_per_rpc = PTLRPC_MAX_BRW_PAGES; + cli->cl_max_rpcs_in_flight = OSC_MAX_RIF_DEFAULT; + + rc = ldlm_get_ref(); + if (rc) { + CERROR("ldlm_get_ref failed: %d\n", rc); + GOTO(err, rc); + } + + conn = ptlrpc_uuid_to_connection(&server_uuid); + if (conn == NULL) + GOTO(err_ldlm, rc = -ENOENT); + + ptlrpc_init_client(rq_portal, rp_portal, name, + &obddev->obd_ldlm_client); + + imp = class_new_import(); + if (imp == NULL) { + ptlrpc_put_connection(conn); + GOTO(err_ldlm, rc = -ENOENT); + } + imp->imp_connection = conn; + imp->imp_client = &obddev->obd_ldlm_client; + imp->imp_obd = obddev; + imp->imp_connect_op = connect_op; + imp->imp_generation = 0; + imp->imp_initial_recov = 1; + INIT_LIST_HEAD(&imp->imp_pinger_chain); + memcpy(imp->imp_target_uuid.uuid, lcfg->lcfg_inlbuf1, + lcfg->lcfg_inllen1); + class_import_put(imp); + + cli->cl_import = imp; + cli->cl_max_mds_easize = sizeof(struct lov_mds_md); + cli->cl_max_mds_cookiesize = sizeof(struct llog_cookie); + cli->cl_sandev = to_kdev_t(0); + + if (lcfg->lcfg_inllen3 != 0) { + if (!strcmp(lcfg->lcfg_inlbuf3, "inactive")) { + CDEBUG(D_HA, "marking %s %s->%s as inactive\n", + name, obddev->obd_name, + imp->imp_target_uuid.uuid); + imp->imp_invalid = 1; + + if (lcfg->lcfg_inllen4 != 0) + mgmt_name = lcfg->lcfg_inlbuf4; + } else { + mgmt_name = lcfg->lcfg_inlbuf3; + } + } + + if (mgmt_name != NULL) { + /* Register with management client if we need to. */ + CDEBUG(D_HA, "%s registering with %s for events about %s\n", + obddev->obd_name, mgmt_name, server_uuid.uuid); + + mgmt_obd = class_name2obd(mgmt_name); + if (!mgmt_obd) { + CERROR("can't find mgmtcli %s to register\n", + mgmt_name); + GOTO(err_import, rc = -ENOSYS); + } + + register_f = inter_module_get("mgmtcli_register_for_events"); + if (!register_f) { + CERROR("can't i_m_g mgmtcli_register_for_events\n"); + GOTO(err_import, rc = -ENOSYS); + } + + rc = register_f(mgmt_obd, obddev, &imp->imp_target_uuid); + inter_module_put("mgmtcli_register_for_events"); + + if (!rc) + cli->cl_mgmtcli_obd = mgmt_obd; + } + + RETURN(rc); + +err_import: + class_destroy_import(imp); +err_ldlm: + ldlm_put_ref(0); +err: + RETURN(rc); + +} + +int client_obd_cleanup(struct obd_device *obddev, int flags) +{ + struct client_obd *cli = &obddev->u.cli; + + if (!cli->cl_import) + RETURN(-EINVAL); + if (cli->cl_mgmtcli_obd) { + mgmtcli_deregister_for_events_t dereg_f; + + dereg_f = inter_module_get("mgmtcli_deregister_for_events"); + dereg_f(cli->cl_mgmtcli_obd, obddev); + inter_module_put("mgmtcli_deregister_for_events"); + } + class_destroy_import(cli->cl_import); + cli->cl_import = NULL; + + ldlm_put_ref(flags & OBD_OPT_FORCE); + + RETURN(0); +} + +int client_connect_import(struct lustre_handle *dlm_handle, struct obd_device *obd, struct obd_uuid *cluuid) { struct client_obd *cli = &obd->u.cli; struct obd_import *imp = cli->cl_import; struct obd_export *exp; - struct ptlrpc_request *request; - /* XXX maybe this is a good time to create a connect struct? */ - int rc, size[] = {sizeof(imp->imp_target_uuid), - sizeof(obd->obd_uuid), - sizeof(*dlm_handle)}; - char *tmp[] = {imp->imp_target_uuid.uuid, - obd->obd_uuid.uuid, - (char *)dlm_handle}; - int rq_opc = (obd->obd_type->typ_ops->o_brw) ? OST_CONNECT :MDS_CONNECT; - int msg_flags; - + int rc; ENTRY; + down(&cli->cl_sem); rc = class_connect(dlm_handle, obd, cluuid); if (rc) @@ -59,6 +235,7 @@ int client_import_connect(struct lustre_handle *dlm_handle, cli->cl_conn_count++; if (cli->cl_conn_count > 1) GOTO(out_sem, rc); + exp = class_conn2export(dlm_handle); if (obd->obd_namespace != NULL) CERROR("already have namespace!\n"); @@ -67,69 +244,50 @@ int client_import_connect(struct lustre_handle *dlm_handle, if (obd->obd_namespace == NULL) GOTO(out_disco, rc = -ENOMEM); - request = ptlrpc_prep_req(imp, rq_opc, 3, size, tmp); - if (!request) - GOTO(out_ldlm, rc = -ENOMEM); - - request->rq_level = LUSTRE_CONN_NEW; - request->rq_replen = lustre_msg_size(0, NULL); - imp->imp_dlm_handle = *dlm_handle; - - imp->imp_level = LUSTRE_CONN_CON; - rc = ptlrpc_queue_wait(request); - if (rc) { - class_disconnect(dlm_handle, 0); - GOTO(out_req, rc); + rc = ptlrpc_init_import(imp); + if (rc != 0) + GOTO(out_ldlm, rc); + + exp->exp_connection = ptlrpc_connection_addref(imp->imp_connection); + rc = ptlrpc_connect_import(imp, NULL); + if (rc != 0) { + LASSERT (imp->imp_state == LUSTRE_IMP_DISCON); + GOTO(out_ldlm, rc); } - exp = class_conn2export(dlm_handle); - exp->exp_connection = ptlrpc_connection_addref(request->rq_connection); - class_export_put(exp); - - msg_flags = lustre_msg_get_op_flags(request->rq_repmsg); - if (rq_opc == MDS_CONNECT || msg_flags & MSG_CONNECT_REPLAYABLE) { - imp->imp_replayable = 1; - CDEBUG(D_HA, "connected to replayable target: %s\n", - imp->imp_target_uuid.uuid); - } - imp->imp_level = LUSTRE_CONN_FULL; - imp->imp_remote_handle = request->rq_repmsg->handle; - CDEBUG(D_HA, "local import: %p, remote handle: "LPX64"\n", imp, - imp->imp_remote_handle.cookie); - + ptlrpc_pinger_add_import(imp); EXIT; -out_req: - ptlrpc_req_finished(request); + if (rc) { out_ldlm: - ldlm_namespace_free(obd->obd_namespace); + ldlm_namespace_free(obd->obd_namespace, 0); obd->obd_namespace = NULL; out_disco: cli->cl_conn_count--; - class_disconnect(dlm_handle, 0); + class_disconnect(exp, 0); + } else { + class_export_put(exp); } out_sem: up(&cli->cl_sem); return rc; } -int client_import_disconnect(struct lustre_handle *dlm_handle, int failover) +int client_disconnect_export(struct obd_export *exp, int failover) { - struct obd_device *obd = class_conn2obd(dlm_handle); + struct obd_device *obd = class_exp2obd(exp); struct client_obd *cli = &obd->u.cli; struct obd_import *imp = cli->cl_import; - struct ptlrpc_request *request = NULL; - int rc = 0, err, rq_opc; + int rc = 0, err; ENTRY; if (!obd) { - CERROR("invalid connection for disconnect: cookie "LPX64"\n", - dlm_handle ? dlm_handle->cookie : -1UL); + CERROR("invalid export for disconnect: exp %p cookie "LPX64"\n", + exp, exp ? exp->exp_handle.h_cookie : -1); RETURN(-EINVAL); } - rq_opc = obd->obd_type->typ_ops->o_brw ? OST_DISCONNECT:MDS_DISCONNECT; down(&cli->cl_sem); if (!cli->cl_conn_count) { CERROR("disconnecting disconnected device (%s)\n", @@ -141,37 +299,28 @@ int client_import_disconnect(struct lustre_handle *dlm_handle, int failover) if (cli->cl_conn_count) GOTO(out_no_disconnect, rc = 0); + /* Some non-replayable imports (MDS's OSCs) are pinged, so just + * delete it regardless. (It's safe to delete an import that was + * never added.) */ + (void)ptlrpc_pinger_del_import(imp); + if (obd->obd_namespace != NULL) { /* obd_no_recov == local only */ ldlm_cli_cancel_unused(obd->obd_namespace, NULL, obd->obd_no_recov, NULL); - ldlm_namespace_free(obd->obd_namespace); + ldlm_namespace_free(obd->obd_namespace, obd->obd_no_recov); obd->obd_namespace = NULL; } /* Yeah, obd_no_recov also (mainly) means "forced shutdown". */ - if (obd->obd_no_recov) { - ptlrpc_abort_inflight(imp); - } else { - request = ptlrpc_prep_req(imp, rq_opc, 0, NULL, NULL); - if (!request) - GOTO(out_req, rc = -ENOMEM); + if (obd->obd_no_recov) + ptlrpc_invalidate_import(imp, 0); + else + rc = ptlrpc_disconnect_import(imp); - request->rq_replen = lustre_msg_size(0, NULL); - - /* Process disconnects even if we're waiting for recovery. */ - request->rq_level = LUSTRE_CONN_RECOVD; - - rc = ptlrpc_queue_wait(request); - if (rc) - GOTO(out_req, rc); - } EXIT; - out_req: - if (request) - ptlrpc_req_finished(request); out_no_disconnect: - err = class_disconnect(dlm_handle, 0); + err = class_disconnect(exp, 0); if (!rc && err) rc = err; out_sem: @@ -188,7 +337,7 @@ int target_handle_reconnect(struct lustre_handle *conn, struct obd_export *exp, { if (exp->exp_connection) { struct lustre_handle *hdl; - hdl = &exp->exp_ldlm_data.led_import->imp_remote_handle; + hdl = &exp->exp_imp_reverse->imp_remote_handle; /* Might be a re-connect after a partition. */ if (!memcmp(&conn->cookie, &hdl->cookie, sizeof conn->cookie)) { CERROR("%s reconnecting\n", cluuid->uuid); @@ -200,12 +349,7 @@ int target_handle_reconnect(struct lustre_handle *conn, struct obd_export *exp, LPX64")\n", cluuid->uuid, exp->exp_connection->c_remote_uuid.uuid, hdl->cookie, conn->cookie); - /* XXX disconnect them here? */ memset(conn, 0, sizeof *conn); - /* This is a little scary, but right now we build this - * file separately into each server module, so I won't - * go _immediately_ to hell. - */ RETURN(-EALREADY); } } @@ -221,47 +365,60 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) { struct obd_device *target; struct obd_export *export = NULL; - struct obd_import *dlmimp; + struct obd_import *revimp; struct lustre_handle conn; struct obd_uuid tgtuuid; struct obd_uuid cluuid; struct obd_uuid remote_uuid; struct list_head *p; char *str, *tmp; - int rc, i, abort_recovery; + int rc = 0, abort_recovery; + unsigned long flags; ENTRY; + OBD_RACE(OBD_FAIL_TGT_CONN_RACE); + LASSERT_REQSWAB (req, 0); - str = lustre_msg_string (req->rq_reqmsg, 0, sizeof (tgtuuid.uuid) - 1); + str = lustre_msg_string(req->rq_reqmsg, 0, sizeof(tgtuuid) - 1); if (str == NULL) { CERROR("bad target UUID for connect\n"); GOTO(out, rc = -EINVAL); } + obd_str2uuid (&tgtuuid, str); + target = class_uuid2obd(&tgtuuid); + if (!target) { + target = class_name2obd(str); + } + + if (!target || target->obd_stopping || !target->obd_set_up) { + CERROR("UUID '%s' is not available for connect\n", str); + GOTO(out, rc = -ENODEV); + } LASSERT_REQSWAB (req, 1); - str = lustre_msg_string (req->rq_reqmsg, 1, sizeof (cluuid.uuid) - 1); + str = lustre_msg_string(req->rq_reqmsg, 1, sizeof(cluuid) - 1); if (str == NULL) { CERROR("bad client UUID for connect\n"); GOTO(out, rc = -EINVAL); } - obd_str2uuid (&cluuid, str); - i = class_uuid2dev(&tgtuuid); - if (i == -1) { - CERROR("UUID '%s' not found for connect\n", tgtuuid.uuid); - GOTO(out, rc = -ENODEV); - } - - target = &obd_dev[i]; - if (!target || target->obd_stopping || !target->obd_set_up) { - CERROR("UUID '%s' is not available for connect\n", str); - GOTO(out, rc = -ENODEV); - } + obd_str2uuid (&cluuid, str); /* XXX extract a nettype and format accordingly */ - snprintf(remote_uuid.uuid, sizeof remote_uuid, - "NET_"LPX64"_UUID", req->rq_peer.peer_nid); + switch (sizeof(ptl_nid_t)) { + /* NB the casts only avoid compiler warnings */ + case 8: + snprintf(remote_uuid.uuid, sizeof remote_uuid, + "NET_"LPX64"_UUID", (__u64)req->rq_peer.peer_nid); + break; + case 4: + snprintf(remote_uuid.uuid, sizeof remote_uuid, + "NET_%x_UUID", (__u32)req->rq_peer.peer_nid); + break; + default: + LBUG(); + } spin_lock_bh(&target->obd_processing_task_lock); abort_recovery = target->obd_abort_recovery; @@ -275,12 +432,12 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) memcpy(&conn, tmp, sizeof conn); - rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg); + rc = lustre_pack_reply(req, 0, NULL, NULL); if (rc) GOTO(out, rc); /* lctl gets a backstage, all-access pass. */ - if (obd_uuid_equals(&cluuid, &lctl_fake_uuid)) + if (obd_uuid_equals(&cluuid, &target->obd_uuid)) goto dont_check_exports; spin_lock(&target->obd_dev_lock); @@ -296,8 +453,13 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) export = NULL; } /* If we found an export, we already unlocked. */ - if (!export) + if (!export) { spin_unlock(&target->obd_dev_lock); + } else if (req->rq_reqmsg->conn_cnt == 1) { + CERROR("%s reconnected with 1 conn_cnt; cookies not random?\n", + cluuid.uuid); + GOTO(out, rc = -EALREADY); + } /* Tell the client if we're in recovery. */ /* If this is the first client, start the recovery timer */ @@ -313,7 +475,9 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) if (export == NULL) { if (target->obd_recovering) { CERROR("denying connection for new client %s: " - "in recovery\n", cluuid.uuid); + "%d clients in recovery for %lds\n", cluuid.uuid, + target->obd_recoverable_clients, + (target->obd_recovery_timer.expires-jiffies)/HZ); rc = -EBUSY; } else { dont_check_exports: @@ -321,12 +485,18 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) } } + /* If all else goes well, this is our RPC return code. */ req->rq_status = 0; if (rc && rc != EALREADY) GOTO(out, rc); + /* XXX track this all the time? */ + if (target->obd_recovering) { + target->obd_connected_clients++; + } + req->rq_repmsg->handle = conn; /* If the client and the server are the same node, we will already @@ -343,13 +513,25 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) export = req->rq_export = class_conn2export(&conn); LASSERT(export != NULL); - if (req->rq_connection != NULL) - ptlrpc_put_connection(req->rq_connection); + spin_lock_irqsave(&export->exp_lock, flags); + if (export->exp_conn_cnt >= req->rq_reqmsg->conn_cnt) { + CERROR("%s: already connected at a higher conn_cnt: %d > %d\n", + cluuid.uuid, export->exp_conn_cnt, + req->rq_reqmsg->conn_cnt); + spin_unlock_irqrestore(&export->exp_lock, flags); + GOTO(out, rc = -EALREADY); + } + export->exp_conn_cnt = req->rq_reqmsg->conn_cnt; + spin_unlock_irqrestore(&export->exp_lock, flags); + + /* request from liblustre? */ + if (lustre_msg_get_op_flags(req->rq_reqmsg) & MSG_CONNECT_LIBCLIENT) + export->exp_libclient = 1; + if (export->exp_connection != NULL) ptlrpc_put_connection(export->exp_connection); export->exp_connection = ptlrpc_get_connection(&req->rq_peer, &remote_uuid); - req->rq_connection = ptlrpc_connection_addref(export->exp_connection); if (rc == EALREADY) { /* We indicate the reconnection in a flag, not an error code. */ @@ -360,16 +542,16 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) memcpy(&conn, lustre_msg_buf(req->rq_reqmsg, 2, sizeof conn), sizeof conn); - if (export->exp_ldlm_data.led_import != NULL) - class_destroy_import(export->exp_ldlm_data.led_import); - dlmimp = export->exp_ldlm_data.led_import = class_new_import(); - dlmimp->imp_connection = ptlrpc_connection_addref(req->rq_connection); - dlmimp->imp_client = &export->exp_obd->obd_ldlm_client; - dlmimp->imp_remote_handle = conn; - dlmimp->imp_obd = target; - dlmimp->imp_dlm_fake = 1; - dlmimp->imp_level = LUSTRE_CONN_FULL; - class_import_put(dlmimp); + if (export->exp_imp_reverse != NULL) + class_destroy_import(export->exp_imp_reverse); + revimp = export->exp_imp_reverse = class_new_import(); + revimp->imp_connection = ptlrpc_connection_addref(export->exp_connection); + revimp->imp_client = &export->exp_obd->obd_ldlm_client; + revimp->imp_remote_handle = conn; + revimp->imp_obd = target; + revimp->imp_dlm_fake = 1; + revimp->imp_state = LUSTRE_IMP_FULL; + class_import_put(revimp); out: if (rc) req->rq_status = rc; @@ -378,34 +560,37 @@ out: int target_handle_disconnect(struct ptlrpc_request *req) { - struct lustre_handle *conn = &req->rq_reqmsg->handle; - struct obd_import *dlmimp; + struct obd_export *exp; int rc; ENTRY; - rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg); + rc = lustre_pack_reply(req, 0, NULL, NULL); if (rc) RETURN(rc); - req->rq_status = obd_disconnect(conn, 0); - - dlmimp = req->rq_export->exp_ldlm_data.led_import; - class_destroy_import(dlmimp); - - class_export_put(req->rq_export); - req->rq_export = NULL; + /* keep the rq_export around so we can send the reply */ + exp = class_export_get(req->rq_export); + req->rq_status = obd_disconnect(exp, 0); RETURN(0); } +void target_destroy_export(struct obd_export *exp) +{ + /* exports created from last_rcvd data, and "fake" + exports created by lctl don't have an import */ + if (exp->exp_imp_reverse != NULL) + class_destroy_import(exp->exp_imp_reverse); + + /* We cancel locks at disconnect time, but this will catch any locks + * granted in a race with recovery-induced disconnect. */ + if (exp->exp_obd->obd_namespace != NULL) + ldlm_cancel_locks_for_export(exp); +} + /* * Recovery functions */ -void target_cancel_recovery_timer(struct obd_device *obd) -{ - del_timer(&obd->obd_recovery_timer); -} - static void abort_delayed_replies(struct obd_device *obd) { struct ptlrpc_request *req; @@ -416,6 +601,7 @@ static void abort_delayed_replies(struct obd_device *obd) req->rq_status = -ENOTCONN; req->rq_type = PTL_RPC_MSG_ERR; ptlrpc_reply(req); + class_export_put(req->rq_export); list_del(&req->rq_list); OBD_FREE(req->rq_reqmsg, req->rq_reqlen); OBD_FREE(req, sizeof *req); @@ -433,8 +619,7 @@ static void abort_recovery_queue(struct obd_device *obd) DEBUG_REQ(D_ERROR, req, "aborted:"); req->rq_status = -ENOTCONN; req->rq_type = PTL_RPC_MSG_ERR; - rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, - &req->rq_repmsg); + rc = lustre_pack_reply(req, 0, NULL, NULL); if (rc == 0) { ptlrpc_reply(req); } else { @@ -451,6 +636,7 @@ static void abort_recovery_queue(struct obd_device *obd) void target_abort_recovery(void *data) { struct obd_device *obd = data; + int rc; CERROR("disconnecting clients and aborting recovery\n"); spin_lock_bh(&obd->obd_processing_task_lock); @@ -461,13 +647,25 @@ void target_abort_recovery(void *data) } obd->obd_recovering = obd->obd_abort_recovery = 0; - obd->obd_recoverable_clients = 0; + wake_up(&obd->obd_next_transno_waitq); target_cancel_recovery_timer(obd); spin_unlock_bh(&obd->obd_processing_task_lock); + class_disconnect_exports(obd, 0); + + /* when recovery was aborted, cleanup orphans on mds and ost */ + if (OBT(obd) && OBP(obd, postrecov)) { + rc = OBP(obd, postrecov)(obd); + if (rc >= 0) + CWARN("Cleanup %d orphans after recovery was aborted\n", rc); + else + CERROR("postrecov failed %d\n", rc); + } + abort_delayed_replies(obd); abort_recovery_queue(obd); + ptlrpc_run_recovery_over_upcall(obd); } static void target_recovery_expired(unsigned long castmeharder) @@ -480,18 +678,25 @@ static void target_recovery_expired(unsigned long castmeharder) spin_unlock_bh(&obd->obd_processing_task_lock); } -static void reset_recovery_timer(struct obd_device *obd) + +/* obd_processing_task_lock should be held */ +void target_cancel_recovery_timer(struct obd_device *obd) { - int recovering; - spin_lock(&obd->obd_dev_lock); - recovering = obd->obd_recovering; - spin_unlock(&obd->obd_dev_lock); + CDEBUG(D_HA, "%s: cancel recovery timer\n", obd->obd_name); + del_timer(&obd->obd_recovery_timer); +} - if (!recovering) +static void reset_recovery_timer(struct obd_device *obd) +{ + spin_lock_bh(&obd->obd_processing_task_lock); + if (!obd->obd_recovering) { + spin_unlock_bh(&obd->obd_processing_task_lock); return; - CDEBUG(D_ERROR, "timer will expire in %ld seconds\n", + } + CDEBUG(D_HA, "timer will expire in %u seconds\n", OBD_RECOVERY_TIMEOUT / HZ); mod_timer(&obd->obd_recovery_timer, jiffies + OBD_RECOVERY_TIMEOUT); + spin_unlock_bh(&obd->obd_processing_task_lock); } @@ -503,11 +708,11 @@ void target_start_recovery_timer(struct obd_device *obd, svc_handler_t handler) spin_unlock_bh(&obd->obd_processing_task_lock); return; } - CERROR("%s: starting recovery timer\n", obd->obd_name); + CWARN("%s: starting recovery timer (%us)\n", obd->obd_name, + OBD_RECOVERY_TIMEOUT / HZ); obd->obd_recovery_handler = handler; obd->obd_recovery_timer.function = target_recovery_expired; obd->obd_recovery_timer.data = (unsigned long)obd; - init_timer(&obd->obd_recovery_timer); spin_unlock_bh(&obd->obd_processing_task_lock); reset_recovery_timer(obd); @@ -516,17 +721,38 @@ void target_start_recovery_timer(struct obd_device *obd, svc_handler_t handler) static int check_for_next_transno(struct obd_device *obd) { struct ptlrpc_request *req; - int wake_up; + int wake_up = 0, connected, completed, queue_len, max; + __u64 next_transno, req_transno; + spin_lock_bh(&obd->obd_processing_task_lock); req = list_entry(obd->obd_recovery_queue.next, struct ptlrpc_request, rq_list); - LASSERT(req->rq_reqmsg->transno >= obd->obd_next_recovery_transno); - - wake_up = req->rq_reqmsg->transno == obd->obd_next_recovery_transno || - (obd->obd_recovering) == 0; - CDEBUG(D_HA, "check_for_next_transno: "LPD64" vs "LPD64", %d == %d\n", - req->rq_reqmsg->transno, obd->obd_next_recovery_transno, - obd->obd_recovering, wake_up); + max = obd->obd_max_recoverable_clients; + req_transno = req->rq_reqmsg->transno; + connected = obd->obd_connected_clients; + completed = max - obd->obd_recoverable_clients; + queue_len = obd->obd_requests_queued_for_recovery; + next_transno = obd->obd_next_recovery_transno; + + if (obd->obd_abort_recovery) { + CDEBUG(D_HA, "waking for aborted recovery\n"); + wake_up = 1; + } else if (!obd->obd_recovering) { + CDEBUG(D_HA, "waking for completed recovery (?)\n"); + wake_up = 1; + } else if (req_transno == next_transno) { + CDEBUG(D_HA, "waking for next ("LPD64")\n", next_transno); + wake_up = 1; + } else if (queue_len + completed == max) { + CDEBUG(D_ERROR, + "waking for skipped transno (skip: "LPD64 + ", ql: %d, comp: %d, conn: %d, next: "LPD64")\n", + next_transno, queue_len, completed, max, req_transno); + obd->obd_next_recovery_transno = req_transno; + wake_up = 1; + } + spin_unlock_bh(&obd->obd_processing_task_lock); + LASSERT(req->rq_reqmsg->transno >= next_transno); return wake_up; } @@ -561,12 +787,15 @@ static void process_recovery_queue(struct obd_device *obd) continue; } list_del_init(&req->rq_list); + obd->obd_requests_queued_for_recovery--; spin_unlock_bh(&obd->obd_processing_task_lock); - DEBUG_REQ(D_ERROR, req, "processing: "); + DEBUG_REQ(D_HA, req, "processing: "); (void)obd->obd_recovery_handler(req); + obd->obd_replayed_requests++; reset_recovery_timer(obd); -#warning FIXME: mds_fsync_super(mds->mds_sb); + /* bug 1580: decide how to properly sync() in recovery */ + //mds_fsync_super(mds->mds_sb); class_export_put(req->rq_export); OBD_FREE(req->rq_reqmsg, req->rq_reqlen); OBD_FREE(req, sizeof *req); @@ -652,12 +881,13 @@ int target_queue_recovery_request(struct ptlrpc_request *req, list_add_tail(&req->rq_list, &obd->obd_recovery_queue); } + obd->obd_requests_queued_for_recovery++; + if (obd->obd_processing_task != 0) { /* Someone else is processing this queue, we'll leave it to * them. */ - if (transno == obd->obd_next_recovery_transno) - wake_up(&obd->obd_next_transno_waitq); + wake_up(&obd->obd_next_transno_waitq); spin_unlock_bh(&obd->obd_processing_task_lock); return 0; } @@ -683,16 +913,20 @@ int target_queue_final_reply(struct ptlrpc_request *req, int rc) struct ptlrpc_request *saved_req; struct lustre_msg *reqmsg; int recovery_done = 0; + int rc2; + + LASSERT ((rc == 0) == (req->rq_reply_state != NULL)); if (rc) { /* Just like ptlrpc_error, but without the sending. */ - lustre_pack_msg(0, NULL, NULL, &req->rq_replen, - &req->rq_repmsg); + rc = lustre_pack_reply(req, 0, NULL, NULL); + LASSERT(rc == 0); /* XXX handle this */ req->rq_type = PTL_RPC_MSG_ERR; } + LASSERT (!req->rq_reply_state->rs_difficult); LASSERT(list_empty(&req->rq_list)); - /* XXX just like the request-dup code in queue_recovery_request */ + /* XXX a bit like the request-dup code in queue_recovery_request */ OBD_ALLOC(saved_req, sizeof *saved_req); if (!saved_req) LBUG(); @@ -701,8 +935,11 @@ int target_queue_final_reply(struct ptlrpc_request *req, int rc) LBUG(); memcpy(saved_req, req, sizeof *saved_req); memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen); + /* the copied req takes over the reply state */ + req->rq_reply_state = NULL; req = saved_req; req->rq_reqmsg = reqmsg; + class_export_get(req->rq_export); list_add(&req->rq_list, &obd->obd_delayed_reply_queue); spin_lock_bh(&obd->obd_processing_task_lock); @@ -713,171 +950,167 @@ int target_queue_final_reply(struct ptlrpc_request *req, int rc) if (recovery_done) { struct list_head *tmp, *n; ldlm_reprocess_all_ns(req->rq_export->exp_obd->obd_namespace); - CDEBUG(D_ERROR, - "%s: all clients recovered, sending delayed replies\n", + CWARN("%s: all clients recovered, sending delayed replies\n", obd->obd_name); + spin_lock_bh(&obd->obd_processing_task_lock); obd->obd_recovering = 0; + target_cancel_recovery_timer(obd); + spin_unlock_bh(&obd->obd_processing_task_lock); + + /* when recovery finished, cleanup orphans on mds and ost */ + if (OBT(obd) && OBP(obd, postrecov)) { + rc2 = OBP(obd, postrecov)(obd); + if (rc2 >= 0) + CWARN("%s: all clients recovered, %d MDS " + "orphans deleted\n", obd->obd_name, rc2); + else + CERROR("postrecov failed %d\n", rc2); + } + list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) { req = list_entry(tmp, struct ptlrpc_request, rq_list); DEBUG_REQ(D_ERROR, req, "delayed:"); ptlrpc_reply(req); + class_export_put(req->rq_export); list_del(&req->rq_list); OBD_FREE(req->rq_reqmsg, req->rq_reqlen); OBD_FREE(req, sizeof *req); } - target_cancel_recovery_timer(obd); + ptlrpc_run_recovery_over_upcall(obd); } else { - CERROR("%s: %d recoverable clients remain\n", + CWARN("%s: %d recoverable clients remain\n", obd->obd_name, obd->obd_recoverable_clients); + wake_up(&obd->obd_next_transno_waitq); } return 1; } -static void ptlrpc_abort_reply (struct ptlrpc_request *req) +int +target_send_reply_msg (struct ptlrpc_request *req, int rc, int fail_id) { - /* On return, we must be sure that the ACK callback has either - * happened or will not happen. Note that the SENT callback will - * happen come what may since we successfully posted the PUT. */ - int rc; - struct l_wait_info lwi; - unsigned long flags; - - again: - /* serialise with ACK callback */ - spin_lock_irqsave (&req->rq_lock, flags); - if (!req->rq_want_ack) { - spin_unlock_irqrestore (&req->rq_lock, flags); - /* The ACK callback has happened already. Although the - * SENT callback might still be outstanding (yes really) we - * don't care; this is just like normal completion. */ - return; + if (OBD_FAIL_CHECK(fail_id | OBD_FAIL_ONCE)) { + obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED; + DEBUG_REQ(D_ERROR, req, "dropping reply"); + /* NB this does _not_ send with ACK disabled, to simulate + * sending OK, but timing out for the ACK */ + if (req->rq_reply_state != NULL) { + if (!req->rq_reply_state->rs_difficult) { + lustre_free_reply_state (req->rq_reply_state); + req->rq_reply_state = NULL; + } else { + struct ptlrpc_service *svc = + req->rq_rqbd->rqbd_srv_ni->sni_service; + atomic_inc(&svc->srv_outstanding_replies); + } + } + return (-ECOMM); } - spin_unlock_irqrestore (&req->rq_lock, flags); - /* Have a bash at unlinking the MD. This will fail until the SENT - * callback has happened since the MD is busy from the PUT. If the - * ACK still hasn't arrived after then, a successful unlink will - * ensure the ACK callback never happens. */ - rc = PtlMDUnlink (req->rq_reply_md_h); - switch (rc) { - default: - LBUG (); - case PTL_OK: - /* SENT callback happened; ACK callback preempted */ - LASSERT (req->rq_want_ack); - spin_lock_irqsave (&req->rq_lock, flags); - req->rq_want_ack = 0; - spin_unlock_irqrestore (&req->rq_lock, flags); - return; - case PTL_INV_MD: - return; - case PTL_MD_INUSE: - /* Still sending or ACK callback in progress: wait until - * either callback has completed and try again. - * Actually we can't wait for the SENT callback because - * there's no state the SENT callback can touch that will - * allow it to communicate with us! So we just wait here - * for a short time, effectively polling for the SENT - * callback by calling PtlMDUnlink() again, to see if it - * has finished. Note that if the ACK does arrive, its - * callback wakes us in short order. --eeb */ - lwi = LWI_TIMEOUT (HZ/4, NULL, NULL); - rc = l_wait_event(req->rq_wait_for_rep, !req->rq_want_ack, - &lwi); - CDEBUG (D_HA, "Retrying req %p: %d\n", req, rc); - /* NB go back and test rq_want_ack with locking, to ensure - * if ACK callback happened, it has completed stopped - * referencing this req. */ - goto again; + if (rc) { + DEBUG_REQ(D_ERROR, req, "processing error (%d)", rc); + if (req->rq_reply_state == NULL) { + rc = lustre_pack_reply (req, 0, NULL, NULL); + if (rc != 0) { + CERROR ("can't allocate reply\n"); + return (rc); + } + } + req->rq_type = PTL_RPC_MSG_ERR; + } else { + DEBUG_REQ(D_NET, req, "sending reply"); } + + return (ptlrpc_send_reply(req, 1)); } -void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id) +void +target_send_reply(struct ptlrpc_request *req, int rc, int fail_id) { - int i; - int netrc; - unsigned long flags; - struct ptlrpc_req_ack_lock *ack_lock; - struct l_wait_info lwi = { 0 }; - wait_queue_t commit_wait; - struct obd_device *obd = - req->rq_export ? req->rq_export->exp_obd : NULL; - struct obd_export *exp = - (req->rq_export && req->rq_ack_locks[0].mode) ? - req->rq_export : NULL; - - if (exp) { - exp->exp_outstanding_reply = req; - spin_lock_irqsave (&req->rq_lock, flags); - req->rq_want_ack = 1; - spin_unlock_irqrestore (&req->rq_lock, flags); - } - - if (!OBD_FAIL_CHECK(fail_id | OBD_FAIL_ONCE)) { - if (rc) { - DEBUG_REQ(D_ERROR, req, "processing error (%d)", rc); - netrc = ptlrpc_error(req); - } else { - DEBUG_REQ(D_NET, req, "sending reply"); - netrc = ptlrpc_reply(req); - } - } else { - obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED; - DEBUG_REQ(D_ERROR, req, "dropping reply"); - if (!exp && req->rq_repmsg) { - OBD_FREE(req->rq_repmsg, req->rq_replen); - req->rq_repmsg = NULL; - } - init_waitqueue_head(&req->rq_wait_for_rep); - netrc = 0; + int netrc; + unsigned long flags; + struct ptlrpc_reply_state *rs; + struct obd_device *obd; + struct obd_export *exp; + struct ptlrpc_srv_ni *sni; + struct ptlrpc_service *svc; + + sni = req->rq_rqbd->rqbd_srv_ni; + svc = sni->sni_service; + + rs = req->rq_reply_state; + if (rs == NULL || !rs->rs_difficult) { + /* The easy case; no notifiers and reply_out_callback() + * cleans up (i.e. we can't look inside rs after a + * successful send) */ + netrc = target_send_reply_msg (req, rc, fail_id); + + LASSERT (netrc == 0 || req->rq_reply_state == NULL); + return; } - /* a failed send simulates the callbacks */ - LASSERT(netrc == 0 || req->rq_want_ack == 0); - if (exp == NULL) { - LASSERT(req->rq_want_ack == 0); - return; + /* must be an export if locks saved */ + LASSERT (req->rq_export != NULL); + /* req/reply consistent */ + LASSERT (rs->rs_srv_ni == sni); + + /* "fresh" reply */ + LASSERT (!rs->rs_scheduled); + LASSERT (!rs->rs_scheduled_ever); + LASSERT (!rs->rs_handled); + LASSERT (!rs->rs_on_net); + LASSERT (rs->rs_export == NULL); + LASSERT (list_empty(&rs->rs_obd_list)); + LASSERT (list_empty(&rs->rs_exp_list)); + + exp = class_export_get (req->rq_export); + obd = exp->exp_obd; + + /* disable reply scheduling onto srv_reply_queue while I'm setting up */ + rs->rs_scheduled = 1; + rs->rs_on_net = 1; + rs->rs_xid = req->rq_xid; + rs->rs_transno = req->rq_transno; + rs->rs_export = exp; + + spin_lock_irqsave (&obd->obd_uncommitted_replies_lock, flags); + + if (rs->rs_transno > obd->obd_last_committed) { + /* not committed already */ + list_add_tail (&rs->rs_obd_list, + &obd->obd_uncommitted_replies); } - LASSERT(obd != NULL); - init_waitqueue_entry(&commit_wait, current); - add_wait_queue(&obd->obd_commit_waitq, &commit_wait); - rc = l_wait_event(req->rq_wait_for_rep, - !req->rq_want_ack || req->rq_resent || - req->rq_transno <= obd->obd_last_committed, &lwi); - remove_wait_queue(&obd->obd_commit_waitq, &commit_wait); + spin_unlock (&obd->obd_uncommitted_replies_lock); + spin_lock (&exp->exp_lock); - spin_lock_irqsave (&req->rq_lock, flags); - /* If we got here because the ACK callback ran, this acts as a - * barrier to ensure the callback completed the wakeup. */ - spin_unlock_irqrestore (&req->rq_lock, flags); + list_add_tail (&rs->rs_exp_list, &exp->exp_outstanding_replies); - /* If we committed the transno already, then we might wake up before - * the ack arrives. We need to stop waiting for the ack before we can - * reuse this request structure. We are guaranteed by this point that - * this cannot abort the sending of the actual reply.*/ - ptlrpc_abort_reply(req); + spin_unlock_irqrestore (&exp->exp_lock, flags); - if (req->rq_resent) { - DEBUG_REQ(D_HA, req, "resent: not cancelling locks"); - return; - } + netrc = target_send_reply_msg (req, rc, fail_id); - LASSERT(rc == 0); - DEBUG_REQ(D_HA, req, "cancelling locks for %s", - req->rq_want_ack ? "commit" : "ack"); + spin_lock_irqsave (&svc->srv_lock, flags); - exp->exp_outstanding_reply = NULL; + svc->srv_n_difficult_replies++; - for (ack_lock = req->rq_ack_locks, i = 0; i < 4; i++, ack_lock++) { - if (!ack_lock->mode) - break; - ldlm_lock_decref(&ack_lock->lock, ack_lock->mode); + if (netrc != 0) /* error sending: reply is off the net */ + rs->rs_on_net = 0; + + if (!rs->rs_on_net || /* some notifier */ + list_empty(&rs->rs_exp_list) || /* completed already */ + list_empty(&rs->rs_obd_list)) { + list_add_tail (&rs->rs_list, &svc->srv_reply_queue); + wake_up (&svc->srv_waitq); + } else { + list_add (&rs->rs_list, &sni->sni_active_replies); + rs->rs_scheduled = 0; /* allow notifier to schedule */ } + + spin_unlock_irqrestore (&svc->srv_lock, flags); } int target_handle_ping(struct ptlrpc_request *req) { - return lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg); + return lustre_pack_reply(req, 0, NULL, NULL); }