Whamcloud - gitweb
b=2110
authortianying <tianying>
Thu, 23 Oct 2003 13:33:31 +0000 (13:33 +0000)
committertianying <tianying>
Thu, 23 Oct 2003 13:33:31 +0000 (13:33 +0000)
r=Peter

1.Unlink log record will be created when mds_reint_unlink, mds_mfd_close or mds_cleanup_orphans is called. And that record will be cancelled by MDS when it receives the commit callback for filter_destroy.

2.It resolves the resetup problem for lustre built with enable-orphans on.

3.It resolves the cleanup problem of mds when recovery procedure is abort.

lustre/ldlm/ldlm_lib.c
lustre/lov/lov_log.c
lustre/mds/mds_unlink_open.c
lustre/obdclass/llog_obd.c
lustre/ptlrpc/llog_net.c

index 0d55df4..a1502d8 100644 (file)
@@ -451,10 +451,12 @@ void target_abort_recovery(void *data)
         target_cancel_recovery_timer(obd);
         spin_unlock_bh(&obd->obd_processing_task_lock);
 
+        obd_precleanup(obd, OBD_OPT_FORCE);
+        class_disconnect_exports(obd, 0);
+
         /* XXX can't call this with spin_lock_bh, but it probably
            should be protected, somehow. */
-        if (OBT(obd) && OBP(obd, postsetup))
-                OBP(obd, postsetup)(obd);
+        obd_postsetup(obd);
 
         /* when recovery was abort, cleanup orphans for mds */
         if (OBT(obd) && OBP(obd, postrecov)) {
@@ -462,7 +464,6 @@ void target_abort_recovery(void *data)
                 CERROR("Cleanup %d orphans after recovery was abort!\n", rc);
         }
 
-        class_disconnect_exports(obd, 0);
         abort_delayed_replies(obd);
         abort_recovery_queue(obd);
         ptlrpc_run_recovery_over_upcall(obd);
@@ -742,9 +743,7 @@ int target_queue_final_reply(struct ptlrpc_request *req, int rc)
                        obd->obd_name);
                 obd->obd_recovering = 0;
 
-                if (OBT(obd) && OBP(obd, postsetup))
-                        OBP(obd, postsetup)(obd);
-
+                obd_postsetup(obd);
                 /* when recovering finished, cleanup orphans for mds       */
                 if (OBT(obd) && OBP(obd, postrecov)) {
                         CERROR("cleanup orphans after all clients recovered\n");
index e20ce4a..f8fe8cb 100644 (file)
@@ -62,6 +62,7 @@ int lov_llog_setup(struct obd_device *obd, struct obd_device *disk_obd,
         LASSERT(lov->desc.ld_tgt_count  == count);
         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
                 struct obd_device *child = lov->tgts[i].ltd_exp->exp_obd;
+                child->obd_log_exp = disk_obd->obd_log_exp;
                 rc = obd_llog_setup(child, disk_obd, index, 1, logids + i);
                 if (rc) {
                         CERROR("error lov_llog_open %d\n", i);
index 491fda5..6fb4ba1 100644 (file)
@@ -197,7 +197,7 @@ static int mds_unlink(struct obd_device *obd, struct dentry *dchild,
                 rc = PTR_ERR(handle);
                 CERROR("error fsfilt_start: %d\n", rc);
                 handle = NULL;
-                GOTO(out_free_req, rc);
+                GOTO(out_free_msg, rc);
         }
         rc = vfs_unlink(pending_dir, dchild);
         if (rc) 
@@ -215,10 +215,13 @@ static int mds_unlink(struct obd_device *obd, struct dentry *dchild,
                         CERROR("error committing orphan unlink: %d\n",
                                err);
                         rc = err;
-                        GOTO(out_free_req, rc);
+                        GOTO(out_free_msg, rc);
                 }
         }
         rc = mds_osc_destroy(mds, req);
+out_free_msg:
+        OBD_FREE(req->rq_repmsg, req->rq_replen);
+        req->rq_repmsg = NULL;
 out_free_req:
         OBD_FREE(req, sizeof(*req));
 err_alloc_req:
index e9269ac..903b6c6 100644 (file)
@@ -85,6 +85,7 @@ int llog_obd_setup(struct obd_device *obd, struct obd_device *disk_obd,
 {
         struct llog_obd_ctxt *ctxt;
         struct llog_handle *handle;
+        struct obd_run_ctxt saved;
         int rc;
 
         LASSERT(count == 1);
@@ -119,7 +120,9 @@ int llog_obd_setup(struct obd_device *obd, struct obd_device *disk_obd,
                 GOTO(out, rc);
 
         disk_obd->obd_llog_ctxt->loc_handles[index] = handle;
+        push_ctxt(&saved, &disk_obd->obd_ctxt, NULL);
         rc = llog_init_handle(handle,  LLOG_F_IS_CAT, NULL);
+        pop_ctxt(&saved, &disk_obd->obd_ctxt, NULL);
  out:
         if (ctxt && rc) 
                 OBD_FREE(ctxt, sizeof(*ctxt));
@@ -156,6 +159,7 @@ int llog_obd_origin_add(struct obd_export *exp,
                     struct llog_cookie *logcookies, int numcookies)
 {
         struct llog_handle *cathandle;
+        struct obd_export *export = exp->exp_obd->obd_log_exp;
         int rc;
         ENTRY;
 
@@ -164,7 +168,8 @@ int llog_obd_origin_add(struct obd_export *exp,
                 RETURN(-EINVAL);
         }
 
-        cathandle = exp->exp_obd->obd_llog_ctxt->loc_handles[index];
+        //cathandle = exp->exp_obd->obd_llog_ctxt->loc_handles[index];
+        cathandle = export->exp_obd->obd_llog_ctxt->loc_handles[index];
         LASSERT(cathandle != NULL);
         rc = llog_cat_add_rec(cathandle, rec, logcookies, NULL);
         if (rc != 1)
@@ -176,9 +181,13 @@ EXPORT_SYMBOL(llog_obd_origin_add);
 /* initialize the local storage obd for the logs */
 int llog_initialize(struct obd_device *obd)
 {
-        struct obd_export *exp = class_new_export(obd);
+        struct obd_export *exp;
         ENTRY;
 
+        if (obd->obd_log_exp)
+                RETURN(0);
+
+        exp = class_new_export(obd);
         if (exp == NULL)
                 RETURN(-ENOMEM);
         memcpy(&exp->exp_client_uuid, &obd->obd_uuid, 
@@ -191,6 +200,32 @@ int llog_initialize(struct obd_device *obd)
 }
 EXPORT_SYMBOL(llog_initialize);
 
+/* disconnect the local storage obd for the logs */
+int llog_disconnect(struct obd_device *obd)
+{
+        struct obd_export *exp;
+        ENTRY;
+
+        LASSERT(obd->obd_log_exp);
+        exp = obd->obd_log_exp;
+
+        class_handle_unhash(&exp->exp_handle);
+        spin_lock(&exp->exp_obd->obd_dev_lock);
+        list_del_init(&exp->exp_obd_chain);
+        exp->exp_obd->obd_num_exports--;
+        spin_unlock(&exp->exp_obd->obd_dev_lock);
+        OBD_FREE(exp, sizeof(*exp));
+        if (obd->obd_set_up) {
+                atomic_dec(&obd->obd_refcount);
+                wake_up(&obd->obd_refcount_waitq);
+        }
+
+        obd->obd_log_exp = NULL;
+        obd->obd_logops = NULL;
+        RETURN(0);
+}
+EXPORT_SYMBOL(llog_disconnect);
+
 int llog_cat_initialize(struct obd_device *obd, int count)
 {
         int rc, i;
index 97b13c2..1154dbe 100644 (file)
@@ -75,15 +75,12 @@ int llog_origin_handle_cancel(struct obd_device *obd,
 {
         struct llog_cookie *logcookies;
         int num_cookies, rc = 0;
-        struct obd_device *log_obd;
         struct obd_run_ctxt saved;
         struct llog_handle *cathandle;
         int i;
         ENTRY;
 
         LASSERT(obd->obd_llog_ctxt);
-        log_obd = obd->obd_llog_ctxt->loc_obd;
-        LASSERT(log_obd);
 
         logcookies = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*logcookies));
         num_cookies = req->rq_reqmsg->buflens[0]/sizeof(*logcookies);
@@ -91,15 +88,14 @@ int llog_origin_handle_cancel(struct obd_device *obd,
                 DEBUG_REQ(D_HA, req, "no cookies sent");
                 RETURN(-EFAULT);
         }
-
+#if 0
         /* workaround until we don't need to send replies */
         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
         req->rq_repmsg->status = rc;
         if (rc)
                 RETURN(rc);
         /* end workaround */
-
-        push_ctxt(&saved, &obd->obd_ctxt, NULL); 
+#endif
         i = logcookies->lgc_subsys;
         if (i < 0 || i > LLOG_OBD_MAX_HANDLES) {
                 LBUG();
@@ -108,13 +104,13 @@ int llog_origin_handle_cancel(struct obd_device *obd,
         cathandle = obd->obd_llog_ctxt->loc_handles[i];
         LASSERT(cathandle);
 
+        push_ctxt(&saved, &obd->obd_ctxt, NULL); 
         rc = llog_cat_cancel_records(cathandle, num_cookies, logcookies);
         if (rc)
                 CERROR("cancel %d llog-records failed: %d\n", num_cookies, rc);
-        pop_ctxt(&saved, &log_obd->obd_ctxt, NULL);
+        pop_ctxt(&saved, &obd->obd_ctxt, NULL);
 
-        RETURN(rc);
-        req->rq_repmsg->status = rc;
+        //req->rq_repmsg->status = rc;
         RETURN(rc);
 }
 EXPORT_SYMBOL(llog_origin_handle_cancel);