#ifdef __KERNEL__
-static struct llog_commit_master lustre_lcm;
-static struct llog_commit_master *lcm = &lustre_lcm;
-
/* Allocate new commit structs in case we do not have enough.
* Make the llcd size small enough that it fits into a single page when we
* are sending/receiving it. */
-static int llcd_alloc(void)
+static int llcd_alloc(struct llog_commit_master *lcm)
{
struct llog_canceld_ctxt *llcd;
int llcd_size;
}
/* Get a free cookie struct from the list */
-struct llog_canceld_ctxt *llcd_grab(void)
+static struct llog_canceld_ctxt *llcd_grab(struct llog_commit_master *lcm)
{
struct llog_canceld_ctxt *llcd;
spin_lock(&lcm->lcm_llcd_lock);
if (list_empty(&lcm->lcm_llcd_free)) {
spin_unlock(&lcm->lcm_llcd_lock);
- if (llcd_alloc() < 0) {
+ if (llcd_alloc(lcm) < 0) {
CERROR("unable to allocate log commit data!\n");
return NULL;
}
return llcd;
}
-EXPORT_SYMBOL(llcd_grab);
static void llcd_put(struct llog_canceld_ctxt *llcd)
{
+ struct llog_commit_master *lcm = llcd->llcd_lcm;
+
+ llog_ctxt_put(llcd->llcd_ctxt);
if (atomic_read(&lcm->lcm_llcd_numfree) >= lcm->lcm_llcd_maxfree) {
int llcd_size = llcd->llcd_size +
offsetof(struct llog_canceld_ctxt, llcd_cookies);
}
/* Send some cookies to the appropriate target */
-void llcd_send(struct llog_canceld_ctxt *llcd)
+static void llcd_send(struct llog_canceld_ctxt *llcd)
{
- spin_lock(&llcd->llcd_lcm->lcm_llcd_lock);
- list_add_tail(&llcd->llcd_list, &llcd->llcd_lcm->lcm_llcd_pending);
- spin_unlock(&llcd->llcd_lcm->lcm_llcd_lock);
-
+ if (!(llcd->llcd_lcm->lcm_flags & LLOG_LCM_FL_EXIT)) {
+ spin_lock(&llcd->llcd_lcm->lcm_llcd_lock);
+ list_add_tail(&llcd->llcd_list,
+ &llcd->llcd_lcm->lcm_llcd_pending);
+ spin_unlock(&llcd->llcd_lcm->lcm_llcd_lock);
+ }
cfs_waitq_signal_nr(&llcd->llcd_lcm->lcm_waitq, 1);
}
-EXPORT_SYMBOL(llcd_send);
/* deleted objects have a commit callback that cancels the MDS
* log record for the deletion. The commit callback calls this
mutex_down(&ctxt->loc_sem);
if (ctxt->loc_imp == NULL) {
- CDEBUG(D_HA, "no import for ctxt %p\n", ctxt);
+ CDEBUG(D_RPCTRACE, "no import for ctxt %p\n", ctxt);
GOTO(out, rc = 0);
}
if (count > 0 && cookies != NULL) {
if (llcd == NULL) {
- llcd = llcd_grab();
+ llcd = llcd_grab(ctxt->loc_lcm);
if (llcd == NULL) {
CERROR("couldn't get an llcd - dropped "LPX64
":%x+%u\n",
cookies->lgc_index);
GOTO(out, rc = -ENOMEM);
}
- llcd->llcd_ctxt = ctxt;
+ llcd->llcd_ctxt = llog_ctxt_get(ctxt);
ctxt->loc_llcd = llcd;
}
if ((llcd->llcd_size - llcd->llcd_cookiebytes) < sizeof(*cookies) ||
(flags & OBD_LLOG_FL_SENDNOW)) {
- CDEBUG(D_HA, "send llcd %p:%p\n", llcd, llcd->llcd_ctxt);
+ CDEBUG(D_RPCTRACE, "send llcd %p:%p\n", llcd, llcd->llcd_ctxt);
ctxt->loc_llcd = NULL;
llcd_send(llcd);
}
ENTRY;
if (exp && (ctxt->loc_imp == exp->exp_imp_reverse)) {
- CDEBUG(D_HA, "reverse import disconnected, put llcd %p:%p\n",
+ CDEBUG(D_RPCTRACE,"reverse import disconnect, put llcd %p:%p\n",
ctxt->loc_llcd, ctxt);
mutex_down(&ctxt->loc_sem);
if (ctxt->loc_llcd != NULL) {
}
EXPORT_SYMBOL(llog_obd_repl_sync);
+static void llog_lcm_dec(struct llog_commit_master *lcm)
+{
+ atomic_dec(&lcm->lcm_thread_total);
+ cfs_waitq_signal(&lcm->lcm_waitq);
+}
+
static int log_commit_thread(void *arg)
{
- struct llog_commit_master *lcm = arg;
- struct llog_commit_daemon *lcd;
+ struct llog_commit_daemon *lcd = arg;
+ struct llog_commit_master *lcm = lcd->lcd_lcm;
struct llog_canceld_ctxt *llcd, *n;
struct obd_import *import = NULL;
ENTRY;
- OBD_ALLOC(lcd, sizeof(*lcd));
- if (lcd == NULL)
- RETURN(-ENOMEM);
-
- spin_lock(&lcm->lcm_thread_lock);
THREAD_NAME(cfs_curproc_comm(), CFS_CURPROC_COMM_MAX - 1,
- "ll_log_comt_%02d", atomic_read(&lcm->lcm_thread_total));
- atomic_inc(&lcm->lcm_thread_total);
- spin_unlock(&lcm->lcm_thread_lock);
-
+ "ll_log_comt_%02d", lcd->lcd_index);
+
ptlrpc_daemonize(cfs_curproc_comm()); /* thread never needs to do IO */
-
- CFS_INIT_LIST_HEAD(&lcd->lcd_lcm_list);
- CFS_INIT_LIST_HEAD(&lcd->lcd_llcd_list);
- lcd->lcd_lcm = lcm;
-
CDEBUG(D_HA, "%s started\n", cfs_curproc_comm());
+
do {
struct ptlrpc_request *request;
struct list_head *sending_list;
/* If we do not have enough pages available, allocate some */
while (atomic_read(&lcm->lcm_llcd_numfree) <
lcm->lcm_llcd_minfree) {
- if (llcd_alloc() < 0)
+ if (llcd_alloc(lcm) < 0)
break;
}
if (atomic_read(&lcm->lcm_thread_numidle) <= 1 &&
atomic_read(&lcm->lcm_thread_total) < lcm->lcm_thread_max) {
- rc = llog_start_commit_thread();
+ rc = llog_start_commit_thread(lcm);
if (rc < 0)
CERROR("error starting thread: rc %d\n", rc);
}
list_del(&llcd->llcd_list);
if (llcd->llcd_cookiebytes == 0) {
- CDEBUG(D_HA, "put empty llcd %p:%p\n",
+ CDEBUG(D_RPCTRACE, "put empty llcd %p:%p\n",
llcd, llcd->llcd_ctxt);
llcd_put(llcd);
continue;
break;
}
- /* XXX FIXME bug 249, 5515 */
+ /* bug 5515 */
request->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
request->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
+ ptlrpc_at_set_req_timeout(request);
ptlrpc_req_set_repsize(request, 1, NULL);
mutex_down(&llcd->llcd_ctxt->loc_sem);
llcd_put(llcd);
}
- spin_lock(&lcm->lcm_thread_lock);
- list_del(&lcd->lcd_lcm_list);
- spin_unlock(&lcm->lcm_thread_lock);
- OBD_FREE(lcd, sizeof(*lcd));
CDEBUG(D_HA, "%s exiting\n", cfs_curproc_comm());
spin_lock(&lcm->lcm_thread_lock);
- atomic_dec(&lcm->lcm_thread_total);
+ list_del(&lcd->lcd_lcm_list);
spin_unlock(&lcm->lcm_thread_lock);
- cfs_waitq_signal(&lcm->lcm_waitq);
+ OBD_FREE_PTR(lcd);
+ llog_lcm_dec(lcm);
- return 0;
+ RETURN(0);
}
-int llog_start_commit_thread(void)
+int llog_start_commit_thread(struct llog_commit_master *lcm)
{
-        int rc;
+        struct llog_commit_daemon *lcd;
+        int rc, index;
ENTRY;
if (atomic_read(&lcm->lcm_thread_total) >= lcm->lcm_thread_max)
RETURN(0);
-        rc = cfs_kernel_thread(log_commit_thread, lcm, CLONE_VM | CLONE_FILES);
+        /* Check whether the llog commit threads are being cleaned up first.
+         * If not, increase the lcm_thread_total count to prevent the lcm
+         * being freed while log_commit_thread is starting up. */
+        spin_lock(&lcm->lcm_thread_lock);
+        if (!(lcm->lcm_flags & LLOG_LCM_FL_EXIT)) {
+                atomic_inc(&lcm->lcm_thread_total);
+                index = atomic_read(&lcm->lcm_thread_total);
+                spin_unlock(&lcm->lcm_thread_lock);
+        } else {
+                spin_unlock(&lcm->lcm_thread_lock);
+                RETURN(0);
+        }
+
+        OBD_ALLOC_PTR(lcd);
+        if (lcd == NULL)
+                GOTO(cleanup, rc = -ENOMEM);
+
+        CFS_INIT_LIST_HEAD(&lcd->lcd_lcm_list);
+        CFS_INIT_LIST_HEAD(&lcd->lcd_llcd_list);
+        lcd->lcd_index = index;
+        lcd->lcd_lcm = lcm;
+
+        rc = cfs_kernel_thread(log_commit_thread, lcd, CLONE_VM | CLONE_FILES);
+cleanup:
if (rc < 0) {
-                CERROR("error starting thread #%d: %d\n",
-                       atomic_read(&lcm->lcm_thread_total), rc);
+                /* Use 'index' here, not lcd->lcd_index: lcd is NULL when
+                 * OBD_ALLOC_PTR() failed and we jumped straight to cleanup. */
+                CERROR("error starting thread #%d: %d\n", index, rc);
+                llog_lcm_dec(lcm);
+                if (lcd)
+                        OBD_FREE_PTR(lcd);
RETURN(rc);
}
-
RETURN(0);
}
EXPORT_SYMBOL(llog_start_commit_thread);
void *llpa_arg;
} llpa;
-int llog_init_commit_master(void)
+int llog_init_commit_master(struct llog_commit_master *lcm)
{
CFS_INIT_LIST_HEAD(&lcm->lcm_thread_busy);
CFS_INIT_LIST_HEAD(&lcm->lcm_thread_idle);
sema_init(&llpa.llpa_sem, 1);
return 0;
}
+EXPORT_SYMBOL(llog_init_commit_master);
-int llog_cleanup_commit_master(int force)
+int llog_cleanup_commit_master(struct llog_commit_master *lcm,
+ int force)
{
+ spin_lock(&lcm->lcm_thread_lock);
lcm->lcm_flags |= LLOG_LCM_FL_EXIT;
if (force)
lcm->lcm_flags |= LLOG_LCM_FL_EXIT_FORCE;
+
+ spin_unlock(&lcm->lcm_thread_lock);
+
cfs_waitq_signal(&lcm->lcm_waitq);
wait_event_interruptible(lcm->lcm_waitq,
atomic_read(&lcm->lcm_thread_total) == 0);
return 0;
}
+EXPORT_SYMBOL(llog_cleanup_commit_master);
static int log_process_thread(void *args)
{
rc = llog_create(ctxt, &llh, &logid, NULL);
if (rc) {
CERROR("llog_create failed %d\n", rc);
- RETURN(rc);
+ GOTO(out, rc);
}
rc = llog_init_handle(llh, LLOG_F_IS_CAT, NULL);
if (rc) {
CERROR("llog_init_handle failed %d\n", rc);
- GOTO(out, rc);
+ GOTO(release_llh, rc);
}
if (cb) {
CDEBUG(D_HA, "send llcd %p:%p forcibly after recovery\n",
ctxt->loc_llcd, ctxt);
llog_sync(ctxt, NULL);
-out:
+
+release_llh:
rc = llog_cat_put(llh);
if (rc)
CERROR("llog_cat_put failed %d\n", rc);
-
+out:
+ llog_ctxt_put(ctxt);
RETURN(rc);
}
static int llog_recovery_generic(struct llog_ctxt *ctxt, void *handle,void *arg)
{
+ struct obd_device *obd = ctxt->loc_obd;
int rc;
ENTRY;
+ if (obd->obd_stopping)
+ RETURN(-ENODEV);
+
mutex_down(&llpa.llpa_sem);
- llpa.llpa_ctxt = ctxt;
llpa.llpa_cb = handle;
llpa.llpa_arg = arg;
-
+ llpa.llpa_ctxt = llog_get_context(ctxt->loc_obd, ctxt->loc_idx);
+ if (!llpa.llpa_ctxt) {
+ mutex_up(&llpa.llpa_sem);
+ RETURN(-ENODEV);
+ }
rc = cfs_kernel_thread(log_process_thread, &llpa, CLONE_VM | CLONE_FILES);
if (rc < 0)
CERROR("error starting log_process_thread: %d\n", rc);
mutex_down(&ctxt->loc_sem);
ctxt->loc_gen = *gen;
- llcd = llcd_grab();
+ llcd = llcd_grab(ctxt->loc_lcm);
if (llcd == NULL) {
CERROR("couldn't get an llcd\n");
mutex_up(&ctxt->loc_sem);
RETURN(-ENOMEM);
}
- llcd->llcd_ctxt = ctxt;
+ llcd->llcd_ctxt = llog_ctxt_get(ctxt);
ctxt->loc_llcd = llcd;
mutex_up(&ctxt->loc_sem);