Whamcloud - gitweb
fix according to nikita's inspection result; mainly:
authorhuanghua <huanghua>
Mon, 28 Aug 2006 07:45:41 +0000 (07:45 +0000)
committerhuanghua <huanghua>
Mon, 28 Aug 2006 07:45:41 +0000 (07:45 +0000)
(1) add special lu_context for commit callback;
(2) add some comment for data structures and interfaces;
(3) handle transaction commit error;
(4) add lu_context_{enter, exit}() for transaction context.

lustre/include/dt_object.h
lustre/mdt/mdt_recovery.c
lustre/obdclass/dt_object.c
lustre/osd/osd_handler.c

index b43670c..d30a1f9 100644 (file)
@@ -319,12 +319,20 @@ struct dt_index_operations {
 struct dt_device {
         struct lu_device             dd_lu_dev;
         struct dt_device_operations *dd_ops;
+
         /*
          * List of dt_txn_callback (see below). This is not protected in any
          * way, because callbacks are supposed to be added/deleted only during
          * single-threaded start-up shut-down procedures.
          */
         struct list_head             dd_txn_callbacks;
+
+        /* Thread context for transaction commit callback.
+         * Transaction commit is serialized, that is there is no more than one
+         * transaction commit at a time (jbd journal_commit_transaction() is 
+         * serialized). This means that it's enough to have _one_ lu_context.
+         */ 
+        struct lu_context            dd_ctx_for_commit;
 };
 
 int  dt_device_init(struct dt_device *dev, struct lu_device_type *t);
@@ -359,12 +367,33 @@ static inline int dt_object_exists(const struct dt_object *dt)
 }
 
 struct txn_param {
+        /* number of blocks this transaction will modify */
         unsigned int tp_credits;
 };
 
+/*
+ * This is the general purpose transaction handle.
+ * 1. Transaction Life Cycle
+ *      This transaction handle is allocated upon starting a new transaction,
+ *      and deallocated after this transaction is committed.
+ * 2. Transaction Nesting
+ *      We do _NOT_ support nested transaction. So, every thread should only
+ *      have one active transaction, and a transaction only belongs to one
+ *      thread. Due to this, transaction handle need no reference count.
+ * 3. Transaction & dt_object locking
+ *      dt_object locks should be taken inside transaction.
+ * 4. Transaction & RPC
+ *      No RPC request should be issued inside transaction.
+ */
 struct thandle {
+        /* the dt device on which the transactions are executed */
         struct dt_device *th_dev;
+
+        /* context for this transaction, tag is LCT_TX_HANDLE */
         struct lu_context th_ctx;
+
+        /* the last operation result in this transaction.
+         * this value is used in recovery */
         __s32             th_result;
 };
 
@@ -381,12 +410,10 @@ struct thandle {
  */
 struct dt_txn_callback {
         int (*dtc_txn_start)(const struct lu_context *ctx,
-                             struct dt_device *dev,
                              struct txn_param *param, void *cookie);
         int (*dtc_txn_stop)(const struct lu_context *ctx,
-                            struct dt_device *dev,
                             struct thandle *txn, void *cookie);
-        int (*dtc_txn_commit)(struct dt_device *dev,
+        int (*dtc_txn_commit)(const struct lu_context *ctx, 
                               struct thandle *txn, void *cookie);
         void            *dtc_cookie;
         struct list_head dtc_linkage;
@@ -397,9 +424,8 @@ void dt_txn_callback_del(struct dt_device *dev, struct dt_txn_callback *cb);
 
 int dt_txn_hook_start(const struct lu_context *ctx,
                       struct dt_device *dev, struct txn_param *param);
-int dt_txn_hook_stop(const struct lu_context *ctx,
-                     struct dt_device *dev, struct thandle *txn);
-int dt_txn_hook_commit(struct dt_device *dev, struct thandle *txn);
+int dt_txn_hook_stop(const struct lu_context *ctx, struct thandle *txn);
+int dt_txn_hook_commit(const struct lu_context *ctx, struct thandle *txn);
 
 int dt_try_as_dir(const struct lu_context *ctx, struct dt_object *obj);
 struct dt_object *dt_store_open(const struct lu_context *ctx,
index 5c5afc8..da2d648 100644 (file)
@@ -551,7 +551,6 @@ enum {
 
 /* add credits for last_rcvd update */
 static int mdt_txn_start_cb(const struct lu_context *ctx,
-                            struct dt_device *dev,
                             struct txn_param *param, void *cookie)
 {
         param->tp_credits += MDT_TXN_LAST_RCVD_CREDITS;
@@ -560,7 +559,6 @@ static int mdt_txn_start_cb(const struct lu_context *ctx,
 
 /* Update last_rcvd records with latests transaction data */
 static int mdt_txn_stop_cb(const struct lu_context *ctx,
-                           struct dt_device *dev,
                            struct thandle *txn, void *cookie)
 {
         struct mdt_device *mdt = cookie;
@@ -594,7 +592,7 @@ static int mdt_txn_stop_cb(const struct lu_context *ctx,
 }
 
 /* commit callback, need to update last_commited value */
-static int mdt_txn_commit_cb(struct dt_device *dev,
+static int mdt_txn_commit_cb(const struct lu_context *ctxt, 
                              struct thandle *txn, void *cookie)
 {
         struct mdt_device *mdt = cookie;
@@ -608,7 +606,7 @@ static int mdt_txn_commit_cb(struct dt_device *dev,
         if (txi->txi_transno > obd->obd_last_committed) {
                 obd->obd_last_committed = txi->txi_transno;
                 spin_unlock(&mdt->mdt_transno_lock);
-                ptlrpc_commit_replies (obd);
+                ptlrpc_commit_replies(obd);
         } else
                 spin_unlock(&mdt->mdt_transno_lock);
 
index e495c5a..58906e1 100644 (file)
@@ -36,6 +36,9 @@
 #include <dt_object.h>
 #include <libcfs/list.h>
 
+/* no lock is necessary to protect the list, because call-backs 
+ * are added during system startup. Please refer to "struct dt_device".
+ */
 void dt_txn_callback_add(struct dt_device *dev, struct dt_txn_callback *cb)
 {
         list_add(&cb->dtc_linkage, &dev->dd_txn_callbacks);
@@ -48,7 +51,7 @@ void dt_txn_callback_del(struct dt_device *dev, struct dt_txn_callback *cb)
 }
 EXPORT_SYMBOL(dt_txn_callback_del);
 
-int dt_txn_hook_start(const struct lu_context *ctx,
+int dt_txn_hook_start(const struct lu_context *ctxt,
                       struct dt_device *dev, struct txn_param *param)
 {
         int result;
@@ -58,7 +61,7 @@ int dt_txn_hook_start(const struct lu_context *ctx,
         list_for_each_entry(cb, &dev->dd_txn_callbacks, dtc_linkage) {
                 if (cb->dtc_txn_start == NULL)
                         continue;
-                result = cb->dtc_txn_start(ctx, dev, param, cb->dtc_cookie);
+                result = cb->dtc_txn_start(ctxt, param, cb->dtc_cookie);
                 if (result < 0)
                         break;
         }
@@ -66,17 +69,17 @@ int dt_txn_hook_start(const struct lu_context *ctx,
 }
 EXPORT_SYMBOL(dt_txn_hook_start);
 
-int dt_txn_hook_stop(const struct lu_context *ctx,
-                     struct dt_device *dev, struct thandle *txn)
+int dt_txn_hook_stop(const struct lu_context *ctxt, struct thandle *txn)
 {
-        int result;
+        struct dt_device       *dev = txn->th_dev;
         struct dt_txn_callback *cb;
+        int                     result;
 
         result = 0;
         list_for_each_entry(cb, &dev->dd_txn_callbacks, dtc_linkage) {
                 if (cb->dtc_txn_stop == NULL)
                         continue;
-                result = cb->dtc_txn_stop(ctx, dev, txn, cb->dtc_cookie);
+                result = cb->dtc_txn_stop(ctxt, txn, cb->dtc_cookie);
                 if (result < 0)
                         break;
         }
@@ -84,16 +87,17 @@ int dt_txn_hook_stop(const struct lu_context *ctx,
 }
 EXPORT_SYMBOL(dt_txn_hook_stop);
 
-int dt_txn_hook_commit(struct dt_device *dev, struct thandle *txn)
+int dt_txn_hook_commit(const struct lu_context *ctxt, struct thandle *txn)
 {
-        int result;
+        struct dt_device       *dev = txn->th_dev;
         struct dt_txn_callback *cb;
+        int                     result;
 
         result = 0;
         list_for_each_entry(cb, &dev->dd_txn_callbacks, dtc_linkage) {
                 if (cb->dtc_txn_commit == NULL)
                         continue;
-                result = cb->dtc_txn_commit(dev, txn, cb->dtc_cookie);
+                result = cb->dtc_txn_commit(ctxt, txn, cb->dtc_cookie);
                 if (result < 0)
                         break;
         }
@@ -103,13 +107,26 @@ EXPORT_SYMBOL(dt_txn_hook_commit);
 
 int dt_device_init(struct dt_device *dev, struct lu_device_type *t)
 {
+        int rc;
+
         CFS_INIT_LIST_HEAD(&dev->dd_txn_callbacks);
-        return lu_device_init(&dev->dd_lu_dev, t);
+        rc = lu_context_init(&dev->dd_ctx_for_commit, LCT_MD_THREAD);
+        if (rc == 0) {
+                lu_context_enter(&dev->dd_ctx_for_commit);
+                rc = lu_device_init(&dev->dd_lu_dev, t);
+                if (rc != 0) {
+                        lu_context_exit(&dev->dd_ctx_for_commit);
+                        lu_context_fini(&dev->dd_ctx_for_commit);
+                }
+        }
+        return rc;
 }
 EXPORT_SYMBOL(dt_device_init);
 
 void dt_device_fini(struct dt_device *dev)
 {
+        lu_context_exit(&dev->dd_ctx_for_commit);
+        lu_context_fini(&dev->dd_ctx_for_commit);
         lu_device_fini(&dev->dd_lu_dev);
 }
 EXPORT_SYMBOL(dt_device_fini);
index 900df6b..7117c31 100644 (file)
@@ -448,13 +448,23 @@ static void osd_trans_commit_cb(struct journal_callback *jcb, int error)
 {
         struct osd_thandle *oh = container_of0(jcb, struct osd_thandle, ot_jcb);
         struct thandle     *th = &oh->ot_super;
+        struct dt_device   *dev = th->th_dev;
 
-        dt_txn_hook_commit(th->th_dev, th);
+        LASSERT(dev != NULL);
 
-        if (th->th_dev != NULL) {
-                lu_device_put(&th->th_dev->dd_lu_dev);
-                th->th_dev = NULL;
+        if (error) {
+                CERROR("transaction @0x%p commit error: %d\n", th, error);
+        } else {
+                /* This dd_ctx_for_commit is only for commit usage.
+                 * see "struct dt_device" 
+                 */ 
+                dt_txn_hook_commit(&dev->dd_ctx_for_commit, th);
         }
+
+        lu_device_put(&dev->dd_lu_dev);
+        th->th_dev = NULL;
+
+        lu_context_exit(&th->th_ctx);
         lu_context_fini(&th->th_ctx);
         OBD_FREE_PTR(oh);
 }
@@ -478,9 +488,6 @@ static struct thandle *osd_trans_start(const struct lu_context *ctx,
 
         if (osd_param_is_sane(dev, p)) {
                 OBD_ALLOC_GFP(oh, sizeof *oh, GFP_NOFS);
-                /*TODO: it seems we need something like that:
-                 * OBD_SLAB_ALLOC(oh, oh_cache, GFP_NOFS, sizeof *oh);
-                 */
                 if (oh != NULL) {
                         /*
                          * XXX temporary stuff. Some abstraction layer should
@@ -490,14 +497,12 @@ static struct thandle *osd_trans_start(const struct lu_context *ctx,
                         jh = journal_start(osd_journal(dev), p->tp_credits);
                         if (!IS_ERR(jh)) {
                                 oh->ot_handle = jh;
-                                /* hmm, since oh init and start are done together
-                                 * the ref starts from 2
-                                 */
                                 th = &oh->ot_super;
                                 th->th_dev = d;
                                 lu_device_get(&d->dd_lu_dev);
                                 /* add commit callback */
                                 lu_context_init(&th->th_ctx, LCT_TX_HANDLE);
+                                lu_context_enter(&th->th_ctx);
                                 journal_callback_set(jh, osd_trans_commit_cb,
                                                      (struct journal_callback *)&oh->ot_jcb);
                                 LASSERT(oti->oti_txns == 0);
@@ -532,7 +537,7 @@ static void osd_trans_stop(const struct lu_context *ctx, struct thandle *th)
                 /*
                  * XXX temporary stuff. Some abstraction layer should be used.
                  */
-                result = dt_txn_hook_stop(ctx, th->th_dev, th);
+                result = dt_txn_hook_stop(ctx, th);
                 if (result != 0)
                         CERROR("Failure in transaction hook: %d\n", result);