X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fptlrpc%2Frecov_thread.c;h=d3b473dcd7a63acb879d66f5159ec2bb84265e6c;hb=ec9fc19d23d2ee251894634f95a71f32bbc43b20;hp=3d66eaea829f04e32873fcd59ab4e16d363e1b6d;hpb=090c677210ee2946d99c71412e4ff762bb300f4f;p=fs%2Flustre-release.git diff --git a/lustre/ptlrpc/recov_thread.c b/lustre/ptlrpc/recov_thread.c index 3d66eae..d3b473d 100644 --- a/lustre/ptlrpc/recov_thread.c +++ b/lustre/ptlrpc/recov_thread.c @@ -4,20 +4,23 @@ * Copyright (C) 2003 Cluster File Systems, Inc. * Author: Andreas Dilger * - * This file is part of Lustre, http://www.lustre.org. + * This file is part of the Lustre file system, http://www.lustre.org + * Lustre is a trademark of Cluster File Systems, Inc. * - * Lustre is free software; you can redistribute it and/or - * modify it under the terms of version 2 of the GNU General Public - * License as published by the Free Software Foundation. + * You may have signed or agreed to another license before downloading + * this software. If so, you are bound by the terms and conditions + * of that agreement, and the following does not apply to you. See the + * LICENSE file included with this distribution for more information. * - * Lustre is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. + * If you did not agree to a different license, then this copy of Lustre + * is open source software; you can redistribute it and/or modify it + * under the terms of version 2 of the GNU General Public License as + * published by the Free Software Foundation. * - * You should have received a copy of the GNU General Public License - * along with Lustre; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * In either case, Lustre is distributed in the hope that it will be + * useful, but WITHOUT ANY WARRANTY; without even the implied warranty + * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * license text for more details. * * OST<->MDS recovery logging thread. * @@ -33,40 +36,44 @@ #endif #ifdef __KERNEL__ -#include +# include #else -# include +# include # include #endif -#include -#include -#include -#include -#include -#include -#include -#include -#include +#include +#include +#include +#include +#include +#include +#include +#include +#include #include "ptlrpc_internal.h" -#define LLCD_SIZE 4096 - #ifdef __KERNEL__ static struct llog_commit_master lustre_lcm; static struct llog_commit_master *lcm = &lustre_lcm; -/* Allocate new commit structs in case we do not have enough */ +/* Allocate new commit structs in case we do not have enough. + * Make the llcd size small enough that it fits into a single page when we + * are sending/receiving it. */ static int llcd_alloc(void) { struct llog_canceld_ctxt *llcd; - int offset = offsetof(struct llog_canceld_ctxt, llcd_cookies); + int llcd_size; - OBD_ALLOC(llcd, LLCD_SIZE + offset); + /* payload of lustre_msg V2 is bigger */ + llcd_size = 4096 - lustre_msg_size(LUSTRE_MSG_MAGIC_V2, 1, NULL); + OBD_ALLOC(llcd, + llcd_size + offsetof(struct llog_canceld_ctxt, llcd_cookies)); if (llcd == NULL) return -ENOMEM; + llcd->llcd_size = llcd_size; llcd->llcd_lcm = lcm; spin_lock(&lcm->lcm_llcd_lock); @@ -82,6 +89,7 @@ struct llog_canceld_ctxt *llcd_grab(void) { struct llog_canceld_ctxt *llcd; +repeat: spin_lock(&lcm->lcm_llcd_lock); if (list_empty(&lcm->lcm_llcd_free)) { spin_unlock(&lcm->lcm_llcd_lock); @@ -89,7 +97,8 @@ struct llog_canceld_ctxt *llcd_grab(void) CERROR("unable to allocate log commit data!\n"); return NULL; } - spin_lock(&lcm->lcm_llcd_lock); + /* check new llcd wasn't grabbed while lock dropped, b=7407 */ + goto repeat; } llcd = list_entry(lcm->lcm_llcd_free.next, typeof(*llcd), llcd_list); @@ -97,7 +106,6 @@ struct llog_canceld_ctxt *llcd_grab(void) atomic_dec(&lcm->lcm_llcd_numfree); spin_unlock(&lcm->lcm_llcd_lock); - llcd->llcd_tries = 0; llcd->llcd_cookiebytes = 0; return llcd; @@ -106,10 +114,10 @@ EXPORT_SYMBOL(llcd_grab); static void llcd_put(struct llog_canceld_ctxt *llcd) { - int offset = offsetof(struct llog_canceld_ctxt, llcd_cookies); - if (atomic_read(&lcm->lcm_llcd_numfree) >= lcm->lcm_llcd_maxfree) { - OBD_FREE(llcd, LLCD_SIZE + offset); + int llcd_size = llcd->llcd_size + + offsetof(struct llog_canceld_ctxt, llcd_cookies); + OBD_FREE(llcd, llcd_size); } else { spin_lock(&lcm->lcm_llcd_lock); list_add(&llcd->llcd_list, &lcm->lcm_llcd_free); @@ -125,7 +133,7 @@ void llcd_send(struct llog_canceld_ctxt *llcd) list_add_tail(&llcd->llcd_list, &llcd->llcd_lcm->lcm_llcd_pending); spin_unlock(&llcd->llcd_lcm->lcm_llcd_lock); - wake_up_nr(&llcd->llcd_lcm->lcm_waitq, 1); + cfs_waitq_signal_nr(&llcd->llcd_lcm->lcm_waitq, 1); } EXPORT_SYMBOL(llcd_send); @@ -143,47 +151,45 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt, LASSERT(ctxt); + mutex_down(&ctxt->loc_sem); if (ctxt->loc_imp == NULL) { - CWARN("no import for ctxt %p\n", ctxt); - RETURN(0); + CDEBUG(D_RPCTRACE, "no import for ctxt %p\n", ctxt); + GOTO(out, rc = 0); } - if (count == 0 || cookies == NULL) { - down(&ctxt->loc_sem); - if (ctxt->loc_llcd == NULL || !(flags & OBD_LLOG_FL_SENDNOW)) - GOTO(out, rc); - - llcd = ctxt->loc_llcd; - GOTO(send_now, rc); - } - - down(&ctxt->loc_sem); llcd = ctxt->loc_llcd; - if (llcd == NULL) { - llcd = llcd_grab(); + + if (count > 0 && cookies != NULL) { if (llcd == NULL) { - CERROR("couldn't get an llcd - dropped "LPX64":%x+%u\n", - cookies->lgc_lgl.lgl_oid, - cookies->lgc_lgl.lgl_ogen, cookies->lgc_index); - GOTO(out, rc = -ENOMEM); + llcd = llcd_grab(); + if (llcd == NULL) { + CERROR("couldn't get an llcd - dropped "LPX64 + ":%x+%u\n", + cookies->lgc_lgl.lgl_oid, + cookies->lgc_lgl.lgl_ogen, + cookies->lgc_index); + GOTO(out, rc = -ENOMEM); + } + llcd->llcd_ctxt = ctxt; + ctxt->loc_llcd = llcd; } - llcd->llcd_ctxt = ctxt; - ctxt->loc_llcd = llcd; - } - memcpy((char *)llcd->llcd_cookies + llcd->llcd_cookiebytes, cookies, - sizeof(*cookies)); - llcd->llcd_cookiebytes += sizeof(*cookies); + memcpy((char *)llcd->llcd_cookies + llcd->llcd_cookiebytes, + cookies, sizeof(*cookies)); + llcd->llcd_cookiebytes += sizeof(*cookies); + } else { + if (llcd == NULL || !(flags & OBD_LLOG_FL_SENDNOW)) + GOTO(out, rc); + } -send_now: - if ((LLCD_SIZE - llcd->llcd_cookiebytes < sizeof(*cookies) || - flags & OBD_LLOG_FL_SENDNOW)) { - CDEBUG(D_HA, "send llcd %p:%p\n", llcd, llcd->llcd_ctxt); + if ((llcd->llcd_size - llcd->llcd_cookiebytes) < sizeof(*cookies) || + (flags & OBD_LLOG_FL_SENDNOW)) { + CDEBUG(D_RPCTRACE, "send llcd %p:%p\n", llcd, llcd->llcd_ctxt); ctxt->loc_llcd = NULL; llcd_send(llcd); } out: - up(&ctxt->loc_sem); + mutex_up(&ctxt->loc_sem); return rc; } EXPORT_SYMBOL(llog_obd_repl_cancel); @@ -194,15 +200,15 @@ int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp) ENTRY; if (exp && (ctxt->loc_imp == exp->exp_imp_reverse)) { - down(&ctxt->loc_sem); + CDEBUG(D_RPCTRACE,"reverse import disconnect, put llcd %p:%p\n", + ctxt->loc_llcd, ctxt); + mutex_down(&ctxt->loc_sem); if (ctxt->loc_llcd != NULL) { - CWARN("import will be destroyed, put " - "llcd %p:%p\n", ctxt->loc_llcd, ctxt); llcd_put(ctxt->loc_llcd); ctxt->loc_llcd = NULL; - ctxt->loc_imp = NULL; } - up(&ctxt->loc_sem); + ctxt->loc_imp = NULL; + mutex_up(&ctxt->loc_sem); } else { rc = llog_cancel(ctxt, NULL, 0, NULL, OBD_LLOG_FL_SENDNOW); } @@ -216,39 +222,35 @@ static int log_commit_thread(void *arg) struct llog_commit_master *lcm = arg; struct llog_commit_daemon *lcd; struct llog_canceld_ctxt *llcd, *n; - unsigned long flags; + struct obd_import *import = NULL; ENTRY; OBD_ALLOC(lcd, sizeof(*lcd)); if (lcd == NULL) RETURN(-ENOMEM); - lock_kernel(); - ptlrpc_daemonize(); /* thread never needs to do IO */ - - SIGNAL_MASK_LOCK(current, flags); - sigfillset(¤t->blocked); - RECALC_SIGPENDING; - SIGNAL_MASK_UNLOCK(current, flags); - spin_lock(&lcm->lcm_thread_lock); - THREAD_NAME(current->comm, "ll_log_commit_%d", - atomic_read(&lcm->lcm_thread_total)); + THREAD_NAME(cfs_curproc_comm(), CFS_CURPROC_COMM_MAX - 1, + "ll_log_comt_%02d", atomic_read(&lcm->lcm_thread_total)); atomic_inc(&lcm->lcm_thread_total); spin_unlock(&lcm->lcm_thread_lock); - unlock_kernel(); - INIT_LIST_HEAD(&lcd->lcd_lcm_list); - INIT_LIST_HEAD(&lcd->lcd_llcd_list); + ptlrpc_daemonize(cfs_curproc_comm()); /* thread never needs to do IO */ + + CFS_INIT_LIST_HEAD(&lcd->lcd_lcm_list); + CFS_INIT_LIST_HEAD(&lcd->lcd_llcd_list); lcd->lcd_lcm = lcm; - CDEBUG(D_HA, "%s started\n", current->comm); + CDEBUG(D_HA, "%s started\n", cfs_curproc_comm()); do { struct ptlrpc_request *request; - struct obd_import *import = NULL; struct list_head *sending_list; int rc = 0; + if (import) + class_import_put(import); + import = NULL; + /* If we do not have enough pages available, allocate some */ while (atomic_read(&lcm->lcm_llcd_numfree) < lcm->lcm_llcd_minfree) { @@ -274,6 +276,9 @@ static int log_commit_thread(void *arg) sending_list = &lcm->lcm_llcd_pending; resend: + if (import) + class_import_put(import); + import = NULL; if (lcm->lcm_flags & LLOG_LCM_FL_EXIT) { lcm->lcm_llcd_maxfree = 0; lcm->lcm_llcd_minfree = 0; @@ -302,6 +307,8 @@ static int log_commit_thread(void *arg) typeof(*llcd), llcd_list); LASSERT(llcd->llcd_lcm == lcm); import = llcd->llcd_ctxt->loc_imp; + if (import) + class_import_get(import); } list_for_each_entry_safe(llcd, n, sending_list, llcd_list) { LASSERT(llcd->llcd_lcm == lcm); @@ -322,30 +329,40 @@ static int log_commit_thread(void *arg) /* We are the only one manipulating our local list - no lock */ list_for_each_entry_safe(llcd,n, &lcd->lcd_llcd_list,llcd_list){ - char *bufs[1] = {(char *)llcd->llcd_cookies}; + int size[2] = { sizeof(struct ptlrpc_body), + llcd->llcd_cookiebytes }; + char *bufs[2] = { NULL, (char *)llcd->llcd_cookies }; list_del(&llcd->llcd_list); if (llcd->llcd_cookiebytes == 0) { - CDEBUG(D_HA, "put empty llcd %p:%p\n", + CDEBUG(D_RPCTRACE, "put empty llcd %p:%p\n", llcd, llcd->llcd_ctxt); llcd_put(llcd); continue; } - down(&llcd->llcd_ctxt->loc_sem); + mutex_down(&llcd->llcd_ctxt->loc_sem); if (llcd->llcd_ctxt->loc_imp == NULL) { - up(&llcd->llcd_ctxt->loc_sem); + mutex_up(&llcd->llcd_ctxt->loc_sem); CWARN("import will be destroyed, put " "llcd %p:%p\n", llcd, llcd->llcd_ctxt); llcd_put(llcd); continue; } + mutex_up(&llcd->llcd_ctxt->loc_sem); + + if (!import || (import == LP_POISON) || + (import->imp_client == LP_POISON)) { + CERROR("No import %p (llcd=%p, ctxt=%p)\n", + import, llcd, llcd->llcd_ctxt); + llcd_put(llcd); + continue; + } - request = ptlrpc_prep_req(import, OBD_LOG_CANCEL, 1, - &llcd->llcd_cookiebytes, - bufs); - up(&llcd->llcd_ctxt->loc_sem); + OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_DELAY_RECOV, 10); + request = ptlrpc_prep_req(import, LUSTRE_LOG_VERSION, + OBD_LOG_CANCEL, 2, size,bufs); if (request == NULL) { rc = -ENOMEM; CERROR("error preparing commit: rc %d\n", rc); @@ -353,24 +370,28 @@ static int log_commit_thread(void *arg) spin_lock(&lcm->lcm_llcd_lock); list_splice(&lcd->lcd_llcd_list, &lcm->lcm_llcd_resend); - INIT_LIST_HEAD(&lcd->lcd_llcd_list); + CFS_INIT_LIST_HEAD(&lcd->lcd_llcd_list); spin_unlock(&lcm->lcm_llcd_lock); break; } - request->rq_replen = lustre_msg_size(0, NULL); - down(&llcd->llcd_ctxt->loc_sem); + /* XXX FIXME bug 249, 5515 */ + request->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL; + request->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL; + + ptlrpc_req_set_repsize(request, 1, NULL); + mutex_down(&llcd->llcd_ctxt->loc_sem); if (llcd->llcd_ctxt->loc_imp == NULL) { - up(&llcd->llcd_ctxt->loc_sem); + mutex_up(&llcd->llcd_ctxt->loc_sem); CWARN("import will be destroyed, put " "llcd %p:%p\n", llcd, llcd->llcd_ctxt); llcd_put(llcd); ptlrpc_req_finished(request); continue; } + mutex_up(&llcd->llcd_ctxt->loc_sem); rc = ptlrpc_queue_wait(request); ptlrpc_req_finished(request); - up(&llcd->llcd_ctxt->loc_sem); /* If the RPC failed, we put this and the remaining * messages onto the resend list for another time. */ @@ -379,28 +400,11 @@ static int log_commit_thread(void *arg) continue; } -#if 0 /* FIXME just put llcd, not put it on resend list */ - spin_lock(&lcm->lcm_llcd_lock); - list_splice(&lcd->lcd_llcd_list, &lcm->lcm_llcd_resend); - if (++llcd->llcd_tries < 5) { - CERROR("commit %p failed on attempt %d: rc %d\n", - llcd, llcd->llcd_tries, rc); - - list_add_tail(&llcd->llcd_list, - &lcm->lcm_llcd_resend); - spin_unlock(&lcm->lcm_llcd_lock); - } else { - spin_unlock(&lcm->lcm_llcd_lock); -#endif - CERROR("commit %p:%p drop %d cookies: rc %d\n", - llcd, llcd->llcd_ctxt, - (int)(llcd->llcd_cookiebytes / - sizeof(*llcd->llcd_cookies)), rc); - llcd_put(llcd); -#if 0 - } - break; -#endif + CERROR("commit %p:%p drop %d cookies: rc %d\n", + llcd, llcd->llcd_ctxt, + (int)(llcd->llcd_cookiebytes / + sizeof(*llcd->llcd_cookies)), rc); + llcd_put(llcd); } if (rc == 0) { @@ -410,6 +414,9 @@ static int log_commit_thread(void *arg) } } while(1); + if (import) + class_import_put(import); + /* If we are force exiting, just drop all of the cookies. */ if (lcm->lcm_flags & LLOG_LCM_FL_EXIT_FORCE) { spin_lock(&lcm->lcm_llcd_lock); @@ -427,12 +434,13 @@ static int log_commit_thread(void *arg) spin_unlock(&lcm->lcm_thread_lock); OBD_FREE(lcd, sizeof(*lcd)); + CDEBUG(D_HA, "%s exiting\n", cfs_curproc_comm()); + spin_lock(&lcm->lcm_thread_lock); atomic_dec(&lcm->lcm_thread_total); spin_unlock(&lcm->lcm_thread_lock); - wake_up(&lcm->lcm_waitq); + cfs_waitq_signal(&lcm->lcm_waitq); - CDEBUG(D_HA, "%s exiting\n", current->comm); return 0; } @@ -444,7 +452,7 @@ int llog_start_commit_thread(void) if (atomic_read(&lcm->lcm_thread_total) >= lcm->lcm_thread_max) RETURN(0); - rc = kernel_thread(log_commit_thread, lcm, CLONE_VM | CLONE_FILES); + rc = cfs_kernel_thread(log_commit_thread, lcm, CLONE_VM | CLONE_FILES); if (rc < 0) { CERROR("error starting thread #%d: %d\n", atomic_read(&lcm->lcm_thread_total), rc); @@ -464,14 +472,14 @@ static struct llog_process_args { int llog_init_commit_master(void) { - INIT_LIST_HEAD(&lcm->lcm_thread_busy); - INIT_LIST_HEAD(&lcm->lcm_thread_idle); + CFS_INIT_LIST_HEAD(&lcm->lcm_thread_busy); + CFS_INIT_LIST_HEAD(&lcm->lcm_thread_idle); spin_lock_init(&lcm->lcm_thread_lock); atomic_set(&lcm->lcm_thread_numidle, 0); - init_waitqueue_head(&lcm->lcm_waitq); - INIT_LIST_HEAD(&lcm->lcm_llcd_pending); - INIT_LIST_HEAD(&lcm->lcm_llcd_resend); - INIT_LIST_HEAD(&lcm->lcm_llcd_free); + cfs_waitq_init(&lcm->lcm_waitq); + CFS_INIT_LIST_HEAD(&lcm->lcm_llcd_pending); + CFS_INIT_LIST_HEAD(&lcm->lcm_llcd_resend); + CFS_INIT_LIST_HEAD(&lcm->lcm_llcd_free); spin_lock_init(&lcm->lcm_llcd_lock); atomic_set(&lcm->lcm_llcd_numfree, 0); lcm->lcm_llcd_minfree = 0; @@ -486,7 +494,7 @@ int llog_cleanup_commit_master(int force) lcm->lcm_flags |= LLOG_LCM_FL_EXIT; if (force) lcm->lcm_flags |= LLOG_LCM_FL_EXIT_FORCE; - wake_up(&lcm->lcm_waitq); + cfs_waitq_signal(&lcm->lcm_waitq); wait_event_interruptible(lcm->lcm_waitq, atomic_read(&lcm->lcm_thread_total) == 0); @@ -500,19 +508,11 @@ static int log_process_thread(void *args) void *cb = data->llpa_cb; struct llog_logid logid = *(struct llog_logid *)(data->llpa_arg); struct llog_handle *llh = NULL; - unsigned long flags; int rc; ENTRY; - up(&data->llpa_sem); - lock_kernel(); - ptlrpc_daemonize(); /* thread never needs to do IO */ - - SIGNAL_MASK_LOCK(current, flags); - sigfillset(¤t->blocked); - RECALC_SIGPENDING; - SIGNAL_MASK_UNLOCK(current, flags); - unlock_kernel(); + mutex_up(&data->llpa_sem); + ptlrpc_daemonize("llog_process"); /* thread does IO to log files */ rc = llog_create(ctxt, &llh, &logid, NULL); if (rc) { @@ -549,12 +549,12 @@ static int llog_recovery_generic(struct llog_ctxt *ctxt, void *handle,void *arg) int rc; ENTRY; - down(&llpa.llpa_sem); + mutex_down(&llpa.llpa_sem); llpa.llpa_ctxt = ctxt; llpa.llpa_cb = handle; llpa.llpa_arg = arg; - rc = kernel_thread(log_process_thread, &llpa, CLONE_VM | CLONE_FILES); + rc = cfs_kernel_thread(log_process_thread, &llpa, CLONE_VM | CLONE_FILES); if (rc < 0) CERROR("error starting log_process_thread: %d\n", rc); else { @@ -579,17 +579,17 @@ int llog_repl_connect(struct llog_ctxt *ctxt, int count, llog_sync(ctxt, NULL); } - down(&ctxt->loc_sem); + mutex_down(&ctxt->loc_sem); ctxt->loc_gen = *gen; llcd = llcd_grab(); if (llcd == NULL) { CERROR("couldn't get an llcd\n"); - up(&ctxt->loc_sem); + mutex_up(&ctxt->loc_sem); RETURN(-ENOMEM); } llcd->llcd_ctxt = ctxt; ctxt->loc_llcd = llcd; - up(&ctxt->loc_sem); + mutex_up(&ctxt->loc_sem); rc = llog_recovery_generic(ctxt, ctxt->llog_proc_cb, logid); if (rc != 0)