r=tappro,wangdi
- fixes ost cleanup issue due to missed llcd_put() in the case ost does not receive disconnect from mds;
- do not sleep on hanging llcd. Instead assert on it _after_ stopping recov_thread's ptlrpcd which should kill any remaining llcds;
- fixes and cleanups, comments.
*/
atomic_t lcm_count;
/**
*/
atomic_t lcm_count;
/**
- * Ptlrpc requests set. All cancel rpcs go via this request set.
- */
- struct ptlrpc_request_set *lcm_set;
- /**
* Thread control structure. Used for control commit thread.
*/
struct ptlrpcd_ctl lcm_pc;
* Thread control structure. Used for control commit thread.
*/
struct ptlrpcd_ctl lcm_pc;
- /**
- * Busy resources waitq
+ /*
+ * Lock protecting list of llcds.
+ */
+ spinlock_t lcm_lock;
+ /*
+ * Llcds in flight for debugging purposes.
+ */
+ struct list_head lcm_llcds;
/**
* Commit thread name buffer. Only used for thread start.
*/
/**
* Commit thread name buffer. Only used for thread start.
*/
* left in llcd to cache coming cookies.
*/
int llcd_size;
* left in llcd to cache coming cookies.
*/
int llcd_size;
+ /*
+ * Link to lcm llcds list.
+ */
+ struct list_head llcd_list;
/**
* Current llcd size while gathering cookies. This should not be
* more than ->llcd_size. Used for determining if we need to
/**
* Current llcd size while gathering cookies. This should not be
* more than ->llcd_size. Used for determining if we need to
if (cb) {
rc = llog_cat_process(llh, (llog_cb_t)cb, NULL);
if (cb) {
rc = llog_cat_process(llh, (llog_cb_t)cb, NULL);
- if (rc != LLOG_PROC_BREAK)
+ if (rc != LLOG_PROC_BREAK && rc != 0)
CERROR("llog_cat_process() failed %d\n", rc);
} else {
CWARN("No callback function for recovery\n");
CERROR("llog_cat_process() failed %d\n", rc);
} else {
CWARN("No callback function for recovery\n");
class_import_put(ctxt->loc_imp);
ctxt->loc_imp = NULL;
}
class_import_put(ctxt->loc_imp);
ctxt->loc_imp = NULL;
}
+ LASSERT(ctxt->loc_llcd == NULL);
OBD_FREE_PTR(ctxt);
return;
}
OBD_FREE_PTR(ctxt);
return;
}
ENTRY;
ctxt = llog_group_get_ctxt(&obd->obd_olg, LLOG_MDS_OST_REPL_CTXT);
ENTRY;
ctxt = llog_group_get_ctxt(&obd->obd_olg, LLOG_MDS_OST_REPL_CTXT);
- LASSERT(ctxt != NULL);
- mutex_down(&ctxt->loc_sem);
- if (ctxt->loc_imp) {
- /*
- * Balance class_import_get() in llog_receptor_accept(). This
- * is safe to do here, as llog is already synchronized and its
- * import may go.
- */
- class_import_put(ctxt->loc_imp);
- ctxt->loc_imp = NULL;
+ if (ctxt) {
+ mutex_down(&ctxt->loc_sem);
+ if (ctxt->loc_imp) {
+ /*
+ * Make sure that no cached llcds are left in recov_thread.
+ * We actually do sync at disconnect time, but the disconnect
+ * request may not come, being marked rq_no_resend = 1.
+ */
+ llog_sync(ctxt, NULL);
+
+ /*
+ * Balance class_import_get() in llog_receptor_accept().
+ * This is safe to do, as llog is already synchronized
+ * and its import may go.
+ */
+ class_import_put(ctxt->loc_imp);
+ ctxt->loc_imp = NULL;
+ }
+ mutex_up(&ctxt->loc_sem);
+ llog_ctxt_put(ctxt);
- mutex_up(&ctxt->loc_sem);
- llog_ctxt_put(ctxt);
if (filter->fo_lcm) {
llog_recov_thread_fini(filter->fo_lcm, obd->obd_force);
if (filter->fo_lcm) {
llog_recov_thread_fini(filter->fo_lcm, obd->obd_force);
(dexp == olg_min->olg_exp || dexp == NULL)) {
int err;
ctxt = llog_group_get_ctxt(olg_min,
(dexp == olg_min->olg_exp || dexp == NULL)) {
int err;
ctxt = llog_group_get_ctxt(olg_min,
- LLOG_MDS_OST_REPL_CTXT);
- LASSERT(ctxt != NULL);
- err = llog_sync(ctxt, olg_min->olg_exp);
- llog_ctxt_put(ctxt);
- if (err)
- CERROR("error flushing logs to MDS: rc %d\n",
- err);
+ LLOG_MDS_OST_REPL_CTXT);
+ if (ctxt) {
+ err = llog_sync(ctxt, olg_min->olg_exp);
+ llog_ctxt_put(ctxt);
+ if (err) {
+ CERROR("error flushing logs to MDS: "
+ "rc %d\n", err);
+ }
+ }
}
} while (olg_min != NULL);
}
}
} while (olg_min != NULL);
}
-/* also incredibly similar to mds_disconnect */
+/* Also incredibly similar to mds_disconnect */
static int filter_disconnect(struct obd_export *exp)
{
struct obd_device *obd = exp->exp_obd;
static int filter_disconnect(struct obd_export *exp)
{
struct obd_device *obd = exp->exp_obd;
filter_grant_sanity_check(obd, __FUNCTION__);
filter_grant_discard(exp);
filter_grant_sanity_check(obd, __FUNCTION__);
filter_grant_discard(exp);
+ /* Flush any remaining cancel messages out to the target */
+ filter_sync_llogs(obd, exp);
+
/* Disconnect early so that clients can't keep using export */
rc = class_disconnect(exp);
if (exp->exp_obd->obd_namespace != NULL)
/* Disconnect early so that clients can't keep using export */
rc = class_disconnect(exp);
if (exp->exp_obd->obd_namespace != NULL)
lprocfs_exp_cleanup(exp);
lprocfs_exp_cleanup(exp);
- /* flush any remaining cancel messages out to the target */
- filter_sync_llogs(obd, exp);
class_export_put(exp);
RETURN(rc);
}
class_export_put(exp);
RETURN(rc);
}
filter = &exp->exp_obd->u.filter;
filter = &exp->exp_obd->u.filter;
- /* an objid of zero is taken to mean "sync whole filesystem" */
+ /* An objid of zero is taken to mean "sync whole filesystem" */
if (!oa || !(oa->o_valid & OBD_MD_FLID)) {
rc = fsfilt_sync(exp->exp_obd, filter->fo_obt.obt_sb);
if (!oa || !(oa->o_valid & OBD_MD_FLID)) {
rc = fsfilt_sync(exp->exp_obd, filter->fo_obt.obt_sb);
- /* flush any remaining cancel messages out to the target */
+ /* Flush any remaining cancel messages out to the target */
filter_sync_llogs(exp->exp_obd, exp);
RETURN(rc);
}
filter_sync_llogs(exp->exp_obd, exp);
RETURN(rc);
}
static int osc_disconnect(struct obd_export *exp)
{
struct obd_device *obd = class_exp2obd(exp);
static int osc_disconnect(struct obd_export *exp)
{
struct obd_device *obd = class_exp2obd(exp);
- struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
+ struct llog_ctxt *ctxt;
- if (obd->u.cli.cl_conn_count == 1)
- /* flush any remaining cancel messages out to the target */
- llog_sync(ctxt, exp);
-
- llog_ctxt_put(ctxt);
+ ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
+ if (ctxt) {
+ if (obd->u.cli.cl_conn_count == 1) {
+ /* Flush any remaining cancel messages out to the
+ * target */
+ llog_sync(ctxt, exp);
+ }
+ llog_ctxt_put(ctxt);
+ } else {
+ CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
+ obd);
+ }
rc = client_disconnect_export(exp);
return rc;
rc = client_disconnect_export(exp);
return rc;
request = list_entry(pool->prp_req_list.next, struct ptlrpc_request,
rq_list);
request = list_entry(pool->prp_req_list.next, struct ptlrpc_request,
rq_list);
- list_del(&request->rq_list);
+ list_del_init(&request->rq_list);
spin_unlock(&pool->prp_lock);
LASSERT(request->rq_reqbuf);
spin_unlock(&pool->prp_lock);
LASSERT(request->rq_reqbuf);
req->rq_import_generation = imp->imp_generation;
if (ptlrpc_import_delay_req(imp, req, &rc)) {
req->rq_import_generation = imp->imp_generation;
if (ptlrpc_import_delay_req(imp, req, &rc)) {
- spin_lock (&req->rq_lock);
+ spin_lock(&req->rq_lock);
- spin_unlock (&req->rq_lock);
+ spin_unlock(&req->rq_lock);
DEBUG_REQ(D_HA, req, "req from PID %d waiting for recovery: "
"(%s != %s)", lustre_msg_get_status(req->rq_reqmsg),
DEBUG_REQ(D_HA, req, "req from PID %d waiting for recovery: "
"(%s != %s)", lustre_msg_get_status(req->rq_reqmsg),
req->rq_import_generation = imp->imp_generation;
restart:
if (ptlrpc_import_delay_req(imp, req, &rc)) {
req->rq_import_generation = imp->imp_generation;
restart:
if (ptlrpc_import_delay_req(imp, req, &rc)) {
- list_del(&req->rq_list);
+ list_del_init(&req->rq_list);
list_add_tail(&req->rq_list, &imp->imp_delayed_list);
atomic_inc(&imp->imp_inflight);
spin_unlock(&imp->imp_lock);
list_add_tail(&req->rq_list, &imp->imp_delayed_list);
atomic_inc(&imp->imp_inflight);
spin_unlock(&imp->imp_lock);
+ CFS_INIT_LIST_HEAD(&llcd->llcd_list);
llcd->llcd_size = llcd_size;
llcd->llcd_cookiebytes = 0;
atomic_inc(&llcd_count);
llcd->llcd_size = llcd_size;
llcd->llcd_cookiebytes = 0;
atomic_inc(&llcd_count);
LASSERT_SEM_LOCKED(&ctxt->loc_sem);
LASSERT(ctxt->loc_llcd == NULL);
lcm = ctxt->loc_lcm;
LASSERT_SEM_LOCKED(&ctxt->loc_sem);
LASSERT(ctxt->loc_llcd == NULL);
lcm = ctxt->loc_lcm;
+ spin_lock(&lcm->lcm_lock);
atomic_inc(&lcm->lcm_count);
atomic_inc(&lcm->lcm_count);
+ list_add_tail(&llcd->llcd_list, &lcm->lcm_llcds);
+ spin_unlock(&lcm->lcm_lock);
CDEBUG(D_RPCTRACE, "Attach llcd %p to ctxt %p (%d)\n",
llcd, ctxt, atomic_read(&lcm->lcm_count));
llcd->llcd_ctxt = llog_ctxt_get(ctxt);
CDEBUG(D_RPCTRACE, "Attach llcd %p to ctxt %p (%d)\n",
llcd, ctxt, atomic_read(&lcm->lcm_count));
llcd->llcd_ctxt = llog_ctxt_get(ctxt);
llcd_print(llcd, __FUNCTION__, __LINE__);
LBUG();
}
llcd_print(llcd, __FUNCTION__, __LINE__);
LBUG();
}
+ spin_lock(&lcm->lcm_lock);
+ LASSERT(!list_empty(&llcd->llcd_list));
+ list_del_init(&llcd->llcd_list);
atomic_dec(&lcm->lcm_count);
atomic_dec(&lcm->lcm_count);
+ spin_unlock(&lcm->lcm_lock);
ctxt->loc_llcd = NULL;
CDEBUG(D_RPCTRACE, "Detach llcd %p from ctxt %p (%d)\n",
ctxt->loc_llcd = NULL;
CDEBUG(D_RPCTRACE, "Detach llcd %p from ctxt %p (%d)\n",
llcd = llcd_detach(ctxt);
if (llcd)
llcd_free(llcd);
llcd = llcd_detach(ctxt);
if (llcd)
llcd_free(llcd);
-
- if (atomic_read(&lcm->lcm_count) == 0)
- cfs_waitq_signal(&lcm->lcm_waitq);
rc, lcm->lcm_name);
RETURN(rc);
}
rc, lcm->lcm_name);
RETURN(rc);
}
- lcm->lcm_set = lcm->lcm_pc.pc_set;
RETURN(rc);
}
EXPORT_SYMBOL(llog_recov_thread_start);
RETURN(rc);
}
EXPORT_SYMBOL(llog_recov_thread_start);
*/
void llog_recov_thread_stop(struct llog_commit_master *lcm, int force)
{
*/
void llog_recov_thread_stop(struct llog_commit_master *lcm, int force)
{
- struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
* Let all know that we're stopping. This will also make
* llcd_send() refuse any new llcds.
*/
set_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags);
* Let all know that we're stopping. This will also make
* llcd_send() refuse any new llcds.
*/
set_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags);
* Stop processing thread. No new rpcs will be accepted for
* for processing now.
*/
ptlrpcd_stop(&lcm->lcm_pc, force);
/*
* Stop processing thread. No new rpcs will be accepted for
* for processing now.
*/
ptlrpcd_stop(&lcm->lcm_pc, force);
/*
- * Wait for llcd number == 0. Note, this is infinite wait.
- * All other parts should make sure that no lost llcd is left.
+ * By this point no alive inflight llcds should be left. Only
+ * those forgotten in sync may still be attached to ctxt. Let's
+ * print them.
- l_wait_event(lcm->lcm_waitq,
- atomic_read(&lcm->lcm_count) == 0, &lwi);
+ if (atomic_read(&lcm->lcm_count) != 0) {
+ struct llog_canceld_ctxt *llcd;
+ struct list_head *tmp;
+
+ CERROR("Busy llcds found (%d) on lcm %p\n",
+ atomic_read(&lcm->lcm_count), lcm);
+
+ spin_lock(&lcm->lcm_lock);
+ list_for_each(tmp, &lcm->lcm_llcds) {
+ llcd = list_entry(tmp, struct llog_canceld_ctxt,
+ llcd_list);
+ llcd_print(llcd, __FUNCTION__, __LINE__);
+ }
+ spin_unlock(&lcm->lcm_lock);
+ }
EXIT;
}
EXPORT_SYMBOL(llog_recov_thread_stop);
EXIT;
}
EXPORT_SYMBOL(llog_recov_thread_stop);
"ll_log_commit_%s", name);
strncpy(lcm->lcm_name, name, sizeof(lcm->lcm_name));
"ll_log_commit_%s", name);
strncpy(lcm->lcm_name, name, sizeof(lcm->lcm_name));
- cfs_waitq_init(&lcm->lcm_waitq);
atomic_set(&lcm->lcm_count, 0);
atomic_set(&lcm->lcm_count, 0);
+ spin_lock_init(&lcm->lcm_lock);
+ CFS_INIT_LIST_HEAD(&lcm->lcm_llcds);
rc = llog_recov_thread_start(lcm);
if (rc) {
CERROR("Can't start commit thread, rc %d\n", rc);
rc = llog_recov_thread_start(lcm);
if (rc) {
CERROR("Can't start commit thread, rc %d\n", rc);
GOTO(out, rc = -ENODEV);
}
GOTO(out, rc = -ENODEV);
}
- if (ctxt->loc_obd->obd_stopping) {
- CDEBUG(D_RPCTRACE, "Obd is stopping for ctxt %p\n", ctxt);
- GOTO(out, rc = -ENODEV);
- }
-
if (test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags)) {
CDEBUG(D_RPCTRACE, "Commit thread is stopping for ctxt %p\n",
ctxt);
if (test_bit(LLOG_LCM_FL_EXIT, &lcm->lcm_flags)) {
CDEBUG(D_RPCTRACE, "Commit thread is stopping for ctxt %p\n",
ctxt);
- * Copy cookies to @llcd, no matter old or new allocated one.
+ * Copy cookies to @llcd, no matter old or new allocated
+ * one.
*/
llcd_copy(llcd, cookies);
}
/*
*/
llcd_copy(llcd, cookies);
}
/*
- * Let's check if we need to send copied @cookies asap. If yes - do it.
+ * Let's check if we need to send copied @cookies asap. If yes
+ * then do it.
*/
if (llcd && (flags & OBD_LLOG_FL_SENDNOW)) {
rc = llcd_push(ctxt);
*/
if (llcd && (flags & OBD_LLOG_FL_SENDNOW)) {
rc = llcd_push(ctxt);
+ /*
+ * Flush any remaining llcd.
+ */
mutex_down(&ctxt->loc_sem);
if (exp && (ctxt->loc_imp == exp->exp_imp_reverse)) {
mutex_down(&ctxt->loc_sem);
if (exp && (ctxt->loc_imp == exp->exp_imp_reverse)) {
- CDEBUG(D_RPCTRACE, "Reverse import disconnect\n");
- * Check for llcd which might be left attached to @ctxt.
- * Let's kill it.
+ * This is an ost->mds connection, we can't be sure that mds
+ * can still receive cookies, let's kill the cached llcd.
+ */
+ CDEBUG(D_RPCTRACE, "Kill cached llcd\n");
llcd_put(ctxt);
mutex_up(&ctxt->loc_sem);
} else {
llcd_put(ctxt);
mutex_up(&ctxt->loc_sem);
} else {
+ /*
+ * This is either llog_sync() from generic llog code or sync
+ * on client disconnect. In either way let's do it and send
+ * llcds to the target with waiting for completion.
+ */
+ CDEBUG(D_RPCTRACE, "Sync cached llcd\n");
mutex_up(&ctxt->loc_sem);
rc = llog_cancel(ctxt, NULL, 0, NULL, OBD_LLOG_FL_SENDNOW);
}
mutex_up(&ctxt->loc_sem);
rc = llog_cancel(ctxt, NULL, 0, NULL, OBD_LLOG_FL_SENDNOW);
}