From 808c5044374180e592a54fb22b9827ffc12d5ae8 Mon Sep 17 00:00:00 2001 From: tianying Date: Thu, 23 Oct 2003 13:33:31 +0000 Subject: [PATCH] b=2110 r=Peter 1.Unlink log record will be created when mds_reint_unlink, mds_mfd_close or mds_cleanup_orphans is called. And that record will be cancelled by MDS when it receives the commit callback for filter_destroy. 2.It resolves the resetup problem for lustre built with enable-orphans on. 3.It resolves the cleanup problem of mds when recovery procedure is abort. --- lustre/ldlm/ldlm_lib.c | 11 +++++------ lustre/lov/lov_log.c | 1 + lustre/mds/mds_unlink_open.c | 7 +++++-- lustre/obdclass/llog_obd.c | 39 +++++++++++++++++++++++++++++++++++++-- lustre/ptlrpc/llog_net.c | 14 +++++--------- 5 files changed, 53 insertions(+), 19 deletions(-) diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c index 0d55df4..a1502d8 100644 --- a/lustre/ldlm/ldlm_lib.c +++ b/lustre/ldlm/ldlm_lib.c @@ -451,10 +451,12 @@ void target_abort_recovery(void *data) target_cancel_recovery_timer(obd); spin_unlock_bh(&obd->obd_processing_task_lock); + obd_precleanup(obd, OBD_OPT_FORCE); + class_disconnect_exports(obd, 0); + /* XXX can't call this with spin_lock_bh, but it probably should be protected, somehow. */ - if (OBT(obd) && OBP(obd, postsetup)) - OBP(obd, postsetup)(obd); + obd_postsetup(obd); /* when recovery was abort, cleanup orphans for mds */ if (OBT(obd) && OBP(obd, postrecov)) { @@ -462,7 +464,6 @@ void target_abort_recovery(void *data) CERROR("Cleanup %d orphans after recovery was abort!\n", rc); } - class_disconnect_exports(obd, 0); abort_delayed_replies(obd); abort_recovery_queue(obd); ptlrpc_run_recovery_over_upcall(obd); @@ -742,9 +743,7 @@ int target_queue_final_reply(struct ptlrpc_request *req, int rc) obd->obd_name); obd->obd_recovering = 0; - if (OBT(obd) && OBP(obd, postsetup)) - OBP(obd, postsetup)(obd); - + obd_postsetup(obd); /* when recovering finished, cleanup orphans for mds */ if (OBT(obd) && OBP(obd, postrecov)) { CERROR("cleanup orphans after all clients recovered\n"); diff --git a/lustre/lov/lov_log.c b/lustre/lov/lov_log.c index e20ce4a..f8fe8cb 100644 --- a/lustre/lov/lov_log.c +++ b/lustre/lov/lov_log.c @@ -62,6 +62,7 @@ int lov_llog_setup(struct obd_device *obd, struct obd_device *disk_obd, LASSERT(lov->desc.ld_tgt_count == count); for (i = 0; i < lov->desc.ld_tgt_count; i++) { struct obd_device *child = lov->tgts[i].ltd_exp->exp_obd; + child->obd_log_exp = disk_obd->obd_log_exp; rc = obd_llog_setup(child, disk_obd, index, 1, logids + i); if (rc) { CERROR("error lov_llog_open %d\n", i); diff --git a/lustre/mds/mds_unlink_open.c b/lustre/mds/mds_unlink_open.c index 491fda5..6fb4ba1 100644 --- a/lustre/mds/mds_unlink_open.c +++ b/lustre/mds/mds_unlink_open.c @@ -197,7 +197,7 @@ static int mds_unlink(struct obd_device *obd, struct dentry *dchild, rc = PTR_ERR(handle); CERROR("error fsfilt_start: %d\n", rc); handle = NULL; - GOTO(out_free_req, rc); + GOTO(out_free_msg, rc); } rc = vfs_unlink(pending_dir, dchild); if (rc) @@ -215,10 +215,13 @@ static int mds_unlink(struct obd_device *obd, struct dentry *dchild, CERROR("error committing orphan unlink: %d\n", err); rc = err; - GOTO(out_free_req, rc); + GOTO(out_free_msg, rc); } } rc = mds_osc_destroy(mds, req); +out_free_msg: + OBD_FREE(req->rq_repmsg, req->rq_replen); + req->rq_repmsg = NULL; out_free_req: OBD_FREE(req, sizeof(*req)); err_alloc_req: diff --git a/lustre/obdclass/llog_obd.c b/lustre/obdclass/llog_obd.c index e9269ac..903b6c6 100644 --- a/lustre/obdclass/llog_obd.c +++ b/lustre/obdclass/llog_obd.c @@ -85,6 +85,7 @@ int llog_obd_setup(struct obd_device *obd, struct obd_device *disk_obd, { struct llog_obd_ctxt *ctxt; struct llog_handle *handle; + struct obd_run_ctxt saved; int rc; LASSERT(count == 1); @@ -119,7 +120,9 @@ int llog_obd_setup(struct obd_device *obd, struct obd_device *disk_obd, GOTO(out, rc); disk_obd->obd_llog_ctxt->loc_handles[index] = handle; + push_ctxt(&saved, &disk_obd->obd_ctxt, NULL); rc = llog_init_handle(handle, LLOG_F_IS_CAT, NULL); + pop_ctxt(&saved, &disk_obd->obd_ctxt, NULL); out: if (ctxt && rc) OBD_FREE(ctxt, sizeof(*ctxt)); @@ -156,6 +159,7 @@ int llog_obd_origin_add(struct obd_export *exp, struct llog_cookie *logcookies, int numcookies) { struct llog_handle *cathandle; + struct obd_export *export = exp->exp_obd->obd_log_exp; int rc; ENTRY; @@ -164,7 +168,8 @@ int llog_obd_origin_add(struct obd_export *exp, RETURN(-EINVAL); } - cathandle = exp->exp_obd->obd_llog_ctxt->loc_handles[index]; + //cathandle = exp->exp_obd->obd_llog_ctxt->loc_handles[index]; + cathandle = export->exp_obd->obd_llog_ctxt->loc_handles[index]; LASSERT(cathandle != NULL); rc = llog_cat_add_rec(cathandle, rec, logcookies, NULL); if (rc != 1) @@ -176,9 +181,13 @@ EXPORT_SYMBOL(llog_obd_origin_add); /* initialize the local storage obd for the logs */ int llog_initialize(struct obd_device *obd) { - struct obd_export *exp = class_new_export(obd); + struct obd_export *exp; ENTRY; + if (obd->obd_log_exp) + RETURN(0); + + exp = class_new_export(obd); if (exp == NULL) RETURN(-ENOMEM); memcpy(&exp->exp_client_uuid, &obd->obd_uuid, @@ -191,6 +200,32 @@ int llog_initialize(struct obd_device *obd) } EXPORT_SYMBOL(llog_initialize); +/* disconnect the local storage obd for the logs */ +int llog_disconnect(struct obd_device *obd) +{ + struct obd_export *exp; + ENTRY; + + LASSERT(obd->obd_log_exp); + exp = obd->obd_log_exp; + + class_handle_unhash(&exp->exp_handle); + spin_lock(&exp->exp_obd->obd_dev_lock); + list_del_init(&exp->exp_obd_chain); + exp->exp_obd->obd_num_exports--; + spin_unlock(&exp->exp_obd->obd_dev_lock); + OBD_FREE(exp, sizeof(*exp)); + if (obd->obd_set_up) { + atomic_dec(&obd->obd_refcount); + wake_up(&obd->obd_refcount_waitq); + } + + obd->obd_log_exp = NULL; + obd->obd_logops = NULL; + RETURN(0); +} +EXPORT_SYMBOL(llog_disconnect); + int llog_cat_initialize(struct obd_device *obd, int count) { int rc, i; diff --git a/lustre/ptlrpc/llog_net.c b/lustre/ptlrpc/llog_net.c index 97b13c2..1154dbe 100644 --- a/lustre/ptlrpc/llog_net.c +++ b/lustre/ptlrpc/llog_net.c @@ -75,15 +75,12 @@ int llog_origin_handle_cancel(struct obd_device *obd, { struct llog_cookie *logcookies; int num_cookies, rc = 0; - struct obd_device *log_obd; struct obd_run_ctxt saved; struct llog_handle *cathandle; int i; ENTRY; LASSERT(obd->obd_llog_ctxt); - log_obd = obd->obd_llog_ctxt->loc_obd; - LASSERT(log_obd); logcookies = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*logcookies)); num_cookies = req->rq_reqmsg->buflens[0]/sizeof(*logcookies); @@ -91,15 +88,14 @@ int llog_origin_handle_cancel(struct obd_device *obd, DEBUG_REQ(D_HA, req, "no cookies sent"); RETURN(-EFAULT); } - +#if 0 /* workaround until we don't need to send replies */ rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg); req->rq_repmsg->status = rc; if (rc) RETURN(rc); /* end workaround */ - - push_ctxt(&saved, &obd->obd_ctxt, NULL); +#endif i = logcookies->lgc_subsys; if (i < 0 || i > LLOG_OBD_MAX_HANDLES) { LBUG(); @@ -108,13 +104,13 @@ int llog_origin_handle_cancel(struct obd_device *obd, cathandle = obd->obd_llog_ctxt->loc_handles[i]; LASSERT(cathandle); + push_ctxt(&saved, &obd->obd_ctxt, NULL); rc = llog_cat_cancel_records(cathandle, num_cookies, logcookies); if (rc) CERROR("cancel %d llog-records failed: %d\n", num_cookies, rc); - pop_ctxt(&saved, &log_obd->obd_ctxt, NULL); + pop_ctxt(&saved, &obd->obd_ctxt, NULL); - RETURN(rc); - req->rq_repmsg->status = rc; + //req->rq_repmsg->status = rc; RETURN(rc); } EXPORT_SYMBOL(llog_origin_handle_cancel); -- 1.8.3.1