Whamcloud - gitweb
LU-3536 lod: write updates to update log 08/11408/53
authorWang Di <di.wang@intel.com>
Mon, 11 Aug 2014 20:46:38 +0000 (13:46 -0700)
committerOleg Drokin <oleg.drokin@intel.com>
Mon, 1 Jun 2015 20:31:37 +0000 (20:31 +0000)
For cross-MDT operations, LOD will write the updates into the
update log on all of the MDTs involved.

1. In transaction start, LOD prepares the update records
   buffer for the cross-MDT operation.
2. Sub LOD collects all updates in the execution phase.
3. In transaction stop, LOD will write these updates as
   llog records on all of the MDTs.

4. Disable sanity-scrub.sh 4 until LU-6380 is fixed.

Change-Id: Ibba79267393db00ba05e0aa2df9865f88149eaa4
Signed-off-by: Wang Di <di.wang@intel.com>
Reviewed-on: http://review.whamcloud.com/11408
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-by: Lai Siyao <lai.siyao@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
29 files changed:
lustre/fld/fld_handler.c
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_fld.h
lustre/include/lustre_log.h
lustre/include/lustre_net.h
lustre/include/lustre_update.h
lustre/lod/lod_dev.c
lustre/lod/lod_internal.h
lustre/lod/lod_lov.c
lustre/lod/lod_sub_object.c
lustre/mdt/mdt_handler.c
lustre/obdclass/llog_internal.h
lustre/obdclass/llog_obd.c
lustre/obdclass/llog_osd.c
lustre/osp/osp_internal.h
lustre/osp/osp_md_object.c
lustre/osp/osp_object.c
lustre/osp/osp_sync.c
lustre/osp/osp_trans.c
lustre/ptlrpc/wiretest.c
lustre/target/out_handler.c
lustre/target/out_lib.c
lustre/target/update_records.c
lustre/target/update_trans.c
lustre/tests/conf-sanity.sh
lustre/tests/sanity-lfsck.sh
lustre/tests/sanity-scrub.sh
lustre/utils/wirecheck.c
lustre/utils/wiretest.c

index 2f355a4..417f103 100644 (file)
@@ -472,6 +472,7 @@ int fld_server_init(const struct lu_env *env, struct lu_server_fld *fld,
                GOTO(out_index, rc);
 
        fld->lsf_control_exp = NULL;
+       fld->lsf_seq_lookup = fld_server_lookup;
 
        RETURN(0);
 out_index:
index a68dc95..c5c4f9e 100644 (file)
@@ -401,8 +401,8 @@ enum fid_seq {
        FID_SEQ_OST_MDT0        = 0,
        FID_SEQ_LLOG            = 1, /* unnamed llogs */
        FID_SEQ_ECHO            = 2,
-       FID_SEQ_OST_MDT1        = 3,
-       FID_SEQ_OST_MAX         = 9, /* Max MDT count before OST_on_FID */
+       FID_SEQ_UNUSED_START    = 3,
+       FID_SEQ_UNUSED_END      = 9,
        FID_SEQ_LLOG_NAME       = 10, /* named llogs */
        FID_SEQ_RSVD            = 11,
        FID_SEQ_IGIF            = 12,
@@ -426,6 +426,11 @@ enum fid_seq {
        FID_SEQ_QUOTA_GLB       = 0x200000006ULL,
        FID_SEQ_ROOT            = 0x200000007ULL,  /* Located on MDT0 */
        FID_SEQ_LAYOUT_RBTREE   = 0x200000008ULL,
+       /* sequence is used for update logs of cross-MDT operation */
+       FID_SEQ_UPDATE_LOG      = 0x200000009ULL,
+       /* Sequence is used for the directory under which update logs
+        * are created. */
+       FID_SEQ_UPDATE_LOG_DIR  = 0x20000000aULL,
        FID_SEQ_NORMAL          = 0x200000400ULL,
        FID_SEQ_LOV_DEFAULT     = 0xffffffffffffffffULL
 };
@@ -537,6 +542,20 @@ static inline void lu_echo_root_fid(struct lu_fid *fid)
        fid->f_ver = 0;
 }
 
+/* Fill \a fid with the update log FID for \a index: the sequence is
+ * FID_SEQ_UPDATE_LOG, the object id is \a index and the version is 0. */
+static inline void lu_update_log_fid(struct lu_fid *fid, __u32 index)
+{
+       fid->f_ver = 0;
+       fid->f_oid = index;
+       fid->f_seq = FID_SEQ_UPDATE_LOG;
+}
+
+/* Fill \a fid with the update log directory FID for \a index: the sequence
+ * is FID_SEQ_UPDATE_LOG_DIR, the object id is \a index and the version
+ * is 0. */
+static inline void lu_update_log_dir_fid(struct lu_fid *fid, __u32 index)
+{
+       fid->f_ver = 0;
+       fid->f_oid = index;
+       fid->f_seq = FID_SEQ_UPDATE_LOG_DIR;
+}
+
 /**
  * Check if a fid is igif or not.
  * \param fid the fid to be tested.
@@ -587,6 +606,26 @@ static inline int fid_is_layout_rbtree(const struct lu_fid *fid)
        return fid_seq(fid) == FID_SEQ_LAYOUT_RBTREE;
 }
 
+/* Tell whether \a seq is the sequence reserved for update logs. */
+static inline bool fid_seq_is_update_log(__u64 seq)
+{
+       return FID_SEQ_UPDATE_LOG == seq;
+}
+
+/* Tell whether \a fid lives in the update log sequence. */
+static inline bool fid_is_update_log(const struct lu_fid *fid)
+{
+       const __u64 seq = fid_seq(fid);
+
+       return fid_seq_is_update_log(seq);
+}
+
+/* Tell whether \a seq is the sequence reserved for the directory under
+ * which update logs are created. */
+static inline bool fid_seq_is_update_log_dir(__u64 seq)
+{
+       return FID_SEQ_UPDATE_LOG_DIR == seq;
+}
+
+/* Tell whether \a fid lives in the update log directory sequence. */
+static inline bool fid_is_update_log_dir(const struct lu_fid *fid)
+{
+       const __u64 seq = fid_seq(fid);
+
+       return fid_seq_is_update_log_dir(seq);
+}
+
 /* convert an OST objid into an IDIF FID SEQ number */
 static inline __u64 fid_idif_seq(__u64 id, __u32 ost_idx)
 {
@@ -807,7 +846,8 @@ static inline int fid_to_ostid(const struct lu_fid *fid, struct ost_id *ostid)
 /* Check whether the fid is for LAST_ID */
 static inline bool fid_is_last_id(const struct lu_fid *fid)
 {
-       return fid_oid(fid) == 0;
+       return fid_oid(fid) == 0 && fid_seq(fid) != FID_SEQ_UPDATE_LOG &&
+              fid_seq(fid) != FID_SEQ_UPDATE_LOG_DIR;
 }
 
 /**
@@ -3150,6 +3190,8 @@ enum llog_ctxt_id {
        /* for multiple changelog consumers */
        LLOG_CHANGELOG_USER_ORIG_CTXT = 14,
        LLOG_AGENT_ORIG_CTXT = 15, /**< agent requests generation on cdt */
+       LLOG_UPDATELOG_ORIG_CTXT = 16, /* update log */
+       LLOG_UPDATELOG_REPL_CTXT = 17, /* update log */
        LLOG_MAX_CTXTS
 };
 
@@ -3967,6 +4009,7 @@ enum update_type {
        OUT_WRITE               = 12,
        OUT_XATTR_DEL           = 13,
        OUT_PUNCH               = 14,
+       OUT_READ                = 15,
        OUT_LAST
 };
 
@@ -4111,6 +4154,30 @@ object_update_result_get(const struct object_update_reply *reply,
        return ptr;
 }
 
+/* Read update result: a fixed header followed by the data in orr_data.
+ * NOTE(review): presumably the reply payload of an OUT_READ update --
+ * confirm against the OUT_READ handler. */
+struct out_read_reply {
+       __u32   orr_size;       /* presumably bytes held in orr_data; verify */
+       __u32   orr_padding;    /* byte-swapped like the other header fields */
+       __u64   orr_offset;     /* presumably offset of the data; verify */
+       char    orr_data[0];    /* variable-length data after the header */
+};
+
+/**
+ * Convert the fixed header of an out_read_reply from CPU byte order to
+ * little-endian.
+ *
+ * Only orr_size, orr_padding and orr_offset are converted; orr_data is
+ * not touched.
+ *
+ * \param[out] orr_dst header to fill in (may be the same as \a orr_src)
+ * \param[in] orr_src  header in CPU byte order
+ */
+static inline void orr_cpu_to_le(struct out_read_reply *orr_dst,
+                                const struct out_read_reply *orr_src)
+{
+       orr_dst->orr_size = cpu_to_le32(orr_src->orr_size);
+       orr_dst->orr_padding = cpu_to_le32(orr_src->orr_padding);
+       /* the offset must come from the source, like the other fields;
+        * reading it from orr_dst only worked when both pointers aliased */
+       orr_dst->orr_offset = cpu_to_le64(orr_src->orr_offset);
+}
+
+/**
+ * Convert the fixed header of an out_read_reply from little-endian to
+ * CPU byte order.
+ *
+ * Only orr_size, orr_padding and orr_offset are converted; orr_data is
+ * not touched.
+ *
+ * \param[out] orr_dst header to fill in (may be the same as \a orr_src)
+ * \param[in] orr_src  header in little-endian byte order
+ */
+static inline void orr_le_to_cpu(struct out_read_reply *orr_dst,
+                                const struct out_read_reply *orr_src)
+{
+       orr_dst->orr_size = le32_to_cpu(orr_src->orr_size);
+       orr_dst->orr_padding = le32_to_cpu(orr_src->orr_padding);
+       /* the offset must come from the source, like the other fields;
+        * reading it from orr_dst only worked when both pointers aliased */
+       orr_dst->orr_offset = le64_to_cpu(orr_src->orr_offset);
+}
+
 /** layout swap request structure
  * fid1 and fid2 are in mdt_body
  */
index ccde82b..807649d 100644 (file)
@@ -95,6 +95,10 @@ struct lu_server_fld {
          * Fld service name in form "fld-srv-lustre-MDTXXX" */
         char                     lsf_name[80];
 
+       int (*lsf_seq_lookup)(const struct lu_env *env,
+                             struct lu_server_fld *fld, u64 seq,
+                             struct lu_seq_range *range);
+
        /**
         * Just reformatted or upgraded, and this flag is being
         * used to check whether the local FLDB is needs to be
index d10a30b..52bc7be 100644 (file)
@@ -312,6 +312,7 @@ struct llog_handle {
 
 /* llog_osd.c */
 extern struct llog_operations llog_osd_ops;
+extern struct llog_operations llog_common_cat_ops;
 int llog_osd_get_cat_list(const struct lu_env *env, struct dt_device *d,
                          int idx, int count, struct llog_catid *idarray,
                          const struct lu_fid *fid);
@@ -322,6 +323,10 @@ int llog_osd_put_cat_list(const struct lu_env *env, struct dt_device *d,
 #define LLOG_CTXT_FLAG_UNINITIALIZED     0x00000001
 #define LLOG_CTXT_FLAG_STOP             0x00000002
 
+/* Indicate the llog objects under this context are normal FID objects,
+ * instead of objects with local FID. */
+#define LLOG_CTXT_FLAG_NORMAL_FID       0x00000004
+
 struct llog_ctxt {
         int                      loc_idx; /* my index the obd array of ctxt's */
         struct obd_device       *loc_obd; /* points back to the containing obd*/
index 6ed3cd2..d3caeb2 100644 (file)
 
 /**
  * The update request includes all of updates from the create, which might
- * include linkea (4K maxim), together with other updates, we set it to 9K:
- * lustre_msg + ptlrpc_body + UPDATE_BUF_SIZE (8K)
+ * include linkea (4K maxim), together with other updates, we set it to 1000K:
+ * lustre_msg + ptlrpc_body + OUT_UPDATE_BUFFER_SIZE_MAX
  */
-#define OUT_MAXREQSIZE (9 * 1024)
+#define OUT_MAXREQSIZE (1000 * 1024)
 #define OUT_MAXREPSIZE MDS_MAXREPSIZE
 
 /** MDS_BUFSIZE = max_reqsize (w/o LOV EA) + max sptlrpc payload size */
index 83a7dcd..4cb87e0 100644 (file)
@@ -34,7 +34,8 @@
 #include <dt_object.h>
 
 #define OUT_UPDATE_INIT_BUFFER_SIZE    4096
-#define OUT_UPDATE_REPLY_SIZE          8192
+/* 16KB, the current biggest size is llog header(8KB) */
+#define OUT_UPDATE_REPLY_SIZE          16384
 
 struct dt_key;
 struct dt_rec;
@@ -412,6 +413,9 @@ int out_index_lookup_pack(const struct lu_env *env,
 int out_xattr_get_pack(const struct lu_env *env,
                       struct object_update *update, size_t max_update_size,
                       const struct lu_fid *fid, const char *name);
+int out_read_pack(const struct lu_env *env, struct object_update *update,
+                 size_t max_update_length, const struct lu_fid *fid,
+                 size_t size, loff_t pos);
 
 const char *update_op_str(__u16 opcode);
 
index fc38ae1..cd7a97d 100644 (file)
 
 #define DEBUG_SUBSYSTEM S_MDS
 
+#include <linux/kthread.h>
 #include <obd_class.h>
 #include <md_object.h>
 #include <lustre_fid.h>
 #include <lustre_param.h>
 #include <lustre_update.h>
+#include <lustre_log.h>
 
 #include "lod_internal.h"
 
+static const char lod_update_log_name[] = "update_log";
+static const char lod_update_log_dir_name[] = "update_log_dir";
+
 /*
  * Lookup target by FID.
  *
@@ -133,6 +138,12 @@ int lod_fld_lookup(const struct lu_env *env, struct lod_device *lod,
                RETURN(0);
        }
 
+       if (fid_is_update_log(fid) || fid_is_update_log_dir(fid)) {
+               *tgt = fid_oid(fid);
+               *type = LU_SEQ_RANGE_MDT;
+               RETURN(0);
+       }
+
        if (!lod->lod_initialized || (!fid_seq_in_fldb(fid_seq(fid)))) {
                LASSERT(lu_site2seq(lod2lu_dev(lod)->ld_site) != NULL);
 
@@ -215,7 +226,7 @@ static struct lu_object *lod_object_alloc(const struct lu_env *env,
  * \retval 0                   on success
  * \retval negative            negated errno on error
  **/
-static int lod_cleanup_desc_tgts(const struct lu_env *env,
+static int lod_sub_process_config(const struct lu_env *env,
                                 struct lod_device *lod,
                                 struct lod_tgt_descs *ltd,
                                 struct lustre_cfg *lcfg)
@@ -248,22 +259,88 @@ static int lod_cleanup_desc_tgts(const struct lu_env *env,
        return rc;
 }
 
+struct lod_recovery_data {     /* argument of lod_sub_recovery_thread() */
+       struct lod_device       *lrd_lod;       /* LOD doing the recovery */
+       struct lod_tgt_desc     *lrd_ltd;       /* target, NULL for lod_child */
+       struct ptlrpc_thread    *lrd_thread;    /* flags/waitq of the thread */
+       __u32                   lrd_idx;        /* index for lod_sub_prep_llog */
+};
+
+/**
+ * recovery thread for update log
+ *
+ * Mark the thread as running, set up a private environment and prepare
+ * the sub llog of the device described by the recovery data. The actual
+ * replay of update records is not implemented here yet.
+ *
+ * The thread owns \a arg and frees it on every exit path. Whenever the
+ * thread exits it leaves t_flags at SVC_STOPPED and wakes t_ctl_waitq, so
+ * that a waiter which saw SVC_RUNNING is never left blocked.
+ *
+ * \param[in] arg      pointer to the recovery data (struct
+ *                     lod_recovery_data), freed by this thread
+ *
+ * \retval             0 if recovery succeeds
+ * \retval             negative errno if recovery failed.
+ */
+static int lod_sub_recovery_thread(void *arg)
+{
+       struct lod_recovery_data        *lrd = arg;
+       struct lod_device               *lod = lrd->lrd_lod;
+       struct dt_device                *dt;
+       struct ptlrpc_thread            *thread = lrd->lrd_thread;
+       struct lu_env                   env;
+       int                             rc;
+       ENTRY;
+
+       thread->t_flags = SVC_RUNNING;
+       wake_up(&thread->t_ctl_waitq);
+
+       rc = lu_env_init(&env, LCT_LOCAL | LCT_MD_THREAD);
+       if (rc != 0) {
+               OBD_FREE_PTR(lrd);
+               CERROR("%s: can't initialize env: rc = %d\n",
+                      lod2obd(lod)->obd_name, rc);
+               /* SVC_RUNNING was already published above; without this
+                * lod_sub_fini_llog() would wait for SVC_STOPPED forever.
+                * env was never initialized, so lu_env_fini() is skipped. */
+               thread->t_flags = SVC_STOPPED;
+               wake_up(&thread->t_ctl_waitq);
+               RETURN(rc);
+       }
+
+       /* no target descriptor means the recovery is for lod_child */
+       if (lrd->lrd_ltd == NULL)
+               dt = lod->lod_child;
+       else
+               dt = lrd->lrd_ltd->ltd_tgt;
+
+       rc = lod_sub_prep_llog(&env, lod, dt, lrd->lrd_idx);
+       if (rc != 0)
+               GOTO(out, rc);
+
+       /* XXX do recovery in the following patches */
+
+out:
+       OBD_FREE_PTR(lrd);
+       thread->t_flags = SVC_STOPPED;
+       wake_up(&thread->t_ctl_waitq);
+       lu_env_fini(&env);
+       RETURN(rc);
+}
+
 /**
  * Extract MDT target index from a device name.
  *
  * a helper function to extract index from the given device name
  * like "fsname-MDTxxxx-mdtlov"
  *
- * \param[in] lodname  device name
- * \param[out] index   extracted index
+ * \param[in] lodname          device name
+ * \param[out] mdt_index       extracted index
  *
  * \retval 0           on success
  * \retval -EINVAL     if the name is invalid
  */
-static int lodname2mdt_index(char *lodname, long *index)
+int lodname2mdt_index(char *lodname, __u32 *mdt_index)
 {
+       unsigned long index;
        char *ptr, *tmp;
 
+       /* 1.8 configs don't have "-MDT0000" at the end */
+       ptr = strstr(lodname, "-MDT");
+       if (ptr == NULL) {
+               *mdt_index = 0;
+               return 0;
+       }
+
        ptr = strrchr(lodname, '-');
        if (ptr == NULL) {
                CERROR("invalid MDT index in '%s'\n", lodname);
@@ -285,15 +362,182 @@ static int lodname2mdt_index(char *lodname, long *index)
                return -EINVAL;
        }
 
-       *index = simple_strtol(ptr - 4, &tmp, 16);
-       if (*tmp != '-' || *index > INT_MAX || *index < 0) {
+       index = simple_strtol(ptr - 4, &tmp, 16);
+       if (*tmp != '-' || index > INT_MAX) {
                CERROR("invalid MDT index in '%s'\n", lodname);
                return -EINVAL;
        }
+       *mdt_index = index;
        return 0;
 }
 
 /**
+ * Init sub llog context
+ *
+ * Setup update llog ctxt for update recovery threads, then start the
+ * recovery thread (lod_sub_recovery_thread) to read update llog from
+ * the correspondent MDT to do update recovery.
+ *
+ * \param[in] env      execution environment
+ * \param[in] lod      lod device to do update recovery
+ * \param[in] dt       sub dt device for which the recovery thread is
+ *
+ * \retval             0 if initialization succeeds.
+ * \retval             negative errno if initialization fails.
+ */
+int lod_sub_init_llog(const struct lu_env *env, struct lod_device *lod,
+                     struct dt_device *dt)
+{
+       struct obd_device               *obd;
+       struct lod_recovery_data        *lrd = NULL;
+       struct ptlrpc_thread            *thread;
+       struct task_struct              *task;
+       struct l_wait_info              lwi = { 0 };
+       struct lod_tgt_desc             *sub_ltd = NULL;
+       __u32                           index;
+       int                             rc;
+       ENTRY;
+
+       OBD_ALLOC_PTR(lrd);
+       if (lrd == NULL)
+               RETURN(-ENOMEM);
+
+       if (lod->lod_child == dt) {
+               thread = &lod->lod_child_recovery_thread;
+               rc = lodname2mdt_index(lod2obd(lod)->obd_name, &index);
+               if (rc != 0) {
+                       OBD_FREE_PTR(lrd);
+                       RETURN(rc);
+               }
+       } else {
+               struct lod_tgt_descs    *ltd = &lod->lod_mdt_descs;
+               struct lod_tgt_desc     *tgt = NULL;
+               unsigned int            i;
+
+               mutex_lock(&ltd->ltd_mutex);
+               cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) {
+                       tgt = LTD_TGT(ltd, i);
+                       if (tgt->ltd_tgt == dt) {
+                               index = tgt->ltd_index;
+                               sub_ltd = tgt;
+                               break;
+                       }
+               }
+               mutex_unlock(&ltd->ltd_mutex);
+               OBD_ALLOC_PTR(tgt->ltd_recovery_thread);
+               if (tgt->ltd_recovery_thread == NULL) {
+                       OBD_FREE_PTR(lrd);
+                       RETURN(-ENOMEM);
+               }
+               thread = tgt->ltd_recovery_thread;
+       }
+
+       lrd->lrd_lod = lod;
+       lrd->lrd_ltd = sub_ltd;
+       lrd->lrd_thread = thread;
+       lrd->lrd_idx = index;
+       init_waitqueue_head(&thread->t_ctl_waitq);
+
+       obd = dt->dd_lu_dev.ld_obd;
+       obd->obd_lvfs_ctxt.dt = dt;
+       rc = llog_setup(env, obd, &obd->obd_olg, LLOG_UPDATELOG_ORIG_CTXT,
+                       NULL, &llog_common_cat_ops);
+       if (rc < 0) {
+               CERROR("%s: cannot setup updatelog llog: rc = %d\n",
+                      obd->obd_name, rc);
+               OBD_FREE_PTR(lrd);
+               RETURN(rc);
+       }
+
+       /* Start the recovery thread */
+       task = kthread_run(lod_sub_recovery_thread, lrd, "lod_recov_%04x",
+                          index);
+       if (IS_ERR(task)) {
+               rc = PTR_ERR(task);
+               OBD_FREE_PTR(lrd);
+               CERROR("%s: cannot start recovery thread: rc = %d\n",
+                      obd->obd_name, rc);
+               GOTO(out_llog, rc);
+       }
+
+       l_wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_RUNNING ||
+                                         thread->t_flags & SVC_STOPPED, &lwi);
+out_llog:
+       if (rc != 0)
+               lod_sub_fini_llog(env, dt, thread);
+
+       RETURN(rc);
+}
+
+/**
+ * finish sub llog context
+ *
+ * Stop update recovery thread for the sub device, then cleanup the
+ * correspondent llog ctxt.
+ *
+ * \param[in] env      execution environment
+ * \param[in] dt       dt device(lod) to do update recovery
+ * \param[in] thread   recovery thread on this sub device
+ */
+void lod_sub_fini_llog(const struct lu_env *env,
+                      struct dt_device *dt, struct ptlrpc_thread *thread)
+{
+       struct obd_device       *obd;
+       struct llog_ctxt        *ctxt;
+
+       CDEBUG(D_INFO, "%s: finish sub llog\n", dt->dd_lu_dev.ld_obd->obd_name);
+       /* Stop recovery thread first */
+       if (thread != NULL && thread->t_flags & SVC_RUNNING) {
+               thread->t_flags = SVC_STOPPING;
+               wake_up(&thread->t_ctl_waitq);
+               wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_STOPPED);
+       }
+
+       obd = dt->dd_lu_dev.ld_obd;
+       ctxt = llog_get_context(obd, LLOG_UPDATELOG_ORIG_CTXT);
+       if (ctxt == NULL)
+               return;
+
+       if (ctxt->loc_handle != NULL)
+               llog_cat_close(env, ctxt->loc_handle);
+
+       llog_cleanup(env, ctxt);
+}
+
+/**
+ * finish all sub llog
+ *
+ * cleanup all of sub llog ctxt on the LOD.
+ *
+ * \param[in] env      execution environment
+ * \param[in] lod      lod device to do update recovery
+ */
+void lod_sub_fini_all_llogs(const struct lu_env *env, struct lod_device *lod)
+{
+       struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
+       unsigned int i;
+
+       /* Stop the update log commit cancel threads and finish master
+        * llog ctxt */
+       lod_sub_fini_llog(env, lod->lod_child, &lod->lod_child_recovery_thread);
+
+       lod_getref(ltd);
+       cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) {
+               struct lod_tgt_desc     *tgt;
+
+               tgt = LTD_TGT(ltd, i);
+               if (tgt->ltd_recovery_thread != NULL) {
+                       lod_sub_fini_llog(env, tgt->ltd_tgt,
+                                         tgt->ltd_recovery_thread);
+                       OBD_FREE_PTR(tgt->ltd_recovery_thread);
+                       tgt->ltd_recovery_thread = NULL;
+               }
+       }
+
+       lod_putref(lod, ltd);
+}
+
+/**
  * Implementation of lu_device_operations::ldo_process_config() for LOD
  *
  * The method is called by the configuration subsystem during setup,
@@ -356,20 +600,13 @@ static int lod_process_config(const struct lu_env *env,
                        GOTO(out, rc = -EINVAL);
 
                if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
-                       char *mdt;
-                       mdt = strstr(lustre_cfg_string(lcfg, 0), "-MDT");
-                       /* 1.8 configs don't have "-MDT0000" at the end */
-                       if (mdt == NULL) {
-                               mdt_index = 0;
-                       } else {
-                               long long_index;
-                               rc = lodname2mdt_index(
-                                       lustre_cfg_string(lcfg, 0),
-                                       &long_index);
-                               if (rc != 0)
-                                       GOTO(out, rc);
-                               mdt_index = long_index;
-                       }
+                       __u32 mdt_index;
+
+                       rc = lodname2mdt_index(lustre_cfg_string(lcfg, 0),
+                                              &mdt_index);
+                       if (rc != 0)
+                               GOTO(out, rc);
+
                        rc = lod_add_device(env, lod, arg1, index, gen,
                                            mdt_index, LUSTRE_OSC_NAME, 1);
                } else if (lcfg->lcfg_command == LCFG_ADD_MDC) {
@@ -399,17 +636,26 @@ static int lod_process_config(const struct lu_env *env,
                        rc = 0;
                GOTO(out, rc);
        }
-       case LCFG_CLEANUP:
        case LCFG_PRE_CLEANUP: {
-               lu_dev_del_linkage(dev->ld_site, dev);
-               lod_cleanup_desc_tgts(env, lod, &lod->lod_mdt_descs, lcfg);
-               lod_cleanup_desc_tgts(env, lod, &lod->lod_ost_descs, lcfg);
-               if (lcfg->lcfg_command == LCFG_PRE_CLEANUP)
-                       break;
+               lod_sub_process_config(env, lod, &lod->lod_mdt_descs, lcfg);
+               lod_sub_process_config(env, lod, &lod->lod_ost_descs, lcfg);
+               next = &lod->lod_child->dd_lu_dev;
+               rc = next->ld_ops->ldo_process_config(env, next, lcfg);
+               if (rc != 0)
+                       CDEBUG(D_HA, "%s: can't process %u: %d\n",
+                              lod2obd(lod)->obd_name, lcfg->lcfg_command, rc);
+
+               lod_sub_fini_all_llogs(env, lod);
+               break;
+       }
+       case LCFG_CLEANUP: {
                /*
                 * do cleanup on underlying storage only when
                 * all OSPs are cleaned up, as they use that OSD as well
                 */
+               lu_dev_del_linkage(dev->ld_site, dev);
+               lod_sub_process_config(env, lod, &lod->lod_mdt_descs, lcfg);
+               lod_sub_process_config(env, lod, &lod->lod_ost_descs, lcfg);
                next = &lod->lod_child->dd_lu_dev;
                rc = next->ld_ops->ldo_process_config(env, next, lcfg);
                if (rc)
@@ -472,6 +718,46 @@ static int lod_recovery_complete(const struct lu_env *env,
 }
 
 /**
+ * Init update logs on all sub device
+ *
+ * LOD initialize update logs on all of sub devices. Because the initialization
+ * process might need FLD lookup, see llog_osd_open()->dt_locate()->...->
+ * lod_object_init(), this API has to be called after LOD is initialized.
+ * \param[in] env      execution environment
+ * \param[in] lod      lod device
+ *
+ * \retval             0 if update log is initialized successfully.
+ * \retval             negative errno if initialization fails.
+ */
+static int lod_sub_init_llogs(const struct lu_env *env, struct lod_device *lod)
+{
+       struct lod_tgt_descs    *ltd = &lod->lod_mdt_descs;
+       int                     rc;
+       unsigned int            i;
+       ENTRY;
+
+       /* llog must be setup after LOD is initialized, because llog
+        * initialization include FLD lookup */
+       LASSERT(lod->lod_initialized);
+
+       /* Init the llog in its own stack */
+       rc = lod_sub_init_llog(env, lod, lod->lod_child);
+       if (rc < 0)
+               RETURN(rc);
+
+       cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) {
+               struct lod_tgt_desc     *tgt;
+
+               tgt = LTD_TGT(ltd, i);
+               rc = lod_sub_init_llog(env, lod, tgt->ltd_tgt);
+               if (rc != 0)
+                       break;
+       }
+
+       RETURN(rc);
+}
+
+/**
  * Implementation of lu_device_operations::ldo_prepare() for LOD
  *
  * see include/lu_object.h for the details.
@@ -479,9 +765,13 @@ static int lod_recovery_complete(const struct lu_env *env,
 static int lod_prepare(const struct lu_env *env, struct lu_device *pdev,
                       struct lu_device *cdev)
 {
-       struct lod_device   *lod = lu2lod_dev(cdev);
-       struct lu_device    *next = &lod->lod_child->dd_lu_dev;
-       int                  rc;
+       struct lod_device       *lod = lu2lod_dev(cdev);
+       struct lu_device        *next = &lod->lod_child->dd_lu_dev;
+       struct lu_fid           *fid = &lod_env_info(env)->lti_fid;
+       int                     rc;
+       struct dt_object        *root;
+       struct dt_object        *dto;
+       __u32                   index;
        ENTRY;
 
        rc = next->ld_ops->ldo_prepare(env, pdev, next);
@@ -493,6 +783,45 @@ static int lod_prepare(const struct lu_env *env, struct lu_device *pdev,
 
        lod->lod_initialized = 1;
 
+       rc = dt_root_get(env, lod->lod_child, fid);
+       if (rc < 0)
+               RETURN(rc);
+
+       root = dt_locate(env, lod->lod_child, fid);
+       if (IS_ERR(root))
+               RETURN(PTR_ERR(root));
+
+       index = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id;
+       lu_update_log_fid(fid, index);
+
+       /* Create update log object */
+       dto = local_file_find_or_create_with_fid(env, lod->lod_child,
+                                                fid, root,
+                                                lod_update_log_name,
+                                                S_IFREG | S_IRUGO | S_IWUSR);
+       if (IS_ERR(dto))
+               GOTO(out_put, rc = PTR_ERR(dto));
+
+       lu_object_put(env, &dto->do_lu);
+
+       /* Create update log dir */
+       lu_update_log_dir_fid(fid, index);
+       dto = local_file_find_or_create_with_fid(env, lod->lod_child,
+                                                fid, root,
+                                                lod_update_log_dir_name,
+                                                S_IFDIR | S_IRUGO | S_IWUSR);
+       if (IS_ERR(dto))
+               GOTO(out_put, rc = PTR_ERR(dto));
+
+       lu_object_put(env, &dto->do_lu);
+
+       rc = lod_sub_init_llogs(env, lod);
+       if (rc < 0)
+               GOTO(out_put, rc);
+
+out_put:
+       lu_object_put(env, &root->do_lu);
+
        RETURN(rc);
 }
 
@@ -1077,6 +1406,8 @@ static struct lu_device_type lod_device_type = {
  *
  * Currently, there is only one supported key: KEY_OSP_CONNECTED , to provide
  * the caller binary status whether LOD has seen connection to any OST target.
+ * It will also check if the MDT update log context being initialized (if
+ * needed).
  *
  * \param[in] env              LU environment provided by the caller
  * \param[in] exp              export of the caller
@@ -1098,7 +1429,7 @@ static int lod_obd_get_info(const struct lu_env *env, struct obd_export *exp,
        if (KEY_IS(KEY_OSP_CONNECTED)) {
                struct obd_device       *obd = exp->exp_obd;
                struct lod_device       *d;
-               struct lod_ost_desc     *ost;
+               struct lod_tgt_desc     *tgt;
                unsigned int            i;
                int                     rc = 1;
 
@@ -1108,16 +1439,38 @@ static int lod_obd_get_info(const struct lu_env *env, struct obd_export *exp,
                d = lu2lod_dev(obd->obd_lu_dev);
                lod_getref(&d->lod_ost_descs);
                lod_foreach_ost(d, i) {
-                       ost = OST_TGT(d, i);
-                       LASSERT(ost && ost->ltd_ost);
-
-                       rc = obd_get_info(env, ost->ltd_exp, keylen, key,
-                                         vallen, val);
+                       tgt = OST_TGT(d, i);
+                       LASSERT(tgt && tgt->ltd_tgt);
+                       rc = obd_get_info(env, tgt->ltd_exp, keylen, key,
+                                         vallen, val);
                        /* one healthy device is enough */
                        if (rc == 0)
                                break;
                }
                lod_putref(d, &d->lod_ost_descs);
+
+               lod_getref(&d->lod_mdt_descs);
+               lod_foreach_mdt(d, i) {
+                       struct llog_ctxt *ctxt;
+
+                       tgt = MDT_TGT(d, i);
+                       LASSERT(tgt != NULL);
+                       LASSERT(tgt->ltd_tgt != NULL);
+                       ctxt = llog_get_context(tgt->ltd_tgt->dd_lu_dev.ld_obd,
+                                               LLOG_UPDATELOG_ORIG_CTXT);
+                       if (ctxt == NULL) {
+                               rc = -EAGAIN;
+                               break;
+                       }
+                       if (ctxt->loc_handle == NULL) {
+                               rc = -EAGAIN;
+                               llog_ctxt_put(ctxt);
+                               break;
+                       }
+                       llog_ctxt_put(ctxt);
+               }
+               lod_putref(d, &d->lod_mdt_descs);
+
                RETURN(rc);
        }
 
index ffbcd70..c99b074 100644 (file)
@@ -122,6 +122,7 @@ struct lod_tgt_desc {
        __u32              ltd_index;
        struct ltd_qos     ltd_qos; /* qos info per target */
        struct obd_statfs  ltd_statfs;
+       struct ptlrpc_thread    *ltd_recovery_thread;
        unsigned long      ltd_active:1,/* is this target up for requests */
                           ltd_activate:1,/* should  target be activated */
                           ltd_reap:1;  /* should this target be deleted */
@@ -182,6 +183,9 @@ struct lod_device {
        /* Description of MDT */
        struct lod_tgt_descs  lod_mdt_descs;
 
+       /* Recovery thread for lod_child */
+       struct ptlrpc_thread    lod_child_recovery_thread;
+
        /* maximum EA size underlied OSD may have */
        unsigned int          lod_osd_max_easize;
 
@@ -296,6 +300,7 @@ struct lod_thread_info {
        struct lu_name    lti_name;
        struct lu_buf     lti_linkea_buf;
        struct dt_insert_rec lti_dt_rec;
+       struct llog_catid lti_cid;
 };
 
 extern const struct lu_device_operations lod_lu_ops;
@@ -386,10 +391,17 @@ lod_name_get(const struct lu_env *env, const void *area, int len)
        if ((__dev)->lod_osts_size > 0) \
                cfs_foreach_bit((__dev)->lod_ost_bitmap, (index))
 
+#define lod_foreach_mdt(mdt_dev, index)        \
+       cfs_foreach_bit((mdt_dev)->lod_mdt_bitmap, (index))
+
 /* lod_dev.c */
 extern struct kmem_cache *lod_object_kmem;
 int lod_fld_lookup(const struct lu_env *env, struct lod_device *lod,
                   const struct lu_fid *fid, __u32 *tgt, int *flags);
+int lod_sub_init_llog(const struct lu_env *env, struct lod_device *lod,
+                     struct dt_device *dt);
+void lod_sub_fini_llog(const struct lu_env *env,
+                      struct dt_device *dt, struct ptlrpc_thread *thread);
 /* lod_lov.c */
 void lod_getref(struct lod_tgt_descs *ltd);
 void lod_putref(struct lod_device *lod, struct lod_tgt_descs *ltd);
@@ -565,4 +577,7 @@ int lod_sub_object_declare_punch(const struct lu_env *env,
                                 struct thandle *th);
 int lod_sub_object_punch(const struct lu_env *env, struct dt_object *dt,
                         __u64 start, __u64 end, struct thandle *th);
+
+int lod_sub_prep_llog(const struct lu_env *env, struct lod_device *lod,
+                     struct dt_device *dt, int index);
 #endif
index c4a6544..cb33955 100644 (file)
@@ -346,7 +346,7 @@ int lod_add_device(const struct lu_env *env, struct lod_device *lod,
                }
        }
 
-       if (!strcmp(LUSTRE_OSC_NAME, type)) {
+       if (for_ost) {
                /* pool and qos are not supported for MDS stack yet */
                rc = lod_ost_pool_add(&lod->lod_pool_info, index,
                                      lod->lod_osts_size);
@@ -378,13 +378,25 @@ int lod_add_device(const struct lu_env *env, struct lod_device *lod,
        if (lod->lod_recovery_completed)
                ldev->ld_ops->ldo_recovery_complete(env, ldev);
 
+       if (!for_ost && lod->lod_initialized) {
+               rc = lod_sub_init_llog(env, lod, tgt_desc->ltd_tgt);
+               if (rc != 0) {
+                       CERROR("%s: cannot start llog on %s:rc = %d\n",
+                              lod2obd(lod)->obd_name, osp, rc);
+                       GOTO(out_pool, rc);
+               }
+       }
+
        rc = lfsck_add_target(env, lod->lod_child, d, exp, index, for_ost);
-       if (rc != 0)
+       if (rc != 0) {
                CERROR("Fail to add LFSCK target: name = %s, type = %s, "
                       "index = %u, rc = %d\n", osp, type, index, rc);
-
+               GOTO(out_fini_llog, rc);
+       }
        RETURN(rc);
-
+out_fini_llog:
+       lod_sub_fini_llog(env, tgt_desc->ltd_tgt,
+                         tgt_desc->ltd_recovery_thread);
 out_pool:
        lod_ost_pool_remove(&lod->lod_pool_info, index);
 out_mutex:
index a703944..d6f1f32 100644 (file)
@@ -50,6 +50,7 @@
 #include <lustre_param.h>
 #include <md_object.h>
 #include <lustre_linkea.h>
+#include <lustre_log.h>
 
 #include "lod_internal.h"
 
@@ -869,3 +870,44 @@ int lod_sub_object_punch(const struct lu_env *env, struct dt_object *dt,
 
        RETURN(rc);
 }
+
+/**
+ * Open and process the update log catalog of a sub device
+ *
+ * Build the update log FID for \a index, convert it to a log ID, open the
+ * existing llog through the LLOG_UPDATELOG_ORIG_CTXT context of the OBD
+ * behind \a dt and run llog_cat_init_and_process() on it. The context is
+ * flagged with LLOG_CTXT_FLAG_NORMAL_FID before the open, and it keeps the
+ * handle in loc_handle only if the processing succeeds.
+ *
+ * \param [in] env     execution environment
+ * \param [in] lod     lod device (not used by this function)
+ * \param [in] dt      sub device whose OBD provides the llog context
+ * \param [in] index   index passed to lu_update_log_fid()
+ *
+ * \retval             0 if the llog is opened and processed.
+ * \retval             negative errno if llog_open() fails.
+ * \retval             result of llog_cat_init_and_process() otherwise.
+ */
+int lod_sub_prep_llog(const struct lu_env *env, struct lod_device *lod,
+                     struct dt_device *dt, int index)
+{
+       struct lod_thread_info  *info = lod_env_info(env);
+       struct llog_catid       *catid = &info->lti_cid;
+       struct llog_handle      *handle;
+       struct llog_ctxt        *ctxt;
+       int                     rc;
+       ENTRY;
+
+       /* lti_fid is only scratch space for deriving the log ID */
+       lu_update_log_fid(&info->lti_fid, index);
+       fid_to_logid(&info->lti_fid, &catid->lci_logid);
+
+       ctxt = llog_get_context(dt->dd_lu_dev.ld_obd,
+                               LLOG_UPDATELOG_ORIG_CTXT);
+       LASSERT(ctxt != NULL);
+       ctxt->loc_flags |= LLOG_CTXT_FLAG_NORMAL_FID;
+
+       rc = llog_open(env, ctxt, &handle, &catid->lci_logid, NULL,
+                      LLOG_OPEN_EXISTS);
+       if (rc < 0) {
+               /* nothing was opened, only the context reference to drop */
+               llog_ctxt_put(ctxt);
+               RETURN(rc);
+       }
+
+       LASSERT(handle != NULL);
+       ctxt->loc_handle = handle;
+
+       rc = llog_cat_init_and_process(env, handle);
+       if (rc != 0) {
+               /* do not leave a closed handle attached to the context */
+               llog_cat_close(env, ctxt->loc_handle);
+               ctxt->loc_handle = NULL;
+       }
+
+       llog_ctxt_put(ctxt);
+
+       RETURN(rc);
+}
index 81d106a..caa9145 100644 (file)
@@ -5008,7 +5008,8 @@ static int mdt_obd_connect(const struct lu_env *env,
         *      at some point we should find a better one
         */
        if (!test_bit(MDT_FL_SYNCED, &mdt->mdt_state) && data != NULL &&
-           !(data->ocd_connect_flags & OBD_CONNECT_LIGHTWEIGHT)) {
+           !(data->ocd_connect_flags & OBD_CONNECT_LIGHTWEIGHT) &&
+           !(data->ocd_connect_flags & OBD_CONNECT_MDS_MDS)) {
                rc = obd_get_info(env, mdt->mdt_child_exp,
                                  sizeof(KEY_OSP_CONNECTED),
                                  KEY_OSP_CONNECTED, NULL, NULL);
index 7795d03..e0763da 100644 (file)
@@ -58,6 +58,8 @@ struct llog_thread_info {
        loff_t                           lgi_off;
        struct llog_logid_rec            lgi_logid;
        struct dt_insert_rec             lgi_dt_rec;
+       struct lu_seq_range              lgi_range;
+       char                             lgi_name[32];
 };
 
 extern struct lu_context_key llog_thread_key;
index f1e14ee..2fc816f 100644 (file)
@@ -165,28 +165,32 @@ int llog_setup(const struct lu_env *env, struct obd_device *obd,
         ctxt->loc_idx = index;
         ctxt->loc_logops = op;
        mutex_init(&ctxt->loc_mutex);
-        ctxt->loc_exp = class_export_get(disk_obd->obd_self_export);
-        ctxt->loc_flags = LLOG_CTXT_FLAG_UNINITIALIZED;
+       if (disk_obd != NULL)
+               ctxt->loc_exp = class_export_get(disk_obd->obd_self_export);
+       else
+               ctxt->loc_exp = class_export_get(obd->obd_self_export);
+
+       ctxt->loc_flags = LLOG_CTXT_FLAG_UNINITIALIZED;
 
         rc = llog_group_set_ctxt(olg, ctxt, index);
         if (rc) {
                 llog_ctxt_destroy(ctxt);
                 if (rc == -EEXIST) {
                         ctxt = llog_group_get_ctxt(olg, index);
-                        if (ctxt) {
-                                /*
-                                 * mds_lov_update_desc() might call here multiple
-                                 * times. So if the llog is already set up then
-                                 * don't to do it again. 
-                                 */
-                                CDEBUG(D_CONFIG, "obd %s ctxt %d already set up\n",
-                                       obd->obd_name, index);
-                                LASSERT(ctxt->loc_olg == olg);
-                                LASSERT(ctxt->loc_obd == obd);
-                                LASSERT(ctxt->loc_exp == disk_obd->obd_self_export);
-                                LASSERT(ctxt->loc_logops == op);
-                                llog_ctxt_put(ctxt);
-                        }
+                       if (ctxt) {
+                               CDEBUG(D_CONFIG, "%s: ctxt %d already set up\n",
+                                      obd->obd_name, index);
+                               LASSERT(ctxt->loc_olg == olg);
+                               LASSERT(ctxt->loc_obd == obd);
+                               if (disk_obd != NULL)
+                                       LASSERT(ctxt->loc_exp ==
+                                               disk_obd->obd_self_export);
+                               else
+                                       LASSERT(ctxt->loc_exp ==
+                                               obd->obd_self_export);
+                               LASSERT(ctxt->loc_logops == op);
+                               llog_ctxt_put(ctxt);
+                       }
                         rc = 0;
                 }
                 RETURN(rc);
index b4ef51a..9aa79de 100644 (file)
@@ -562,7 +562,7 @@ static int llog_osd_write_rec(const struct lu_env *env,
                GOTO(out, rc);
 
        LASSERT(lgi->lgi_attr.la_valid & LA_SIZE);
-       lgi->lgi_off = lgi->lgi_attr.la_size;
+       lgi->lgi_off = max_t(__u64, lgi->lgi_attr.la_size, lgi->lgi_off);
        lgi->lgi_buf.lb_len = reclen;
        lgi->lgi_buf.lb_buf = rec;
        rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
@@ -970,7 +970,7 @@ static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle,
        struct dt_object                *o;
        struct dt_device                *dt;
        struct ls_device                *ls;
-       struct local_oid_storage        *los;
+       struct local_oid_storage        *los = NULL;
        int                              rc = 0;
 
        ENTRY;
@@ -981,6 +981,25 @@ static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle,
        LASSERT(ctxt->loc_exp->exp_obd);
        dt = ctxt->loc_exp->exp_obd->obd_lvfs_ctxt.dt;
        LASSERT(dt);
+       if (ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) {
+               if (logid != NULL) {
+                       logid_to_fid(logid, &lgi->lgi_fid);
+               } else {
+                       /* If logid == NULL, then it means the caller needs
+                        * to allocate new FID (llog_cat_declare_add_rec()). */
+                       rc = obd_fid_alloc(env, ctxt->loc_exp,
+                                          &lgi->lgi_fid, NULL);
+                       if (rc < 0)
+                               RETURN(rc);
+                       rc = 0;
+               }
+
+               o = dt_locate(env, dt, &lgi->lgi_fid);
+               if (IS_ERR(o))
+                       RETURN(PTR_ERR(o));
+
+               goto after_open;
+       }
 
        ls = ls_device_get(dt);
        if (IS_ERR(ls))
@@ -1030,6 +1049,7 @@ static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle,
        if (IS_ERR(o))
                GOTO(out_name, rc = PTR_ERR(o));
 
+after_open:
        /* No new llog is expected but doesn't exist */
        if (open_param != LLOG_OPEN_NEW && !dt_object_exists(o))
                GOTO(out_put, rc = -ENOENT);
@@ -1047,7 +1067,8 @@ out_name:
        if (handle->lgh_name != NULL)
                OBD_FREE(handle->lgh_name, strlen(name) + 1);
 out:
-       dt_los_put(los);
+       if (los != NULL)
+               dt_los_put(los);
        RETURN(rc);
 }
 
@@ -1069,6 +1090,103 @@ static int llog_osd_exist(struct llog_handle *handle)
 }
 
 /**
+ * Get dir for regular fid log object
+ *
+ * Get directory for regular fid log object, and these regular fid log
+ * object will be inserted under this directory, to satisfy the FS
+ * consistency check, e2fsck etc.
+ *
+ * \param [in] env     execution environment
+ * \param [in] dto     llog object
+ *
+ * \retval             pointer to the directory if it is found.
+ * \retval             ERR_PTR(negative errno) if it fails.
+ */
+struct dt_object *llog_osd_get_regular_fid_dir(const struct lu_env *env,
+                                              struct dt_object *dto)
+{
+       struct llog_thread_info *lgi = llog_info(env);
+       struct seq_server_site *ss = dto->do_lu.lo_dev->ld_site->ld_seq_site;
+       struct lu_seq_range     *range = &lgi->lgi_range;
+       struct lu_fid           *dir_fid = &lgi->lgi_fid;
+       struct dt_object        *dir;
+       int                     rc;
+       ENTRY;
+
+       fld_range_set_any(range);
+       LASSERT(ss != NULL);
+       rc = ss->ss_server_fld->lsf_seq_lookup(env, ss->ss_server_fld,
+                                  fid_seq(lu_object_fid(&dto->do_lu)), range);
+       if (rc < 0)
+               RETURN(ERR_PTR(rc));
+
+       lu_update_log_dir_fid(dir_fid, range->lsr_index);
+       dir = dt_locate(env, lu2dt_dev(dto->do_lu.lo_dev), dir_fid);
+       if (IS_ERR(dir))
+               RETURN(dir);
+
+       if (!dt_try_as_dir(env, dir)) {
+               lu_object_put(env, &dir->do_lu);
+               RETURN(ERR_PTR(-ENOTDIR));
+       }
+
+       RETURN(dir);
+}
+
+/**
+ * Add llog object with regular FID to name entry
+ *
+ * Add llog object with regular FID to name space, and each llog
+ * object on each MDT will be /update_log_dir/[seq:oid:ver],
+ * so to satisfy the namespace consistency check, e2fsck etc.
+ * Objects whose FID is not a normal FID are left alone.
+ *
+ * \param [in] env     execution environment
+ * \param [in] dto     llog object
+ * \param [in] th      thandle
+ * \param [in] declare if it is declare or execution
+ *
+ * \retval             0 if insertion succeeds, or if the FID of \a dto
+ *                     is not a normal FID.
+ * \retval             -ENAMETOOLONG if the FID string does not fit the
+ *                     per-thread name buffer.
+ * \retval             negative errno if insertion fails.
+ */
+static int
+llog_osd_regular_fid_add_name_entry(const struct lu_env *env,
+                                   struct dt_object *dto,
+                                   struct thandle *th, bool declare)
+{
+       struct llog_thread_info *lgi = llog_info(env);
+       const struct lu_fid     *fid = lu_object_fid(&dto->do_lu);
+       struct dt_insert_rec    *rec = &lgi->lgi_dt_rec;
+       struct dt_object        *dir;
+       char                    *name = lgi->lgi_name;
+       int                     len;
+       int                     rc;
+       ENTRY;
+
+       if (!fid_is_norm(fid))
+               RETURN(0);
+
+       /* Build the key before taking any reference: a truncated name
+        * would be inserted as a different (possibly colliding) entry,
+        * so refuse it instead of silently cutting it short. */
+       len = snprintf(name, sizeof(lgi->lgi_name), DFID, PFID(fid));
+       if (len < 0 || len >= (int)sizeof(lgi->lgi_name))
+               RETURN(-ENAMETOOLONG);
+
+       dir = llog_osd_get_regular_fid_dir(env, dto);
+       if (IS_ERR(dir))
+               RETURN(PTR_ERR(dir));
+
+       rec->rec_fid = fid;
+       rec->rec_type = S_IFREG;
+       dt_write_lock(env, dir, 0);
+       if (declare) {
+               rc = dt_declare_insert(env, dir, (struct dt_rec *)rec,
+                              (struct dt_key *)name, th);
+       } else {
+               rc = dt_insert(env, dir, (struct dt_rec *)rec,
+                              (struct dt_key *)name, th, 1);
+       }
+       dt_write_unlock(env, dir);
+
+       /* drop the reference taken by llog_osd_get_regular_fid_dir() */
+       lu_object_put(env, &dir->do_lu);
+       RETURN(rc);
+}
+
+
+/**
  * Implementation of the llog_operations::lop_declare_create
  *
  * This function declares the llog create. It declares also name insert
@@ -1100,6 +1218,24 @@ static int llog_osd_declare_create(const struct lu_env *env,
        if (dt_object_exists(o))
                RETURN(0);
 
+       if (res->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) {
+               struct llog_thread_info *lgi = llog_info(env);
+
+               lgi->lgi_attr.la_valid = LA_MODE | LA_SIZE;
+               lgi->lgi_attr.la_size = 0;
+               lgi->lgi_attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR;
+               lgi->lgi_dof.dof_type = dt_mode_to_dft(S_IFREG);
+
+               rc = dt_declare_create(env, o, &lgi->lgi_attr, NULL,
+                                      &lgi->lgi_dof, th);
+               if (rc < 0)
+                       RETURN(rc);
+
+
+               rc = llog_osd_regular_fid_add_name_entry(env, o, th, true);
+
+               RETURN(rc);
+       }
        los = res->private_data;
        LASSERT(los);
 
@@ -1163,6 +1299,26 @@ static int llog_osd_create(const struct lu_env *env, struct llog_handle *res,
        if (dt_object_exists(o))
                RETURN(-EEXIST);
 
+       if (res->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) {
+               struct llog_thread_info *lgi = llog_info(env);
+
+               lgi->lgi_attr.la_valid = LA_MODE | LA_SIZE | LA_TYPE;
+               lgi->lgi_attr.la_size = 0;
+               lgi->lgi_attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR;
+               lgi->lgi_dof.dof_type = dt_mode_to_dft(S_IFREG);
+
+               dt_write_lock(env, o, 0);
+               rc = dt_create(env, o, &lgi->lgi_attr, NULL,
+                              &lgi->lgi_dof, th);
+               dt_write_unlock(env, o);
+               if (rc < 0)
+                       RETURN(rc);
+
+               rc = llog_osd_regular_fid_add_name_entry(env, o, th, false);
+
+               RETURN(rc);
+       }
+
        los = res->private_data;
        LASSERT(los);
 
@@ -1223,6 +1379,10 @@ static int llog_osd_close(const struct lu_env *env, struct llog_handle *handle)
 
        lu_object_put(env, &handle->lgh_obj->do_lu);
 
+       if (handle->lgh_ctxt->loc_flags &
+           LLOG_CTXT_FLAG_NORMAL_FID)
+               RETURN(rc);
+
        los = handle->private_data;
        LASSERT(los);
        dt_los_put(los);
@@ -1234,6 +1394,54 @@ static int llog_osd_close(const struct lu_env *env, struct llog_handle *handle)
 }
 
 /**
+ * delete llog object name entry
+ *
+ * Delete llog object (with regular FID) from name space (under
+ * update_log_dir).
+ *
+ * \param [in] env     execution environment
+ * \param [in] dto     llog object
+ * \param [in] th      thandle
+ * \param [in] declare if it is declare or execution
+ *
+ * \retval             0 if deletion succeeds.
+ * \retval             negative errno if deletion fails.
+ */
+static int
+llog_osd_regular_fid_del_name_entry(const struct lu_env *env,
+                                   struct dt_object *dto,
+                                   struct thandle *th, bool declare)
+{
+       struct llog_thread_info *lgi = llog_info(env);
+       const struct lu_fid     *fid = lu_object_fid(&dto->do_lu);
+       struct dt_object        *dir;
+       char                    *name = lgi->lgi_name;
+       int                     rc;
+       ENTRY;
+
+       if (!fid_is_norm(fid))
+               RETURN(0);
+
+       dir = llog_osd_get_regular_fid_dir(env, dto);
+       if (IS_ERR(dir))
+               RETURN(PTR_ERR(dir));
+
+       snprintf(name, sizeof(lgi->lgi_name), DFID, PFID(fid));
+       dt_write_lock(env, dir, 0);
+       if (declare) {
+               rc = dt_declare_delete(env, dir, (struct dt_key *)name,
+                                      th);
+       } else {
+               rc = dt_delete(env, dir, (struct dt_key *)name, th);
+       }
+       dt_write_unlock(env, dir);
+
+       lu_object_put(env, &dir->do_lu);
+       RETURN(rc);
+}
+
+
+/**
  * Implementation of the llog_operations::lop_destroy
  *
  * This function destroys the llog and deletes also entry in the
@@ -1293,6 +1501,12 @@ static int llog_osd_destroy(const struct lu_env *env,
        if (rc)
                GOTO(out_trans, rc);
 
+       if (loghandle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) {
+               rc = llog_osd_regular_fid_del_name_entry(env, o, th, true);
+               if (rc < 0)
+                       GOTO(out_trans, rc);
+       }
+
        rc = dt_trans_start_local(env, d, th);
        if (rc)
                GOTO(out_trans, rc);
@@ -1316,6 +1530,14 @@ static int llog_osd_destroy(const struct lu_env *env,
                rc = dt_destroy(env, o, th);
                if (rc)
                        GOTO(out_unlock, rc);
+
+               if (loghandle->lgh_ctxt->loc_flags &
+                                               LLOG_CTXT_FLAG_NORMAL_FID) {
+                       rc = llog_osd_regular_fid_del_name_entry(env, o, th,
+                                                                false);
+                       if (rc < 0)
+                               GOTO(out_unlock, rc);
+               }
        }
 out_unlock:
        dt_write_unlock(env, o);
@@ -1356,6 +1578,9 @@ static int llog_osd_setup(const struct lu_env *env, struct obd_device *obd,
        ctxt = llog_ctxt_get(olg->olg_ctxts[ctxt_idx]);
        LASSERT(ctxt);
 
+       if (disk_obd == NULL)
+               GOTO(out, rc = 0);
+
        /* initialize data allowing to generate new fids,
         * literally we need a sequece */
        lgi->lgi_fid.f_seq = FID_SEQ_LLOG;
@@ -1427,6 +1652,25 @@ struct llog_operations llog_osd_ops = {
 };
 EXPORT_SYMBOL(llog_osd_ops);
 
+/*
+ * Llog operations table exported as llog_common_cat_ops: block access,
+ * header read, open/close, create, destroy, setup/cleanup and record
+ * write are served by the llog_osd_* handlers, while lop_add and
+ * lop_declare_add are routed to llog_cat_add_rec() and
+ * llog_cat_declare_add_rec().
+ */
+struct llog_operations llog_common_cat_ops = {
+       .lop_next_block         = llog_osd_next_block,
+       .lop_prev_block         = llog_osd_prev_block,
+       .lop_read_header        = llog_osd_read_header,
+       .lop_destroy            = llog_osd_destroy,
+       .lop_setup              = llog_osd_setup,
+       .lop_cleanup            = llog_osd_cleanup,
+       .lop_open               = llog_osd_open,
+       .lop_exist              = llog_osd_exist,
+       .lop_declare_create     = llog_osd_declare_create,
+       .lop_create             = llog_osd_create,
+       .lop_declare_write_rec  = llog_osd_declare_write_rec,
+       .lop_write_rec          = llog_osd_write_rec,
+       .lop_close              = llog_osd_close,
+       .lop_add                = llog_cat_add_rec,
+       .lop_declare_add        = llog_cat_declare_add_rec,
+};
+EXPORT_SYMBOL(llog_common_cat_ops);
+
 /**
  * Read the special file which contains the list of llog catalogs IDs
  *
index 1d6db3c..27a8643 100644 (file)
@@ -650,6 +650,7 @@ __u64 osp_it_store(const struct lu_env *env, const struct dt_it *di);
 int osp_it_key_rec(const struct lu_env *env, const struct dt_it *di,
                   void *key_rec);
 int osp_it_next_page(const struct lu_env *env, struct dt_it *di);
+int osp_oac_init(struct osp_object *obj);
 /* osp_md_object.c */
 int osp_md_declare_object_create(const struct lu_env *env,
                                 struct dt_object *dt,
index 47ec90a..11272e1 100644 (file)
@@ -86,6 +86,18 @@ static int osp_object_create_interpreter(const struct lu_env *env,
                obj->opo_obj.do_lu.lo_header->loh_attr &= ~LOHA_EXISTS;
                obj->opo_non_exist = 1;
        }
+
+       /* Invalid the opo cache for the object after the object
+        * is being created, so attr_get will try to get attr
+        * from the remote object. XXX this can be improved when
+        * we have object lock/cache invalidate mechanism in OSP
+        * layer */
+       if (obj->opo_ooa != NULL) {
+               spin_lock(&obj->opo_lock);
+               obj->opo_ooa->ooa_attr.la_valid = 0;
+               spin_unlock(&obj->opo_lock);
+       }
+
        return 0;
 }
 
@@ -112,6 +124,15 @@ int osp_md_declare_object_create(const struct lu_env *env,
                                 struct dt_object_format *dof,
                                 struct thandle *th)
 {
+       struct osp_object *obj = dt2osp_obj(dt);
+       int               rc;
+
+       if (obj->opo_ooa == NULL) {
+               rc = osp_oac_init(obj);
+               if (rc != 0)
+                       return rc;
+       }
+
        return osp_trans_update_request_create(th);
 }
 
@@ -177,11 +198,13 @@ int osp_md_object_create(const struct lu_env *env, struct dt_object *dt,
                         struct dt_object_format *dof, struct thandle *th)
 {
        struct dt_update_request        *update;
+       struct osp_object               *obj = dt2osp_obj(dt);
        int                             rc;
 
        update = thandle_to_dt_update_request(th);
        LASSERT(update != NULL);
 
+       LASSERT(attr->la_valid & LA_TYPE);
        rc = osp_update_rpc_pack(env, create, update, OUT_CREATE,
                                 lu_object_fid(&dt->do_lu), attr, hint, dof);
        if (rc != 0)
@@ -195,6 +218,9 @@ int osp_md_object_create(const struct lu_env *env, struct dt_object *dt,
 
        dt->do_lu.lo_header->loh_attr |= LOHA_EXISTS | (attr->la_mode & S_IFMT);
        dt2osp_obj(dt)->opo_non_exist = 0;
+
+       LASSERT(obj->opo_ooa != NULL);
+       obj->opo_ooa->ooa_attr = *attr;
 out:
        return rc;
 }
@@ -1091,8 +1117,10 @@ static ssize_t osp_md_write(const struct lu_env *env, struct dt_object *dt,
                            const struct lu_buf *buf, loff_t *pos,
                            struct thandle *th, int ignore_quota)
 {
+       struct osp_object *obj = dt2osp_obj(dt);
        struct dt_update_request  *update;
        ssize_t                   rc;
+       ENTRY;
 
        update = thandle_to_dt_update_request(th);
        LASSERT(update != NULL);
@@ -1100,15 +1128,94 @@ static ssize_t osp_md_write(const struct lu_env *env, struct dt_object *dt,
        rc = osp_update_rpc_pack(env, write, update, OUT_WRITE,
                                 lu_object_fid(&dt->do_lu), buf, *pos);
        if (rc < 0)
-               return rc;
+               RETURN(rc);
+
+       CDEBUG(D_INFO, "write "DFID" offset = "LPU64" length = %zu\n",
+              PFID(lu_object_fid(&dt->do_lu)), *pos, buf->lb_len);
 
        /* XXX: how about the write error happened later? */
        *pos += buf->lb_len;
-       return buf->lb_len;
+
+       if (obj->opo_ooa != NULL &&
+           obj->opo_ooa->ooa_attr.la_valid & LA_SIZE &&
+           obj->opo_ooa->ooa_attr.la_size < *pos)
+               obj->opo_ooa->ooa_attr.la_size = *pos;
+
+       RETURN(buf->lb_len);
+}
+
+/**
+ * Implementation of dt_body_operations::dbo_read
+ *
+ * Read from a remote object: pack a single OUT_READ update for the FID of
+ * \a dt, send it synchronously with osp_remote_sync() and copy the returned
+ * data into \a rbuf.
+ *
+ * \param [in] env      execution environment
+ * \param [in] dt       object to read from
+ * \param [out] rbuf    buffer receiving the data; rbuf->lb_len is the
+ *                      length requested
+ * \param [in,out] pos  offset to read at; set to the offset reported in
+ *                      the reply
+ *
+ * \retval              number of bytes copied into \a rbuf if it succeeds.
+ * \retval              -EPROTO if the reply is missing or malformed.
+ * \retval              other negative errno if it fails.
+ */
+static ssize_t osp_md_read(const struct lu_env *env, struct dt_object *dt,
+                          struct lu_buf *rbuf, loff_t *pos)
+{
+       struct osp_device       *osp    = lu2osp_dev(dt->do_lu.lo_dev);
+       struct dt_device        *dt_dev = &osp->opd_dt_dev;
+       struct lu_buf           *lbuf   = &osp_env_info(env)->osi_lb2;
+       struct dt_update_request   *update;
+       struct object_update_reply *reply;
+       struct out_read_reply      *orr;
+       struct ptlrpc_request      *req = NULL;
+       int                        rc;
+       ENTRY;
+
+       /* Because it needs send the update buffer right away,
+        * just create an update buffer, instead of attaching the
+        * update_remote list of the thandle. */
+       update = dt_update_request_create(dt_dev);
+       if (IS_ERR(update))
+               RETURN(PTR_ERR(update));
+
+       rc = osp_update_rpc_pack(env, read, update, OUT_READ,
+                                lu_object_fid(&dt->do_lu), rbuf->lb_len, *pos);
+       if (rc != 0) {
+               CERROR("%s: cannot insert update: rc = %d\n",
+                      dt_dev->dd_lu_dev.ld_obd->obd_name, rc);
+               GOTO(out, rc);
+       }
+
+       rc = osp_remote_sync(env, osp, update, &req);
+       if (rc < 0)
+               GOTO(out, rc);
+
+       reply = req_capsule_server_sized_get(&req->rq_pill,
+                                            &RMF_OUT_UPDATE_REPLY,
+                                            OUT_UPDATE_REPLY_SIZE);
+       /* the reply field may be absent: do not dereference it blindly */
+       if (reply == NULL) {
+               CERROR("%s: missing update reply: rc = %d\n",
+                      dt_dev->dd_lu_dev.ld_obd->obd_name, -EPROTO);
+               GOTO(out, rc = -EPROTO);
+       }
+
+       if (reply->ourp_magic != UPDATE_REPLY_MAGIC) {
+               CERROR("%s: invalid update reply magic %x expected %x:"
+                      " rc = %d\n", dt_dev->dd_lu_dev.ld_obd->obd_name,
+                      reply->ourp_magic, UPDATE_REPLY_MAGIC, -EPROTO);
+               GOTO(out, rc = -EPROTO);
+       }
+
+       rc = object_update_result_data_get(reply, lbuf, 0);
+       if (rc < 0)
+               GOTO(out, rc);
+
+       if (lbuf->lb_len < sizeof(*orr))
+               GOTO(out, rc = -EPROTO);
+
+       orr = lbuf->lb_buf;
+       orr_le_to_cpu(orr, orr);
+
+       *pos = orr->orr_offset;
+
+       /* The size comes from the wire: it must fit the caller's buffer
+        * and must not claim more payload than was actually received
+        * after the fixed header (assumes orr_data is the trailing
+        * payload of struct out_read_reply -- confirm against its
+        * definition). */
+       if (orr->orr_size > rbuf->lb_len ||
+           orr->orr_size > lbuf->lb_len - sizeof(*orr))
+               GOTO(out, rc = -EPROTO);
+
+       memcpy(rbuf->lb_buf, orr->orr_data, orr->orr_size);
+
+       GOTO(out, rc = orr->orr_size);
+out:
+       if (req != NULL)
+               ptlrpc_req_finished(req);
+
+       dt_update_request_destroy(update);
+
+       return rc;
+}
 
 /* These body operation will be used to write symlinks during migration etc */
 struct dt_body_operations osp_md_body_ops = {
        .dbo_declare_write      = osp_md_declare_write,
        .dbo_write              = osp_md_write,
+       .dbo_read               = osp_md_read,
 };
index 1ea3461..026fdb3 100644 (file)
@@ -168,7 +168,7 @@ static void osp_object_assign_fid(const struct lu_env *env,
  * \retval             0 for success
  * \retval             negative error number on failure
  */
-static int osp_oac_init(struct osp_object *obj)
+int osp_oac_init(struct osp_object *obj)
 {
        struct osp_object_attr *ooa;
 
index d5eaeb2..c7406a8 100644 (file)
@@ -1233,7 +1233,8 @@ static int osp_sync_llog_init(const struct lu_env *env, struct osp_device *d)
               POSTID(&osi->osi_cid.lci_logid.lgl_oi),
               osi->osi_cid.lci_logid.lgl_ogen);
 
-       rc = llog_setup(env, obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, obd,
+       rc = llog_setup(env, obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT,
+                       d->opd_storage->dd_lu_dev.ld_obd,
                        &osp_mds_ost_orig_logops);
        if (rc)
                RETURN(rc);
index 66af5c3..d0a8c39 100644 (file)
@@ -645,6 +645,11 @@ int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
        if (rc != 0)
                RETURN(rc);
 
+       /* This will only be called with read-only update, and these updates
+        * might be used to retrieve update log during recovery process, so
+        * it will be allowed to send during recovery process */
+       req->rq_allow_replay = 1;
+
        /* Note: some dt index api might return non-zero result here, like
         * osd_index_ea_lookup, so we should only check rc < 0 here */
        rc = ptlrpc_queue_wait(req);
index 2f0239f..7e85579 100644 (file)
@@ -456,6 +456,14 @@ void lustre_assert_wire_constants(void)
                 (long long)OUT_INDEX_INSERT);
        LASSERTF(OUT_INDEX_DELETE == 11, "found %lld\n",
                 (long long)OUT_INDEX_DELETE);
+       LASSERTF(OUT_WRITE == 12, "found %lld\n",
+                (long long)OUT_WRITE);
+       LASSERTF(OUT_XATTR_DEL == 13, "found %lld\n",
+                (long long)OUT_XATTR_DEL);
+       LASSERTF(OUT_PUNCH == 14, "found %lld\n",
+                (long long)OUT_PUNCH);
+       LASSERTF(OUT_READ == 15, "found %lld\n",
+                (long long)OUT_READ);
 
        /* Checks for struct hsm_attrs */
        LASSERTF((int)sizeof(struct hsm_attrs) == 24, "found %lld\n",
@@ -492,10 +500,10 @@ void lustre_assert_wire_constants(void)
                 (long long)FID_SEQ_LLOG);
        LASSERTF(FID_SEQ_ECHO == 2, "found %lld\n",
                 (long long)FID_SEQ_ECHO);
-       LASSERTF(FID_SEQ_OST_MDT1 == 3, "found %lld\n",
-                (long long)FID_SEQ_OST_MDT1);
-       LASSERTF(FID_SEQ_OST_MAX == 9, "found %lld\n",
-                (long long)FID_SEQ_OST_MAX);
+       LASSERTF(FID_SEQ_UNUSED_START == 3, "found %lld\n",
+                (long long)FID_SEQ_UNUSED_START);
+       LASSERTF(FID_SEQ_UNUSED_END == 9, "found %lld\n",
+                (long long)FID_SEQ_UNUSED_END);
        LASSERTF(FID_SEQ_RSVD == 11, "found %lld\n",
                 (long long)FID_SEQ_RSVD);
        LASSERTF(FID_SEQ_IGIF == 12, "found %lld\n",
@@ -520,6 +528,10 @@ void lustre_assert_wire_constants(void)
                        (long long)FID_SEQ_QUOTA_GLB);
        LASSERTF(FID_SEQ_ROOT == 0x0000000200000007ULL, "found 0x%.16llxULL\n",
                        (long long)FID_SEQ_ROOT);
+       LASSERTF(FID_SEQ_LAYOUT_RBTREE == 0x0000000200000008ULL, "found 0x%.16llxULL\n",
+                       (long long)FID_SEQ_LAYOUT_RBTREE);
+       LASSERTF(FID_SEQ_UPDATE_LOG == 0x0000000200000009ULL, "found 0x%.16llxULL\n",
+                       (long long)FID_SEQ_UPDATE_LOG);
        LASSERTF(FID_SEQ_NORMAL == 0x0000000200000400ULL, "found 0x%.16llxULL\n",
                        (long long)FID_SEQ_NORMAL);
        LASSERTF(FID_SEQ_LOV_DEFAULT == 0xffffffffffffffffULL, "found 0x%.16llxULL\n",
@@ -1470,10 +1482,10 @@ void lustre_assert_wire_constants(void)
        LASSERTF((int)sizeof(((struct lov_mds_md_v3 *)0)->lmm_layout_gen) == 2, "found %lld\n",
                 (long long)(int)sizeof(((struct lov_mds_md_v3 *)0)->lmm_layout_gen));
        CLASSERT(LOV_MAXPOOLNAME == 15);
-       LASSERTF((int)offsetof(struct lov_mds_md_v3, lmm_pool_name[16]) == 48, "found %lld\n",
-                (long long)(int)offsetof(struct lov_mds_md_v3, lmm_pool_name[16]));
-       LASSERTF((int)sizeof(((struct lov_mds_md_v3 *)0)->lmm_pool_name[16]) == 1, "found %lld\n",
-                (long long)(int)sizeof(((struct lov_mds_md_v3 *)0)->lmm_pool_name[16]));
+       LASSERTF((int)offsetof(struct lov_mds_md_v3, lmm_pool_name[15 + 1]) == 48, "found %lld\n",
+                (long long)(int)offsetof(struct lov_mds_md_v3, lmm_pool_name[15 + 1]));
+       LASSERTF((int)sizeof(((struct lov_mds_md_v3 *)0)->lmm_pool_name[15 + 1]) == 1, "found %lld\n",
+                (long long)(int)sizeof(((struct lov_mds_md_v3 *)0)->lmm_pool_name[15 + 1]));
        LASSERTF((int)offsetof(struct lov_mds_md_v3, lmm_objects[0]) == 48, "found %lld\n",
                 (long long)(int)offsetof(struct lov_mds_md_v3, lmm_objects[0]));
        LASSERTF((int)sizeof(((struct lov_mds_md_v3 *)0)->lmm_objects[0]) == 24, "found %lld\n",
@@ -1523,10 +1535,10 @@ void lustre_assert_wire_constants(void)
                 (long long)(int)offsetof(struct lmv_mds_md_v1, lmv_padding3));
        LASSERTF((int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_padding3) == 8, "found %lld\n",
                 (long long)(int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_padding3));
-       LASSERTF((int)offsetof(struct lmv_mds_md_v1, lmv_pool_name[16]) == 56, "found %lld\n",
-                (long long)(int)offsetof(struct lmv_mds_md_v1, lmv_pool_name[16]));
-       LASSERTF((int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_pool_name[16]) == 1, "found %lld\n",
-                (long long)(int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_pool_name[16]));
+       LASSERTF((int)offsetof(struct lmv_mds_md_v1, lmv_pool_name[15]) == 55, "found %lld\n",
+                (long long)(int)offsetof(struct lmv_mds_md_v1, lmv_pool_name[15]));
+       LASSERTF((int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_pool_name[15]) == 1, "found %lld\n",
+                (long long)(int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_pool_name[15]));
        LASSERTF((int)offsetof(struct lmv_mds_md_v1, lmv_stripe_fids[0]) == 56, "found %lld\n",
                 (long long)(int)offsetof(struct lmv_mds_md_v1, lmv_stripe_fids[0]));
        LASSERTF((int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_stripe_fids[0]) == 16, "found %lld\n",
@@ -3778,7 +3790,9 @@ void lustre_assert_wire_constants(void)
        CLASSERT(LLOG_CHANGELOG_REPL_CTXT == 13);
        CLASSERT(LLOG_CHANGELOG_USER_ORIG_CTXT == 14);
        CLASSERT(LLOG_AGENT_ORIG_CTXT == 15);
-       CLASSERT(LLOG_MAX_CTXTS == 16);
+       CLASSERT(LLOG_UPDATELOG_ORIG_CTXT == 16);
+       CLASSERT(LLOG_UPDATELOG_REPL_CTXT == 17);
+       CLASSERT(LLOG_MAX_CTXTS == 18);
 
        /* Checks for struct llogd_conn_body */
        LASSERTF((int)sizeof(struct llogd_conn_body) == 40, "found %lld\n",
index 68439f1..635ea99 100644 (file)
@@ -1243,6 +1243,10 @@ static int out_tx_write_exec(const struct lu_env *env, struct thandle *th,
        struct dt_object *dt_obj = arg->object;
        int rc;
 
+       CDEBUG(D_INFO, "write "DFID" pos "LPU64" buf %p, len %lu\n",
+              PFID(lu_object_fid(&dt_obj->do_lu)), arg->u.write.pos,
+              arg->u.write.buf.lb_buf, (unsigned long)arg->u.write.buf.lb_len);
+
        dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
        rc = dt_record_write(env, dt_obj, &arg->u.write.buf,
                             &arg->u.write.pos, th);
@@ -1325,6 +1329,77 @@ static int out_write(struct tgt_session_info *tsi)
        RETURN(rc);
 }
 
+/**
+ * Handle an OUT_READ update: read object data into the update reply.
+ *
+ * Parameter 0 of the update carries the number of bytes to read and
+ * parameter 1 the offset to read from; both are decoded as little-endian
+ * values. The data is read straight into the out_read_reply stored in this
+ * update's result slot, and the header of that reply records how many
+ * bytes were read and the offset reported back by dt_read().
+ *
+ * \param[in] tsi      target session info of the request being handled
+ *
+ * \retval             return value of dt_read() once the read is attempted
+ *                     (a non-negative value is used as the byte count)
+ * \retval             -ENOENT if the object does not exist
+ * \retval             err_serious(-EPROTO) if a parameter is missing or the
+ *                     requested size does not fit into the reply buffer
+ */
+static int out_read(struct tgt_session_info *tsi)
+{
+       const struct lu_env     *env = tsi->tsi_env;
+       struct tgt_thread_info  *tti = tgt_th_info(env);
+       struct object_update    *update = tti->tti_u.update.tti_update;
+       struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
+       struct object_update_reply *reply = tti->tti_u.update.tti_update_reply;
+       int             index = tti->tti_u.update.tti_update_reply_index;
+       struct object_update_result *update_result;
+       struct lu_buf           *lbuf = &tti->tti_buf;
+       struct out_read_reply   *orr;
+       void                    *tmp;
+       size_t                  size;
+       __u64                   pos;
+       int                      rc;
+       ENTRY;
+
+       update_result = object_update_result_get(reply, index, NULL);
+       LASSERT(update_result != NULL);
+       /* Reserve room for the out_read_reply header on every path,
+        * including the error paths that jump to "out" below. */
+       update_result->our_datalen = sizeof(*orr);
+
+       if (!lu_object_exists(&obj->do_lu))
+               GOTO(out, rc = -ENOENT);
+
+       tmp = object_update_param_get(update, 0, NULL);
+       if (tmp == NULL) {
+               CERROR("%s: empty size for read: rc = %d\n",
+                      tgt_name(tsi->tsi_tgt), -EPROTO);
+               GOTO(out, rc = err_serious(-EPROTO));
+       }
+       /* NOTE(review): the value is decoded with le64_to_cpu() but loaded
+        * through a size_t pointer; confirm this matches what the packer
+        * sends on platforms where size_t is not 64 bits wide. */
+       size = le64_to_cpu(*(size_t *)(tmp));
+
+       tmp = object_update_param_get(update, 1, NULL);
+       if (tmp == NULL) {
+               CERROR("%s: empty pos for read: rc = %d\n",
+                      tgt_name(tsi->tsi_tgt), -EPROTO);
+               GOTO(out, rc = err_serious(-EPROTO));
+       }
+       pos = le64_to_cpu(*(__u64 *)(tmp));
+
+       /* Refuse reads that would not fit into the reply buffer */
+       if (size > OUT_UPDATE_REPLY_SIZE -
+                  cfs_size_round((unsigned long)update_result->our_data -
+                                 (unsigned long)update_result) - sizeof(pos)) {
+               CERROR("%s: get %zu the biggest read size is %d: rc = %d\n",
+                      tgt_name(tsi->tsi_tgt), size, OUT_UPDATE_REPLY_SIZE,
+                      -EPROTO);
+               GOTO(out, rc = err_serious(-EPROTO));
+       }
+
+       /* The reply header (size and offset) sits at the beginning of the
+        * result data; the bytes that are read follow it in orr_data. */
+       orr = (struct out_read_reply *)update_result->our_data;
+
+       lbuf->lb_buf = orr->orr_data;
+       lbuf->lb_len = size;
+
+       dt_read_lock(env, obj, MOR_TGT_CHILD);
+       rc = dt_read(env, obj, lbuf, &pos);
+       dt_read_unlock(env, obj);
+       orr->orr_size = rc < 0 ? 0 : rc;
+       orr->orr_offset = pos;
+
+       /* Account for the data while orr_size is still in CPU byte order:
+        * orr_cpu_to_le() below is handed the same structure as source and
+        * destination, so afterwards the field holds the wire value. */
+       update_result->our_datalen += orr->orr_size;
+       orr_cpu_to_le(orr, orr);
+out:
+       /* Insert read buffer */
+       update_result->our_rc = ptlrpc_status_hton(rc);
+       reply->ourp_lens[index] = cfs_size_round(update_result->our_datalen +
+                                                sizeof(*update_result));
+       RETURN(rc);
+}
+
 #define DEF_OUT_HNDL(opc, name, flags, fn)     \
 [opc - OUT_CREATE] = {                                 \
        .th_name    = name,                             \
@@ -1362,6 +1437,8 @@ static struct tgt_handler out_update_ops[] = {
        DEF_OUT_HNDL(OUT_INDEX_DELETE, "out_index_delete",
                     MUTABOR | HABEO_REFERO, out_index_delete),
        DEF_OUT_HNDL(OUT_WRITE, "out_write", MUTABOR | HABEO_REFERO, out_write),
+       DEF_OUT_HNDL(OUT_READ, "out_read", HABEO_REFERO, out_read),
+
 };
 
 static struct tgt_handler *out_handler_find(__u32 opc)
@@ -1479,8 +1556,13 @@ static int out_tx_end(const struct lu_env *env, struct thandle_exec_args *ta,
                       dt_obd_name(ta->ta_handle->th_dev), i, ta->ta_argno, rc);
        }
 
-       /* Only fail for real update */
-       tsi->tsi_reply_fail_id = OBD_FAIL_OUT_UPDATE_NET_REP;
+       /* Only fail for real updates. XXX: right now llog updates, whose
+        * update count is usually 1, are ignored, so that failover test
+        * cases can hit this FAIL_UPDATE_NET_REP precisely. This check
+        * will be removed after the async update patch has landed. */
+       if (ta->ta_argno > 1)
+               tsi->tsi_reply_fail_id = OBD_FAIL_OUT_UPDATE_NET_REP;
+
 stop:
        rc1 = out_trans_stop(env, ta, rc);
        if (rc == 0)
index 2a2b2b5..89948cc 100644 (file)
@@ -57,6 +57,7 @@ const char *update_op_str(__u16 opc)
                [OUT_WRITE] = "write",
                [OUT_XATTR_DEL] = "xattr_del",
                [OUT_PUNCH] = "punch",
+               [OUT_READ] = "read",
        };
 
        if (opc < ARRAY_SIZE(opc_str) && opc_str[opc] != NULL)
@@ -385,3 +386,18 @@ int out_xattr_get_pack(const struct lu_env *env, struct object_update *update,
                               fid, 1, &size, (const void **)&name);
 }
 EXPORT_SYMBOL(out_xattr_get_pack);
+
+/**
+ * Pack an OUT_READ update for the object \a fid.
+ *
+ * The update carries two parameters: the number of bytes to read and the
+ * offset to read from. Both are packed as little-endian 64-bit values.
+ *
+ * \param[in] env                      execution environment
+ * \param[in,out] update               update buffer the request is packed into
+ * \param[in] max_update_length        size limit handed to out_update_pack()
+ * \param[in] fid                      FID of the object to read
+ * \param[in] size                     number of bytes to read
+ * \param[in] pos                      offset to start reading from
+ *
+ * \retval                             return value of out_update_pack()
+ */
+int out_read_pack(const struct lu_env *env, struct object_update *update,
+                 size_t max_update_length, const struct lu_fid *fid,
+                 size_t size, loff_t pos)
+{
+       /* Use fixed-width copies for the wire values: storing the result of
+        * cpu_to_le64() back into a size_t would truncate it, and the packed
+        * length would shrink, wherever size_t is narrower than 64 bits. */
+       __u64           le_size = cpu_to_le64(size);
+       __u64           le_pos = cpu_to_le64(pos);
+       __u16           sizes[2] = {sizeof(le_size), sizeof(le_pos)};
+       const void      *bufs[2] = {&le_size, &le_pos};
+
+       return out_update_pack(env, update, max_update_length, OUT_READ, fid,
+                              ARRAY_SIZE(sizes), sizes, bufs);
+}
+EXPORT_SYMBOL(out_read_pack);
index 63d9dc7..fd1fce7 100644 (file)
@@ -913,6 +913,7 @@ int check_and_prepare_update_record(const struct lu_env *env,
        lur->lur_update_rec.ur_master_transno = 0;
        lur->lur_update_rec.ur_batchid = 0;
        lur->lur_update_rec.ur_flags = 0;
+       lur->lur_hdr.lrh_len = LLOG_CHUNK_SIZE;
 
        tur->tur_update_param_count = 0;
 
@@ -955,7 +956,8 @@ int merge_params_updates_buf(const struct lu_env *env,
        params_size = update_params_size(tur->tur_update_params,
                                         tur->tur_update_param_count);
        record_size = llog_update_record_size(lur);
-       if (record_size + params_size > tur->tur_update_records_buf_size) {
+       if (cfs_size_round(record_size + params_size) >
+                               tur->tur_update_records_buf_size) {
                int rc;
 
                rc = tur_update_records_extend(tur, record_size + params_size);
index 09d9d89..f56aa7c 100644 (file)
 #define DEBUG_SUBSYSTEM S_CLASS
 
 #include <lu_target.h>
+#include <lustre_log.h>
 #include <lustre_update.h>
 #include <obd.h>
 #include <obd_class.h>
 #include <tgt_internal.h>
 
 /**
+ * Declare write update to sub device
+ *
+ * Declare Write updates llog records to the sub device during distribute
+ * transaction.
+ *
+ * \param[in] env      execution environment
+ * \param[in] record   update records being written
+ * \param[in] lst      sub transaction handle
+ *
+ * \retval             0 if writing succeeds
+ * \retval             negative errno if writing fails
+ */
+static int sub_declare_updates_write(const struct lu_env *env,
+                                    struct llog_update_record *record,
+                                    struct sub_thandle *lst)
+{
+       struct llog_ctxt        *ctxt;
+       struct dt_device        *dt = lst->st_sub_th->th_dev;
+       int rc;
+
+       /* If ctxt is NULL, it means not need to write update,
+        * for example if the the OSP is used to connect to OST */
+       ctxt = llog_get_context(dt->dd_lu_dev.ld_obd,
+                               LLOG_UPDATELOG_ORIG_CTXT);
+       LASSERT(ctxt != NULL);
+
+       /* Not ready to record updates yet. */
+       if (ctxt->loc_handle == NULL) {
+               llog_ctxt_put(ctxt);
+               return 0;
+       }
+
+       rc = llog_declare_add(env, ctxt->loc_handle, &record->lur_hdr,
+                             lst->st_sub_th);
+
+       llog_ctxt_put(ctxt);
+
+       return rc;
+}
+
+/**
+ * Write the update records to a sub device
+ *
+ * Adds the update records of a distributed transaction as one llog record
+ * to the update log of the sub device, within the sub transaction.
+ *
+ * \param[in] env      execution environment
+ * \param[in] record   update records being written
+ * \param[in] lst      sub transaction handle
+ *
+ * \retval             0 if the update log has no open handle yet, in which
+ *                     case nothing is written
+ * \retval             otherwise the return value of llog_add(): negative
+ *                     errno if writing fails (the caller treats any
+ *                     non-negative value as success)
+ */
+static int sub_updates_write(const struct lu_env *env,
+                            struct llog_update_record *record,
+                            struct sub_thandle *lst)
+{
+       struct llog_ctxt        *ctxt;
+       int                     rc = 0;
+
+       ctxt = llog_get_context(lst->st_sub_th->th_dev->dd_lu_dev.ld_obd,
+                               LLOG_UPDATELOG_ORIG_CTXT);
+       LASSERT(ctxt != NULL);
+
+       /* A missing llog handle means the device is not ready to record
+        * updates yet, which usually happens in the error handling path. */
+       if (ctxt->loc_handle != NULL) {
+               /* The header length must already match the record size */
+               LASSERTF(record->lur_hdr.lrh_len ==
+                        llog_update_record_size(record),
+                        "lrh_len %u record_size %zu\n",
+                        record->lur_hdr.lrh_len,
+                        llog_update_record_size(record));
+
+               rc = llog_add(env, ctxt->loc_handle, &record->lur_hdr,
+                             NULL, lst->st_sub_th);
+       }
+
+       llog_ctxt_put(ctxt);
+
+       return rc;
+}
+
+/**
+ * Write the update records of a top transaction
+ *
+ * If the transaction collected update records, merge their parameters and
+ * updates into one buffer, fill in the llog record header and write the
+ * record through every sub transaction that is flagged to record updates.
+ *
+ * \param[in] env      execution environment
+ * \param[in] top_th   top transaction handle
+ *
+ * \retval             0 if there is nothing to write or writing succeeds
+ * \retval             negative errno if merging or writing fails
+ */
+static int top_updates_write(const struct lu_env *env,
+                            struct top_thandle *top_th)
+{
+       struct thandle_update_records *tur = top_th->tt_update_records;
+       struct llog_update_record *lur;
+       struct sub_thandle      *lst;
+       int                     rc;
+       ENTRY;
+
+       /* No updates were recorded in this transaction */
+       if (tur == NULL)
+               RETURN(0);
+
+       /* The parameters and the updates have to end up in one buffer */
+       rc = merge_params_updates_buf(env, tur);
+       if (rc < 0)
+               RETURN(rc);
+
+       lur = tur->tur_update_records;
+       /* Dump updates to debug log */
+       update_records_dump(&lur->lur_update_rec, D_INFO, true);
+
+       /* Fill in the llog header now that the final size is known */
+       lur->lur_hdr.lrh_type = UPDATE_REC;
+       lur->lur_hdr.lrh_len = llog_update_record_size(lur);
+
+       list_for_each_entry(lst, &top_th->tt_sub_thandle_list, st_sub_list) {
+               if (lst->st_record_update) {
+                       rc = sub_updates_write(env, lur, lst);
+                       if (rc < 0)
+                               break;
+               }
+       }
+
+       /* A positive value is a success as well; report it as 0 */
+       RETURN(rc > 0 ? 0 : rc);
+}
+
+/**
  * Create the top transaction.
  *
  * Create the top transaction on the master device. It will create a top
@@ -115,11 +254,23 @@ int top_trans_start(const struct lu_env *env, struct dt_device *master_dev,
        struct sub_thandle      *lst;
        int                     rc;
 
+       LASSERT(top_th->tt_magic == TOP_THANDLE_MAGIC);
        rc = check_and_prepare_update_record(env, th);
        if (rc < 0)
                return rc;
+       /* Check if needs to write updates */
+       list_for_each_entry(lst, &top_th->tt_sub_thandle_list, st_sub_list) {
+               struct llog_update_record *record;
+
+               if (!lst->st_record_update)
+                       continue;
+
+               record = top_th->tt_update_records->tur_update_records;
+               rc = sub_declare_updates_write(env, record, lst);
+               if (rc != 0)
+                       return rc;
+       }
 
-       LASSERT(top_th->tt_magic == TOP_THANDLE_MAGIC);
        list_for_each_entry(lst, &top_th->tt_sub_thandle_list, st_sub_list) {
                lst->st_sub_th->th_sync = th->th_sync;
                lst->st_sub_th->th_local = th->th_local;
@@ -152,22 +303,22 @@ EXPORT_SYMBOL(top_trans_start);
 int top_trans_stop(const struct lu_env *env, struct dt_device *master_dev,
                   struct thandle *th)
 {
-       struct sub_thandle      *lst;
        struct top_thandle      *top_th = container_of(th, struct top_thandle,
                                                       tt_super);
        struct thandle_update_records *tur = top_th->tt_update_records;
+       struct sub_thandle      *lst;
        int                     rc;
        ENTRY;
 
        /* Note: we always need walk through all of sub_transaction to do
         * transaction stop to release the resource here */
-       if (tur != NULL) {
-               rc = merge_params_updates_buf(env, tur);
-               if (rc == 0) {
-                       struct update_records *record;
-
-                       record = &tur->tur_update_records->lur_update_rec;
-                       update_records_dump(record, D_INFO, false);
+       if (tur != NULL && th->th_result == 0) {
+               rc = top_updates_write(env, top_th);
+               if (rc < 0) {
+                       CERROR("%s: cannot write updates: rc = %d\n",
+                              master_dev->dd_lu_dev.ld_obd->obd_name, rc);
+                       /* Still need to call dt_trans_stop to release the
+                        * resources held by the transaction */
                }
                top_th->tt_update_records = NULL;
        }
@@ -176,7 +327,7 @@ int top_trans_stop(const struct lu_env *env, struct dt_device *master_dev,
 
        top_th->tt_master_sub_thandle->th_local = th->th_local;
        top_th->tt_master_sub_thandle->th_sync = th->th_sync;
-
+       top_th->tt_master_sub_thandle->th_result = th->th_result;
        /* To avoid sending RPC while holding thandle, it always stop local
         * transaction first, then other sub thandle */
        rc = dt_trans_stop(env, master_dev, top_th->tt_master_sub_thandle);
@@ -186,6 +337,8 @@ int top_trans_stop(const struct lu_env *env, struct dt_device *master_dev,
 
                if (rc != 0)
                        lst->st_sub_th->th_result = rc;
+               else
+                       lst->st_sub_th->th_result = th->th_result;
                lst->st_sub_th->th_sync = th->th_sync;
                lst->st_sub_th->th_local = th->th_local;
                rc2 = dt_trans_stop(env, lst->st_sub_th->th_dev,
index b998906..11dc37f 100644 (file)
@@ -321,9 +321,9 @@ test_1() {
 run_test 1 "start up ost twice (should return errors)"
 
 test_2() {
-       start_mdt 1 || error "MDT0 start fail"
+       start_mds || error "MDT0 start fail"
        echo "start mds second time.."
-       start_mdt 1 && error "2nd MDT start should fail"
+       start_mds && error "2nd MDT start should fail"
        start_ost || error "OST start failed"
        mount_client $MOUNT || error "mount_client failed to start client"
        check_mount || error "check_mount failed"
@@ -2567,9 +2567,14 @@ test_41a() { #bug 14134
 
        local MDSDEV=$(mdsdevname ${SINGLEMDS//mds/})
 
-       start $SINGLEMDS $MDSDEV $MDS_MOUNT_OPTS -o nosvc -n
+       start_mdt 1 -o nosvc -n
+       if [ $MDSCOUNT -ge 2 ]; then
+               for num in $(seq 2 $MDSCOUNT); do
+                       start_mdt $num || return
+               done
+       fi
        start ost1 $(ostdevname 1) $OST_MOUNT_OPTS
-       start $SINGLEMDS $MDSDEV $MDS_MOUNT_OPTS -o nomgs,force
+       start_mdt 1 -o nomgs,force
        mount_client $MOUNT || error "mount_client $MOUNT failed"
        sleep 5
 
@@ -2597,9 +2602,14 @@ test_41b() {
        reformat
        local MDSDEV=$(mdsdevname ${SINGLEMDS//mds/})
 
-       start $SINGLEMDS $MDSDEV $MDS_MOUNT_OPTS -o nosvc -n
+       start_mdt 1 -o nosvc -n
+       if [ $MDSCOUNT -ge 2 ]; then
+               for num in $(seq 2 $MDSCOUNT); do
+                       start_mdt $num || return
+               done
+       fi
        start_ost || error "Unable to start OST1"
-       start $SINGLEMDS $MDSDEV $MDS_MOUNT_OPTS -o nomgs,force
+       start_mdt 1 -o nomgs,force
        mount_client $MOUNT || error "mount_client $MOUNT failed"
        sleep 5
 
@@ -2648,7 +2658,14 @@ test_41c() {
                error "unexpected concurrent MDT mounts result, rc=$rc rc2=$rc2"
        fi
 
+       if [ $MDSCOUNT -ge 2 ]; then
+               for num in $(seq 2 $MDSCOUNT); do
+                       start_mdt $num || return
+               done
+       fi
+
        # OST concurrent start
+
        #define OBD_FAIL_TGT_DELAY_CONNECT 0x703
        do_facet ost1 "$LCTL set_param fail_loc=0x703"
        start ost1 $(ostdevname 1) $OST_MOUNT_OPTS &
@@ -2668,26 +2685,26 @@ test_41c() {
                echo "1st OST start failed with EALREADY"
                echo "2nd OST start succeed"
        else
-               stop mds1 -f
+               stop_mds -f
                stop ost1 -f
                error "unexpected concurrent OST mounts result, rc=$rc rc2=$rc2"
        fi
        # cleanup
-       stop mds1 -f
+       stop_mds
        stop ost1 -f
 
        # verify everything ok
        start_mds
        if [ $? != 0 ]
        then
-               stop mds1 -f
+               stop_mds
                error "MDT(s) start failed"
        fi
 
        start_ost
        if [ $? != 0 ]
        then
-               stop mds1 -f
+               stop_mds
                stop ost1 -f
                error "OST(s) start failed"
        fi
@@ -2695,14 +2712,14 @@ test_41c() {
        mount_client $MOUNT
        if [ $? != 0 ]
        then
-               stop mds1 -f
+               stop_mds
                stop ost1 -f
                error "client start failed"
        fi
        check_mount
        if [ $? != 0 ]
        then
-               stop mds1 -f
+               stop_mds
                stop ost1 -f
                error "client mount failed"
        fi
@@ -4289,8 +4306,9 @@ test_70a() {
        start_mdt 1 || error "MDT0 start fail"
 
        start_ost || error "OST0 start fail"
-
-       start_mdt 2 || error "MDT1 start fail"
+       for num in $(seq 2 $MDSCOUNT); do
+               start_mdt $num || return
+       done
 
        mount_client $MOUNT || error "mount client fails"
 
@@ -4310,8 +4328,7 @@ test_70b() {
 
        start_ost || error "OST0 start fail"
 
-       start_mdt 1 || error "MDT0 start fail"
-       start_mdt 2 || error "MDT1 start fail"
+       start_mds || error "MDS start fail"
 
        mount_client $MOUNT || error "mount client fails"
 
@@ -4330,8 +4347,7 @@ test_70c() {
        [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return
        local MDTIDX=1
 
-       start_mdt 1 || error "MDT0 start fail"
-       start_mdt 2 || error "MDT1 start fail"
+       start_mds || error "MDS start fail"
        start_ost || error "OST0 start fail"
 
        mount_client $MOUNT || error "mount client fails"
@@ -4355,8 +4371,7 @@ test_70d() {
        [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return
        local MDTIDX=1
 
-       start_mdt 1 || error "MDT0 start fail"
-       start_mdt 2 || error "MDT1 start fail"
+       start_mds || error "MDS start fail"
        start_ost || error "OST0 start fail"
 
        mount_client $MOUNT || error "mount client fails"
@@ -4388,7 +4403,10 @@ test_71a() {
 
        start_mdt 1 || error "MDT0 start fail"
        start_ost || error "OST0 start fail"
-       start_mdt 2 || error "MDT1 start fail"
+       for num in $(seq 2 $MDSCOUNT); do
+               start_mdt $num || return
+       done
+
        start_ost2 || error "OST1 start fail"
 
        mount_client $MOUNT || error "mount client fails"
@@ -4401,8 +4419,7 @@ test_71a() {
        rm -rf $DIR/$tdir || error "delete dir fail"
 
        umount_client $MOUNT || error "umount_client failed"
-       stop_mdt 1 || error "MDT0 stop fail"
-       stop_mdt 2 || error "MDT1 stop fail"
+       stop_mds || error "MDS stop fail"
        stop_ost || error "OST0 stop fail"
        stop_ost2 || error "OST1 stop fail"
 }
@@ -4415,7 +4432,9 @@ test_71b() {
        fi
        local MDTIDX=1
 
-       start_mdt 2 || error "MDT1 start fail"
+       for num in $(seq 2 $MDSCOUNT); do
+               start_mdt $num || return
+       done
        start_ost || error "OST0 start fail"
        start_mdt 1 || error "MDT0 start fail"
        start_ost2 || error "OST1 start fail"
@@ -4430,8 +4449,7 @@ test_71b() {
        rm -rf $DIR/$tdir || error "delete dir fail"
 
        umount_client $MOUNT || error "umount_client failed"
-       stop_mdt 1 || error "MDT0 stop fail"
-       stop_mdt 2 || error "MDT1 stop fail"
+       stop_mds || error "MDT0 stop fail"
        stop_ost || error "OST0 stop fail"
        stop_ost2 || error "OST1 stop fail"
 }
@@ -4446,7 +4464,9 @@ test_71c() {
 
        start_ost || error "OST0 start fail"
        start_ost2 || error "OST1 start fail"
-       start_mdt 2 || error "MDT1 start fail"
+       for num in $(seq 2 $MDSCOUNT); do
+               start_mdt $num || return
+       done
        start_mdt 1 || error "MDT0 start fail"
 
        mount_client $MOUNT || error "mount client fails"
@@ -4459,8 +4479,7 @@ test_71c() {
        rm -rf $DIR/$tdir || error "delete dir fail"
 
        umount_client $MOUNT || error "umount_client failed"
-       stop_mdt 1 || error "MDT0 stop fail"
-       stop_mdt 2 || error "MDT1 stop fail"
+       stop_mds || error "MDS stop fail"
        stop_ost || error "OST0 stop fail"
        stop_ost2 || error "OST1 stop fail"
 
@@ -4475,7 +4494,9 @@ test_71d() {
        local MDTIDX=1
 
        start_ost || error "OST0 start fail"
-       start_mdt 2 || error "MDT0 start fail"
+       for num in $(seq 2 $MDSCOUNT); do
+               start_mdt $num || return
+       done
        start_mdt 1 || error "MDT0 start fail"
        start_ost2 || error "OST1 start fail"
 
@@ -4489,8 +4510,7 @@ test_71d() {
        rm -rf $DIR/$tdir || error "delete dir fail"
 
        umount_client $MOUNT || error "umount_client failed"
-       stop_mdt 1 || error "MDT0 stop fail"
-       stop_mdt 2 || error "MDT1 stop fail"
+       stop_mds || error "MDS stop fail"
        stop_ost || error "OST0 stop fail"
        stop_ost2 || error "OST1 stop fail"
 
@@ -4505,7 +4525,9 @@ test_71e() {
        local MDTIDX=1
 
        start_ost || error "OST0 start fail"
-       start_mdt 2 || error "MDT1 start fail"
+       for num in $(seq 2 $MDSCOUNT); do
+               start_mdt $num || return
+       done
        start_ost2 || error "OST1 start fail"
        start_mdt 1 || error "MDT0 start fail"
 
@@ -4519,8 +4541,7 @@ test_71e() {
        rm -rf $DIR/$tdir || error "delete dir fail"
 
        umount_client $MOUNT || error "umount_client failed"
-       stop_mdt 1 || error "MDT0 stop fail"
-       stop_mdt 2 || error "MDT1 stop fail"
+       stop_mds || error "MDS stop fail"
        stop_ost || error "OST0 stop fail"
        stop_ost2 || error "OST1 stop fail"
 
index ea21409..14b8c14 100644 (file)
@@ -7,7 +7,10 @@
 set -e
 
 ONLY=${ONLY:-"$*"}
-ALWAYS_EXCEPT="$SANITY_LFSCK_EXCEPT"
+
+#Bug number for excepting test      6380
+ALWAYS_EXCEPT="$SANITY_LFSCK_EXCEPT 4 5 "
+
 [ "$SLOW" = "no" ] && EXCEPT_SLOW=""
 # UPDATE THE COMMENT ABOVE WITH BUG NUMBERS WHEN CHANGING ALWAYS_EXCEPT!
 
index ef08fcf..4124c2b 100644 (file)
@@ -7,7 +7,9 @@
 set -e
 
 ONLY=${ONLY:-"$*"}
-ALWAYS_EXCEPT="$SANITY_SCRUB_EXCEPT"
+#Bug number for excepting test      6380
+ALWAYS_EXCEPT="$SANITY_SCRUB_EXCEPT 1b 1c 2 3 4a 4b 4c 5 6 7 8 9 10 15"
+
 [ "$SLOW" = "no" ] && EXCEPT_SLOW=""
 # UPDATE THE COMMENT ABOVE WITH BUG NUMBERS WHEN CHANGING ALWAYS_EXCEPT!
 
@@ -322,8 +324,7 @@ run_test 0 "Do not auto trigger OI scrub for non-backup/restore case"
 test_1a() {
        scrub_prep 0
        echo "start $SINGLEMDS without disabling OI scrub"
-       start $SINGLEMDS $MDT_DEVNAME $MOUNT_OPTS_SCRUB > /dev/null ||
-               error "(1) Fail to start MDS!"
+       scrub_start_mds 1 "$MOUNT_OPTS_SCRUB"
 
        local FLAGS=$($SHOW_SCRUB | awk '/^flags/ { print $2 }')
        [ -z "$FLAGS" ] || error "(3) Expect empty flags, but got '$FLAGS'"
index d8b42b0..0adf066 100644 (file)
@@ -236,8 +236,8 @@ check_ost_id(void)
        CHECK_VALUE(FID_SEQ_OST_MDT0);
        CHECK_VALUE(FID_SEQ_LLOG);
        CHECK_VALUE(FID_SEQ_ECHO);
-       CHECK_VALUE(FID_SEQ_OST_MDT1);
-       CHECK_VALUE(FID_SEQ_OST_MAX);
+       CHECK_VALUE(FID_SEQ_UNUSED_START);
+       CHECK_VALUE(FID_SEQ_UNUSED_END);
        CHECK_VALUE(FID_SEQ_RSVD);
        CHECK_VALUE(FID_SEQ_IGIF);
        CHECK_VALUE_64X(FID_SEQ_IGIF_MAX);
@@ -250,6 +250,8 @@ check_ost_id(void)
        CHECK_VALUE_64X(FID_SEQ_QUOTA);
        CHECK_VALUE_64X(FID_SEQ_QUOTA_GLB);
        CHECK_VALUE_64X(FID_SEQ_ROOT);
+       CHECK_VALUE_64X(FID_SEQ_LAYOUT_RBTREE);
+       CHECK_VALUE_64X(FID_SEQ_UPDATE_LOG);
        CHECK_VALUE_64X(FID_SEQ_NORMAL);
        CHECK_VALUE_64X(FID_SEQ_LOV_DEFAULT);
 
@@ -1679,6 +1681,8 @@ check_llogd_body(void)
        CHECK_CVALUE(LLOG_CHANGELOG_REPL_CTXT);
        CHECK_CVALUE(LLOG_CHANGELOG_USER_ORIG_CTXT);
        CHECK_CVALUE(LLOG_AGENT_ORIG_CTXT);
+       CHECK_CVALUE(LLOG_UPDATELOG_ORIG_CTXT);
+       CHECK_CVALUE(LLOG_UPDATELOG_REPL_CTXT);
        CHECK_CVALUE(LLOG_MAX_CTXTS);
 }
 
@@ -2444,6 +2448,10 @@ main(int argc, char **argv)
        CHECK_VALUE(OUT_INDEX_LOOKUP);
        CHECK_VALUE(OUT_INDEX_INSERT);
        CHECK_VALUE(OUT_INDEX_DELETE);
+       CHECK_VALUE(OUT_WRITE);
+       CHECK_VALUE(OUT_XATTR_DEL);
+       CHECK_VALUE(OUT_PUNCH);
+       CHECK_VALUE(OUT_READ);
 
        check_hsm_attrs();
        check_ost_id();
index 3e41022..92a44f1 100644 (file)
@@ -465,6 +465,14 @@ void lustre_assert_wire_constants(void)
                 (long long)OUT_INDEX_INSERT);
        LASSERTF(OUT_INDEX_DELETE == 11, "found %lld\n",
                 (long long)OUT_INDEX_DELETE);
+       LASSERTF(OUT_WRITE == 12, "found %lld\n",
+                (long long)OUT_WRITE);
+       LASSERTF(OUT_XATTR_DEL == 13, "found %lld\n",
+                (long long)OUT_XATTR_DEL);
+       LASSERTF(OUT_PUNCH == 14, "found %lld\n",
+                (long long)OUT_PUNCH);
+       LASSERTF(OUT_READ == 15, "found %lld\n",
+                (long long)OUT_READ);
 
        /* Checks for struct hsm_attrs */
        LASSERTF((int)sizeof(struct hsm_attrs) == 24, "found %lld\n",
@@ -501,10 +509,10 @@ void lustre_assert_wire_constants(void)
                 (long long)FID_SEQ_LLOG);
        LASSERTF(FID_SEQ_ECHO == 2, "found %lld\n",
                 (long long)FID_SEQ_ECHO);
-       LASSERTF(FID_SEQ_OST_MDT1 == 3, "found %lld\n",
-                (long long)FID_SEQ_OST_MDT1);
-       LASSERTF(FID_SEQ_OST_MAX == 9, "found %lld\n",
-                (long long)FID_SEQ_OST_MAX);
+       LASSERTF(FID_SEQ_UNUSED_START == 3, "found %lld\n",
+                (long long)FID_SEQ_UNUSED_START);
+       LASSERTF(FID_SEQ_UNUSED_END == 9, "found %lld\n",
+                (long long)FID_SEQ_UNUSED_END);
        LASSERTF(FID_SEQ_RSVD == 11, "found %lld\n",
                 (long long)FID_SEQ_RSVD);
        LASSERTF(FID_SEQ_IGIF == 12, "found %lld\n",
@@ -529,6 +537,10 @@ void lustre_assert_wire_constants(void)
                        (long long)FID_SEQ_QUOTA_GLB);
        LASSERTF(FID_SEQ_ROOT == 0x0000000200000007ULL, "found 0x%.16llxULL\n",
                        (long long)FID_SEQ_ROOT);
+       LASSERTF(FID_SEQ_LAYOUT_RBTREE == 0x0000000200000008ULL, "found 0x%.16llxULL\n",
+                       (long long)FID_SEQ_LAYOUT_RBTREE);
+       LASSERTF(FID_SEQ_UPDATE_LOG == 0x0000000200000009ULL, "found 0x%.16llxULL\n",
+                       (long long)FID_SEQ_UPDATE_LOG);
        LASSERTF(FID_SEQ_NORMAL == 0x0000000200000400ULL, "found 0x%.16llxULL\n",
                        (long long)FID_SEQ_NORMAL);
        LASSERTF(FID_SEQ_LOV_DEFAULT == 0xffffffffffffffffULL, "found 0x%.16llxULL\n",
@@ -1479,10 +1491,10 @@ void lustre_assert_wire_constants(void)
        LASSERTF((int)sizeof(((struct lov_mds_md_v3 *)0)->lmm_layout_gen) == 2, "found %lld\n",
                 (long long)(int)sizeof(((struct lov_mds_md_v3 *)0)->lmm_layout_gen));
        CLASSERT(LOV_MAXPOOLNAME == 15);
-       LASSERTF((int)offsetof(struct lov_mds_md_v3, lmm_pool_name[16]) == 48, "found %lld\n",
-                (long long)(int)offsetof(struct lov_mds_md_v3, lmm_pool_name[16]));
-       LASSERTF((int)sizeof(((struct lov_mds_md_v3 *)0)->lmm_pool_name[16]) == 1, "found %lld\n",
-                (long long)(int)sizeof(((struct lov_mds_md_v3 *)0)->lmm_pool_name[16]));
+       LASSERTF((int)offsetof(struct lov_mds_md_v3, lmm_pool_name[15 + 1]) == 48, "found %lld\n",
+                (long long)(int)offsetof(struct lov_mds_md_v3, lmm_pool_name[15 + 1]));
+       LASSERTF((int)sizeof(((struct lov_mds_md_v3 *)0)->lmm_pool_name[15 + 1]) == 1, "found %lld\n",
+                (long long)(int)sizeof(((struct lov_mds_md_v3 *)0)->lmm_pool_name[15 + 1]));
        LASSERTF((int)offsetof(struct lov_mds_md_v3, lmm_objects[0]) == 48, "found %lld\n",
                 (long long)(int)offsetof(struct lov_mds_md_v3, lmm_objects[0]));
        LASSERTF((int)sizeof(((struct lov_mds_md_v3 *)0)->lmm_objects[0]) == 24, "found %lld\n",
@@ -1532,10 +1544,10 @@ void lustre_assert_wire_constants(void)
                 (long long)(int)offsetof(struct lmv_mds_md_v1, lmv_padding3));
        LASSERTF((int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_padding3) == 8, "found %lld\n",
                 (long long)(int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_padding3));
-       LASSERTF((int)offsetof(struct lmv_mds_md_v1, lmv_pool_name[16]) == 56, "found %lld\n",
-                (long long)(int)offsetof(struct lmv_mds_md_v1, lmv_pool_name[16]));
-       LASSERTF((int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_pool_name[16]) == 1, "found %lld\n",
-                (long long)(int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_pool_name[16]));
+       LASSERTF((int)offsetof(struct lmv_mds_md_v1, lmv_pool_name[15]) == 55, "found %lld\n",
+                (long long)(int)offsetof(struct lmv_mds_md_v1, lmv_pool_name[15]));
+       LASSERTF((int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_pool_name[15]) == 1, "found %lld\n",
+                (long long)(int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_pool_name[15]));
        LASSERTF((int)offsetof(struct lmv_mds_md_v1, lmv_stripe_fids[0]) == 56, "found %lld\n",
                 (long long)(int)offsetof(struct lmv_mds_md_v1, lmv_stripe_fids[0]));
        LASSERTF((int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_stripe_fids[0]) == 16, "found %lld\n",
@@ -3787,7 +3799,9 @@ void lustre_assert_wire_constants(void)
        CLASSERT(LLOG_CHANGELOG_REPL_CTXT == 13);
        CLASSERT(LLOG_CHANGELOG_USER_ORIG_CTXT == 14);
        CLASSERT(LLOG_AGENT_ORIG_CTXT == 15);
-       CLASSERT(LLOG_MAX_CTXTS == 16);
+       CLASSERT(LLOG_UPDATELOG_ORIG_CTXT == 16);
+       CLASSERT(LLOG_UPDATELOG_REPL_CTXT == 17);
+       CLASSERT(LLOG_MAX_CTXTS == 18);
 
        /* Checks for struct llogd_conn_body */
        LASSERTF((int)sizeof(struct llogd_conn_body) == 40, "found %lld\n",
@@ -4728,4 +4742,3 @@ void lustre_assert_wire_constants(void)
        LASSERTF((int)sizeof(((struct lfsck_reply *)0)->lr_padding_2) == 8, "found %lld\n",
                 (long long)(int)sizeof(((struct lfsck_reply *)0)->lr_padding_2));
 }
-