X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fldlm%2Fldlm_lib.c;h=20a398e8b816e661dc427bd566e34d6275ada59e;hp=9a9721e1c35d326505c7d186c56032e35f43737c;hb=95d394d59bbccbc92af076c985fd664719f8fe12;hpb=d41530916baeff6b289e5658cc27d79fdd34cb7e diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c index 9a9721e..20a398e 100644 --- a/lustre/ldlm/ldlm_lib.c +++ b/lustre/ldlm/ldlm_lib.c @@ -35,10 +35,142 @@ #include #include #include +/* @priority: if non-zero, move the selected to the list head + * @nocreate: if non-zero, only search in existed connections + */ +static int import_set_conn(struct obd_import *imp, struct obd_uuid *uuid, + int priority, int nocreate) +{ + struct ptlrpc_connection *ptlrpc_conn; + struct obd_import_conn *imp_conn = NULL, *item; + int rc = 0; + ENTRY; + + LASSERT(!(nocreate && !priority)); + + ptlrpc_conn = ptlrpc_uuid_to_connection(uuid); + if (!ptlrpc_conn) { + CERROR("can't find connection %s\n", uuid->uuid); + RETURN (-EINVAL); + } + + if (!nocreate) { + OBD_ALLOC(imp_conn, sizeof(*imp_conn)); + if (!imp_conn) { + CERROR("fail to alloc memory\n"); + GOTO(out_put, rc = -ENOMEM); + } + } + + spin_lock(&imp->imp_lock); + list_for_each_entry(item, &imp->imp_conn_list, oic_item) { + if (obd_uuid_equals(uuid, &item->oic_uuid)) { + if (priority) { + list_del(&item->oic_item); + list_add(&item->oic_item, &imp->imp_conn_list); + item->oic_last_attempt = 0; + } + CDEBUG(D_HA, "imp %p@%s: find existed conn %s%s\n", + imp, imp->imp_obd->obd_name, uuid->uuid, + (priority ? ", move to head." : "")); + spin_unlock(&imp->imp_lock); + GOTO(out_free, rc = 0); + } + } + /* not found */ + if (!nocreate) { + imp_conn->oic_conn = ptlrpc_conn; + imp_conn->oic_uuid = *uuid; + imp_conn->oic_last_attempt = 0; + if (priority) + list_add(&imp_conn->oic_item, &imp->imp_conn_list); + else + list_add_tail(&imp_conn->oic_item, &imp->imp_conn_list); + CDEBUG(D_HA, "imp %p@%s: add connection %s at %s\n", + imp, imp->imp_obd->obd_name, uuid->uuid, + (priority ? "head" : "tail")); + } else + rc = -ENOENT; + + spin_unlock(&imp->imp_lock); + RETURN(0); +out_free: + if (imp_conn) + OBD_FREE(imp_conn, sizeof(*imp_conn)); +out_put: + ptlrpc_put_connection(ptlrpc_conn); + RETURN(rc); +} + +int import_set_conn_priority(struct obd_import *imp, struct obd_uuid *uuid) +{ + return import_set_conn(imp, uuid, 1, 1); +} + +int client_import_add_conn(struct obd_import *imp, struct obd_uuid *uuid, + int priority) +{ + return import_set_conn(imp, uuid, priority, 0); +} + +int client_import_del_conn(struct obd_import *imp, struct obd_uuid *uuid) +{ + struct obd_import_conn *imp_conn; + struct obd_export *dlmexp; + int rc = -ENOENT; + ENTRY; + + spin_lock(&imp->imp_lock); + if (list_empty(&imp->imp_conn_list)) { + LASSERT(!imp->imp_conn_current); + LASSERT(!imp->imp_connection); + GOTO(out, rc); + } + + list_for_each_entry(imp_conn, &imp->imp_conn_list, oic_item) { + if (!obd_uuid_equals(uuid, &imp_conn->oic_uuid)) + continue; + LASSERT(imp_conn->oic_conn); + + /* is current conn? */ + if (imp_conn == imp->imp_conn_current) { + LASSERT(imp_conn->oic_conn == imp->imp_connection); + + if (imp->imp_state != LUSTRE_IMP_CLOSED && + imp->imp_state != LUSTRE_IMP_DISCON) { + CERROR("can't remove current connection\n"); + GOTO(out, rc = -EBUSY); + } + + ptlrpc_put_connection(imp->imp_connection); + imp->imp_connection = NULL; + + dlmexp = class_conn2export(&imp->imp_dlm_handle); + if (dlmexp && dlmexp->exp_connection) { + LASSERT(dlmexp->exp_connection == + imp_conn->oic_conn); + ptlrpc_put_connection(dlmexp->exp_connection); + dlmexp->exp_connection = NULL; + } + } + + list_del(&imp_conn->oic_item); + ptlrpc_put_connection(imp_conn->oic_conn); + OBD_FREE(imp_conn, sizeof(*imp_conn)); + CDEBUG(D_HA, "imp %p@%s: remove connection %s\n", + imp, imp->imp_obd->obd_name, uuid->uuid); + rc = 0; + break; + } +out: + spin_unlock(&imp->imp_lock); + if (rc == -ENOENT) + CERROR("connection %s not found\n", uuid->uuid); + RETURN(rc); +} int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf) { - struct ptlrpc_connection *conn; struct lustre_cfg* lcfg = buf; struct client_obd *cli = &obddev->u.cli; struct obd_import *imp; @@ -98,19 +230,30 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf) cli->cl_dirty = 0; cli->cl_avail_grant = 0; + /* FIXME: should limit this for the sum of all cl_dirty_max */ cli->cl_dirty_max = OSC_MAX_DIRTY_DEFAULT * 1024 * 1024; + if (cli->cl_dirty_max >> PAGE_SHIFT > num_physpages / 8) + cli->cl_dirty_max = num_physpages << (PAGE_SHIFT - 3); INIT_LIST_HEAD(&cli->cl_cache_waiters); INIT_LIST_HEAD(&cli->cl_loi_ready_list); INIT_LIST_HEAD(&cli->cl_loi_write_list); INIT_LIST_HEAD(&cli->cl_loi_read_list); spin_lock_init(&cli->cl_loi_list_lock); - cli->cl_brw_in_flight = 0; + cli->cl_r_in_flight = 0; + cli->cl_w_in_flight = 0; spin_lock_init(&cli->cl_read_rpc_hist.oh_lock); spin_lock_init(&cli->cl_write_rpc_hist.oh_lock); spin_lock_init(&cli->cl_read_page_hist.oh_lock); spin_lock_init(&cli->cl_write_page_hist.oh_lock); - cli->cl_max_pages_per_rpc = PTLRPC_MAX_BRW_PAGES; - cli->cl_max_rpcs_in_flight = OSC_MAX_RIF_DEFAULT; + + if (num_physpages <= 32768) { /* <= 128 MB */ + cli->cl_max_pages_per_rpc = PTLRPC_MAX_BRW_PAGES / 3; + cli->cl_max_rpcs_in_flight = OSC_MAX_RIF_DEFAULT / 3; + } else { + cli->cl_max_pages_per_rpc = PTLRPC_MAX_BRW_PAGES; + cli->cl_max_rpcs_in_flight = OSC_MAX_RIF_DEFAULT; + } + rc = ldlm_get_ref(); if (rc) { @@ -118,19 +261,12 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf) GOTO(err, rc); } - conn = ptlrpc_uuid_to_connection(&server_uuid); - if (conn == NULL) - GOTO(err_ldlm, rc = -ENOENT); - ptlrpc_init_client(rq_portal, rp_portal, name, &obddev->obd_ldlm_client); imp = class_new_import(); - if (imp == NULL) { - ptlrpc_put_connection(conn); + if (imp == NULL) GOTO(err_ldlm, rc = -ENOENT); - } - imp->imp_connection = conn; imp->imp_client = &obddev->obd_ldlm_client; imp->imp_obd = obddev; imp->imp_connect_op = connect_op; @@ -141,6 +277,12 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf) lcfg->lcfg_inllen1); class_import_put(imp); + rc = client_import_add_conn(imp, &server_uuid, 1); + if (rc) { + CERROR("can't add initial connection\n"); + GOTO(err_import, rc); + } + cli->cl_import = imp; cli->cl_max_mds_easize = sizeof(struct lov_mds_md); cli->cl_max_mds_cookiesize = sizeof(struct llog_cookie); @@ -250,14 +392,13 @@ int client_connect_import(struct lustre_handle *dlm_handle, if (rc != 0) GOTO(out_ldlm, rc); - exp->exp_connection = ptlrpc_connection_addref(imp->imp_connection); imp->imp_connect_flags = connect_flags; rc = ptlrpc_connect_import(imp, NULL); if (rc != 0) { LASSERT (imp->imp_state == LUSTRE_IMP_DISCON); GOTO(out_ldlm, rc); } - + LASSERT(exp->exp_connection); ptlrpc_pinger_add_import(imp); EXIT; @@ -276,7 +417,7 @@ out_sem: return rc; } -int client_disconnect_export(struct obd_export *exp, int failover) +int client_disconnect_export(struct obd_export *exp, unsigned long flags) { struct obd_device *obd = class_exp2obd(exp); struct client_obd *cli = &obd->u.cli; @@ -363,6 +504,7 @@ int target_handle_reconnect(struct lustre_handle *conn, struct obd_export *exp, RETURN(0); } +static char nidstr[PTL_NALFMT_SIZE]; int target_handle_connect(struct ptlrpc_request *req) { unsigned long connect_flags = 0, *cfp; @@ -397,8 +539,8 @@ int target_handle_connect(struct ptlrpc_request *req) } if (!target || target->obd_stopping || !target->obd_set_up) { - CERROR("UUID '%s' is not available for connect\n", str); - + CERROR("UUID '%s' is not available for connect from NID %s\n", + str, ptlrpc_peernid2str(&req->rq_peer, nidstr)); GOTO(out, rc = -ENODEV); } @@ -416,11 +558,11 @@ int target_handle_connect(struct ptlrpc_request *req) /* NB the casts only avoid compiler warnings */ case 8: snprintf(remote_uuid.uuid, sizeof remote_uuid, - "NET_"LPX64"_UUID", (__u64)req->rq_peer.peer_nid); + "NET_"LPX64"_UUID", (__u64)req->rq_peer.peer_id.nid); break; case 4: snprintf(remote_uuid.uuid, sizeof remote_uuid, - "NET_%x_UUID", (__u32)req->rq_peer.peer_nid); + "NET_%x_UUID", (__u32)req->rq_peer.peer_id.nid); break; default: LBUG(); @@ -500,8 +642,9 @@ int target_handle_connect(struct ptlrpc_request *req) #endif if (export == NULL) { if (target->obd_recovering) { - CERROR("denying connection for new client %s@%s: " - "%d clients in recovery for %lds\n", cluuid.uuid, + CERROR("%s denying connection for new client %s@%s: " + "%d clients in recovery for %lds\n", target->obd_name, + cluuid.uuid, ptlrpc_peernid2str(&req->rq_peer, peer_str), target->obd_recoverable_clients, (target->obd_recovery_timer.expires-jiffies)/HZ); @@ -583,6 +726,8 @@ int target_handle_connect(struct ptlrpc_request *req) revimp->imp_dlm_fake = 1; revimp->imp_state = LUSTRE_IMP_FULL; class_import_put(revimp); + + rc = obd_connect_post(export, connect_flags); out: if (rc) req->rq_status = rc; @@ -656,21 +801,47 @@ void ptlrpc_free_clone( struct ptlrpc_request *req) OBD_FREE(req, sizeof *req); } -static void abort_delayed_replies(struct obd_device *obd) + + +static void target_release_saved_req(struct ptlrpc_request *req) +{ + class_export_put(req->rq_export); + OBD_FREE(req->rq_reqmsg, req->rq_reqlen); + OBD_FREE(req, sizeof *req); +} + +#ifdef __KERNEL__ +static void target_finish_recovery(struct obd_device *obd) { - struct ptlrpc_request *req; struct list_head *tmp, *n; + int rc; + + CWARN("%s: sending delayed replies to recovered clients\n", + obd->obd_name); + + ldlm_reprocess_all_ns(obd->obd_namespace); + + /* when recovery finished, cleanup orphans on mds and ost */ + if (OBT(obd) && OBP(obd, postrecov)) { + rc = OBP(obd, postrecov)(obd); + if (rc >= 0) + CWARN("%s: all clients recovered, %d MDS " + "orphans deleted\n", obd->obd_name, rc); + else + CERROR("postrecov failed %d\n", rc); + } + + list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) { + struct ptlrpc_request *req; req = list_entry(tmp, struct ptlrpc_request, rq_list); - DEBUG_REQ(D_ERROR, req, "aborted:"); - req->rq_status = -ENOTCONN; - req->rq_type = PTL_RPC_MSG_ERR; - ptlrpc_reply(req); - class_export_put(req->rq_export); list_del(&req->rq_list); - OBD_FREE(req->rq_reqmsg, req->rq_reqlen); - OBD_FREE(req, sizeof *req); + DEBUG_REQ(D_ERROR, req, "delayed:"); + ptlrpc_reply(req); + target_release_saved_req(req); } + obd->obd_recovery_end = LTIME_S(CURRENT_TIME); + return; } static void abort_recovery_queue(struct obd_device *obd) @@ -681,6 +852,7 @@ static void abort_recovery_queue(struct obd_device *obd) list_for_each_safe(tmp, n, &obd->obd_recovery_queue) { req = list_entry(tmp, struct ptlrpc_request, rq_list); + list_del(&req->rq_list); DEBUG_REQ(D_ERROR, req, "aborted:"); req->rq_status = -ENOTCONN; req->rq_type = PTL_RPC_MSG_ERR; @@ -691,44 +863,76 @@ static void abort_recovery_queue(struct obd_device *obd) DEBUG_REQ(D_ERROR, req, "packing failed for abort-reply; skipping"); } + target_release_saved_req(req); + } +} +#endif + +/* Called from a cleanup function if the device is being cleaned up + forcefully. The exports should all have been disconnected already, + the only thing left to do is + - clear the recovery flags + - cancel the timer + - free queued requests and replies, but don't send replies + Because the obd_stopping flag is set, no new requests should be received. + +*/ +void target_cleanup_recovery(struct obd_device *obd) +{ + struct list_head *tmp, *n; + struct ptlrpc_request *req; + + spin_lock_bh(&obd->obd_processing_task_lock); + if (!obd->obd_recovering) { + spin_unlock_bh(&obd->obd_processing_task_lock); + EXIT; + return; + } + obd->obd_recovering = obd->obd_abort_recovery = 0; + target_cancel_recovery_timer(obd); + spin_unlock_bh(&obd->obd_processing_task_lock); + + list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) { + req = list_entry(tmp, struct ptlrpc_request, rq_list); list_del(&req->rq_list); - class_export_put(req->rq_export); - OBD_FREE(req->rq_reqmsg, req->rq_reqlen); - OBD_FREE(req, sizeof *req); + LASSERT (req->rq_reply_state); + lustre_free_reply_state(req->rq_reply_state); + target_release_saved_req(req); } + list_for_each_safe(tmp, n, &obd->obd_recovery_queue) { + req = list_entry(tmp, struct ptlrpc_request, rq_list); + list_del(&req->rq_list); + LASSERT (req->rq_reply_state == 0); + target_release_saved_req(req); + } } +#ifdef __KERNEL__ static void target_abort_recovery(void *data) { struct obd_device *obd = data; - int rc; - - CERROR("disconnecting clients and aborting recovery\n"); + LASSERT(!obd->obd_recovering); - class_disconnect_exports(obd, 0); + class_disconnect_stale_exports(obd, 0); - /* when recovery was aborted, cleanup orphans on mds and ost */ - if (OBT(obd) && OBP(obd, postrecov)) { - rc = OBP(obd, postrecov)(obd); - if (rc >= 0) - CWARN("Cleanup %d orphans after recovery was aborted\n", - rc); - else - CERROR("postrecov failed %d\n", rc); - } + CERROR("%s: recovery period over; disconnecting unfinished clients.\n", + obd->obd_name); - abort_delayed_replies(obd); abort_recovery_queue(obd); + target_finish_recovery(obd); ptlrpc_run_recovery_over_upcall(obd); } +#endif static void target_recovery_expired(unsigned long castmeharder) { struct obd_device *obd = (struct obd_device *)castmeharder; CERROR("recovery timed out, aborting\n"); spin_lock_bh(&obd->obd_processing_task_lock); - obd->obd_abort_recovery = 1; + if (obd->obd_recovering) + obd->obd_abort_recovery = 1; + wake_up(&obd->obd_next_transno_waitq); spin_unlock_bh(&obd->obd_processing_task_lock); } @@ -741,6 +945,7 @@ void target_cancel_recovery_timer(struct obd_device *obd) del_timer(&obd->obd_recovery_timer); } +#ifdef __KERNEL__ static void reset_recovery_timer(struct obd_device *obd) { spin_lock_bh(&obd->obd_processing_task_lock); @@ -753,7 +958,7 @@ static void reset_recovery_timer(struct obd_device *obd) mod_timer(&obd->obd_recovery_timer, jiffies + OBD_RECOVERY_TIMEOUT); spin_unlock_bh(&obd->obd_processing_task_lock); } - +#endif /* Only start it the first time called */ void target_start_recovery_timer(struct obd_device *obd) @@ -771,40 +976,10 @@ void target_start_recovery_timer(struct obd_device *obd) spin_unlock_bh(&obd->obd_processing_task_lock); } -static void target_finish_recovery(struct obd_device *obd) -{ - struct list_head *tmp, *n; - int rc2; - - ldlm_reprocess_all_ns(obd->obd_namespace); - - CWARN("%s: all clients recovered, calling postrecov\n", - obd->obd_name); - /* when recovery finished, cleanup orphans on mds and ost */ - if (OBT(obd) && OBP(obd, postrecov)) { - rc2 = OBP(obd, postrecov)(obd); - if (rc2 >= 0) - CWARN("%s: all clients recovered, %d MDS " - "orphans deleted\n", obd->obd_name, rc2); - else - CERROR("postrecov failed %d\n", rc2); - } - - CWARN("%s: recovery over, sending delayed replies\n", - obd->obd_name); - list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) { - struct ptlrpc_request *req; - req = list_entry(tmp, struct ptlrpc_request, rq_list); - DEBUG_REQ(D_ERROR, req, "delayed:"); - ptlrpc_reply(req); - ptlrpc_free_clone(req); - } - ptlrpc_run_recovery_over_upcall(obd); -} - +#ifdef __KERNEL__ static int check_for_next_transno(struct obd_device *obd) { - struct ptlrpc_request *req; + struct ptlrpc_request *req = NULL; int wake_up = 0, connected, completed, queue_len, max; __u64 next_transno, req_transno; @@ -875,7 +1050,6 @@ target_next_replay_req(struct obd_device *obd) return req; } - static int target_recovery_thread(void *arg) { struct obd_device *obd = arg; @@ -969,6 +1143,7 @@ void target_stop_recovery_thread(struct obd_device *obd) spin_unlock_bh(&obd->obd_processing_task_lock); } } +#endif int target_queue_recovery_request(struct ptlrpc_request *req, struct obd_device *obd) @@ -1002,7 +1177,6 @@ int target_queue_recovery_request(struct ptlrpc_request *req, if (obd->obd_recovery_data.trd_processing_task == current->pid || transno < obd->obd_next_recovery_transno) { /* Processing the queue right now, don't re-add. */ - lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT); LASSERT(list_empty(&req->rq_list)); spin_unlock_bh(&obd->obd_processing_task_lock); return 1; @@ -1108,15 +1282,8 @@ target_send_reply_msg (struct ptlrpc_request *req, int rc, int fail_id) } if (rc) { - DEBUG_REQ(D_ERROR, req, "processing error (%d)", rc); - if (req->rq_reply_state == NULL) { - rc = lustre_pack_reply (req, 0, NULL, NULL); - if (rc != 0) { - CERROR ("can't allocate reply\n"); - return (rc); - } - } - req->rq_type = PTL_RPC_MSG_ERR; + req->rq_status = rc; + return (ptlrpc_error(req)); } else { DEBUG_REQ(D_NET, req, "sending reply"); }